事件和事件队列
DAOS API 函数可以在阻塞或非阻塞模式下使用。 这是通过传递给每个 API 调用的指向 DAOS 事件的指针来确定的:如果 NULL 表示操作将被阻塞。 操作完成后会返回。 所有失败情况的错误码都将通过API函数本身的返回码返回。 如果使用有效的事件,则该操作将以非阻塞模式运行,并在内部调度程序中调度该操作以及将 RPC 提交到底层堆栈后立即返回。 如果调度成功,则操作的返回值为success,但并不表示实际操作成功。 返回时可以捕获的错误要么是无效参数,要么是调度问题。 当事件完成时,操作的实际返回代码将在事件错误代码 (event.ev_error) 中提供。 必须首先通过单独的 API 调用创建要使用的有效事件。 为了允许用户一次跟踪多个事件,可以将事件创建为事件队列的一部分,事件队列基本上是可以一起进行和轮询的事件的集合。 事件队列还在内部为所有 DAOS 任务创建一个单独的任务调度程序以及一个新的网络上下文。 在某些网络提供商上,网络上下文创建是一项昂贵的操作,因此用户应尝试限制在 DAOS 之上的应用程序或 IO 中间件库中创建的事件队列的数量。 或者,可以在没有事件队列的情况下创建事件,并单独跟踪。 在这种情况下,对于阻塞操作,将使用内部全局任务调度程序和网络上下文来代替为事件队列创建的独立任务调度程序和网络上下文。 事件完成后,它可以重新用于另一个 DAOS API 调用,以最大限度地减少 DAOS 库内事件创建和分配的需要
DAOS Task API 提供了一种以非阻塞方式使用 DAOS API 的替代方法,同时在 DAOS API 操作之间构建任务依赖树。 这对于使用 DAOS 并需要构建彼此之间具有依赖关系(N-1、1-N、N-N)的 DAOS 操作计划的应用程序和中间件库非常有用
要利用任务 API,用户需要创建一个调度程序,其中可以创建 DAOS 任务作为其中的一部分。 任务 API 足够通用,允许用户混合 DAOS 特定任务(通过 DAOS 任务 API)和其他用户定义的任务,并在这些任务之间添加依赖关系
有关如何在客户端库中使用 TSE 的更多详细信息,请参阅 TSE 内部文档(https://github.com/ssbandjl/daos/blob/master/src/common/README.md)以获取更多详细信息
以DAOS用户态文件系统dfuse为例
客户端写数据:xb/write.c -> write(fd, direct_write_buf, BUF_SIZE)
write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3
DAOS用户态文件系统, 写流程
master -> src/client/dfuse/ops/write.c -> dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t position, struct fuse_file_info *fi)
struct dfuse_projection_info *fs_handle = fuse_req_userdata(req)
eqt_idx = atomic_fetch_add_relaxed(&fs_handle->di_eqt_idx, 1) -> 原子递增,每次返回+1前的值, 比如: eqt_idx=7
eqt = &fs_handle->di_eqt[eqt_idx % fs_handle->di_eq_count] -> 取余打散到eq
ev = d_slab_acquire(eqt->de_write_slab) -> 分配EV, 需要提前注册: d_slab_register(&fs_handle->di_slab, &write_slab, eqt, &eqt->de_write_slab)
ev->de_complete_cb = dfuse_cb_write_complete
dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev)
daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) -> daos_array_write(daos_handle_t oh, daos_handle_t th, daos_array_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev)
dc_task_create(dc_array_write, NULL, ev, &task) -> 关联EV和task
sched = daos_ev2sched(ev) -> 拿到调度器指针, 初始化调度器
return dc_task_schedule(task, true)
sem_post(&eqt->de_sem) -> 唤醒EQ
d_slab_restock(eqt->de_write_slab) -> 重用slab
客户端mount, master, gdb --args /opt/daos/bin/dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb -f -> 默认后台启动
dfuse -m /mnt/sxb --pool sxb --cont sxb | dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb
dfuse_main.c -> main
daos_debug_init(DAOS_LOG_DEFAULT)
d_log_init_adv 高级日志初始化, 客户端日志文件
log_file = getenv(D_LOG_FILE_ENV) export D_LOG_FILE=/tmp/daos_client.log
debug_prio_err_load_env
d_log_open
freopen(mst.log_file 重新关联标准输出或错误输出
setlinebuf(stderr) 设置错误输出为行缓冲
d_log_sync_mask
dfuse_info->di_eq_count = 1
daos_init -> 初始化客户端库
daos_debug_init
daos_hhash_init_feats
dc_agent_init
dc_job_init
dc_mgmt_net_cfg
daos_eq_lib_init -> 初始化事件队列库 -> static tse_sched_t daos_sched_g -> 指向不属于 EQ 一部分的事件的全局调度程序的指针。 作为 EQ 一部分初始化的事件将在该 EQ 调度程序中进行跟踪
crt_init_opt
crt_context_create(&daos_eq_ctx) -> 全局共享网络上下文, 所有事件队列(eq)共享使用这个上下文
tse_sched_init(&daos_sched_g, NULL, daos_eq_ctx) -> 初始化调度器(无事件队列), 为无eq事件设置调度器
dc_mgmt_net_cfg_check
pl_init
dc_mgmt_init
dc_pool_init
dc_cont_init
dc_obj_init
dfuse_fs_init(dfuse_info) -> daos用户态文件信息=文件系统控制器
D_ALLOC_ARRAY(fs_handle->di_eqt, fs_handle->di_eq_count) -> eq数组
d_hash_table_create_inplace dpi_pool_table 打开的池表, 创建hash表 大小=power2(n次方), 操作方法
dpi_iet open inodes
for (i = 0; i < fs_handle->di_eq_count; i++)
struct dfuse_eq *eqt = &fs_handle->di_eqt[i] -> 根据传入的EQ数量, 将eq与文件系统句柄中的eq表绑定
eqt->de_handle = fs_handle -> 互存指针,双向绑定
sem_init(&eqt->de_sem, 0, 0) -> 在 eq 之前创建信号量,因为无法检查 sem_init() 是否已被调用,如果没有调用 sem_destroy 也是无效的。 这样我们就可以避免添加额外的内存来跟踪信号量的状态
daos_eq_create(&eqt->de_eq) -> 一个事件队列关联一个网络上下文, 跟踪池的多个事件 -> 创建事件队列。 事件队列用于保存和池化多个事件。 创建的每个事件队列都将创建一个与事件队列关联的网络(cart)上下文。 网络上下文创建是一项昂贵的操作,并且在某些系统上网络上下文的数量可能受到限制。 因此,建议不要在用户应用程序或中间件中创建大量事件队列
eq = daos_eq_alloc() -> 分配eq
D_INIT_LIST_HEAD(&eq->eq_running)
D_INIT_LIST_HEAD(&eq->eq_comp)
daos_hhash_hlink_init(&eqx->eqx_hlink, &eq_h_ops)
return eq
crt_context_create(&eqx->eqx_ctx)
crt_contpext_provider_create
crt_context_init
daos_eq_insert(eqx)
daos_hhash_link_insert(&eqx->eqx_hlink, DAOS_HTYPE_EQ) -> 插入全局hash表(struct daos_hhash_table daos_ht)
daos_eq_handle(eqx, eqh)
daos_hhash_link_key(&eqx->eqx_hlink, &h->cookie) -> 关联key
tse_sched_init(&eqx->eqx_sched, NULL, eqx->eqx_ctx) -> 初始化调度器 -> struct tse_sched_private -> 调度器及队列,属性等
struct tse_sched_private *dsp = tse_sched2priv(sched) -> 设置调度器私有指针dsp
D_INIT_LIST_HEAD(&dsp->dsp_init_list); -> 初始队列
D_INIT_LIST_HEAD(&dsp->dsp_running_list); -> 运行队列
D_INIT_LIST_HEAD(&dsp->dsp_complete_list) -> 完成队列
D_INIT_LIST_HEAD(&dsp->dsp_sleeping_list) -> 睡眠队列
D_INIT_LIST_HEAD(&dsp->dsp_comp_cb_list); -> 完成回调队列
tse_sched_register_comp_cb(sched, comp_cb, udata) -> 初始回调为空
dsc->dsc_comp_cb = comp_cb -> 设置调度器的完成回调和回调参数(udata)
d_list_add(&dsc->dsc_list, &dsp->dsp_comp_cb_list) -> 将调度器的完成回调 -> 调度器的完成回调队列
sched->ds_udata = udata -> 将网络上下文 daos_eq_ctx 设置到调度器的用户数据指针上(也可以是回调数据等)
daos_eq_putref(eqx) -> 减一次引用计数(ch_rec_decref)
duns_resolve_path
dfuse_pool_connect
dfuse_cont_open
dfuse_fs_start 启动文件系统
d_hash_rec_insert(&fs_handle->dpi_iet 将根插入hash表, 在 dfuse_reply_entry 中也会插入: d_hash_rec_find_insert(&fs_handle->dpi_iet
d_slab_init(&fs_handle->di_slab, fs_handle)
for (i = 0; i < fs_handle->di_eq_count; i++)
d_slab_register(&fs_handle->di_slab, &read_slab, eqt, &eqt->de_read_slab)
create_many(type)
ptr = create(type) -> create(struct d_slab_type *type)
type->st_reg.sr_init(ptr, type->st_arg) -> dfuse_event_init -> ev->de_eqt = handle -> 为ev绑定daos_event_t
if (!type->st_reg.sr_reset(ptr)) -> dfuse_read_event_reset(void *arg) -> 重置读事件ev
D_ALLOC(ev->de_iov.iov_buf, DFUSE_MAX_READ) -> 读最大1MB
ev->de_sgl.sg_nr = 1
daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL) -> 父事件为空, 也支持父事件, 如: daos_event_init(child_events[i], DAOS_HDL_INVAL, &event)
evx->evx_status = DAOS_EVS_READY
if (daos_handle_is_valid(eqh)) -> 句柄有效
eqx = daos_eq_lookup(eqh)
evx->evx_ctx = eqx->eqx_ctx
evx->evx_sched = &eqx->eqx_sched -> 继承EQ的网络和调度器
entry = ptr + type->st_reg.sr_offset
d_list_add_tail(entry, &type->st_free_list) -> 将对象加入空闲列表,计数器+1
type->st_free_count++
d_list_add_tail(&type->st_type_list, &slab->slab_list) -> 将slab放入列表备用
for (i = 0; i < fs_handle->di_eq_count; i++)
dfuse_progress_thread pthread_create(&fs_handle->dpi_thread, NULL, dfuse_progress_thread, fs_handle) 异步进度线程,该线程在启动时使用事件队列启动,并阻塞在信号量上,直到创建异步事件,此时线程唤醒并在 daos_eq_poll() 中忙于轮询直到完成
sem_wait
daos_eq_poll 从 EQ 中检索完成事件
daos_eq_lookup 查找私有事件队列
daos_hhash_link_lookup
crt_progress_cond(epa.eqx->eqx_ctx, timeout, eq_progress_cb, &epa)
eq_progress_cb
dfuse_launch_fuse(fs_handle, &args) 创建fuse文件系统
fuse_session_new(args, &dfuse_ops, sizeof(dfuse_ops), fs_handle)
fuse_session_mount
dfuse_send_to_fg
dfuse_loop
dfuse_fs_fini
DAOS客户端API_事件和事件队列及任务调度引擎: https://github.com/ssbandjl/daos/tree/master/src/client/api
晓兵
博客: https://logread.cn | https://blog.csdn.net/ssbandjl | /developer/user/5060293/articles
公众号: 云原生云
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。