前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析

DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析

原创
作者头像
ssbandjl
修改2023-09-05 08:09:51
3350
修改2023-09-05 08:09:51
举报
文章被收录于专栏:daosdaos

简介

事件和事件队列

DAOS API 函数可以在阻塞或非阻塞模式下使用。 这是通过传递给每个 API 调用的指向 DAOS 事件的指针来确定的:如果 NULL 表示操作将被阻塞。 操作完成后会返回。 所有失败情况的错误码都将通过API函数本身的返回码返回。 如果使用有效的事件,则该操作将以非阻塞模式运行,并在内部调度程序中调度该操作以及将 RPC 提交到底层堆栈后立即返回。 如果调度成功,则操作的返回值为success,但并不表示实际操作成功。 返回时可以捕获的错误要么是无效参数,要么是调度问题。 当事件完成时,操作的实际返回代码将在事件错误代码 (event.ev_error) 中提供。 必须首先通过单独的 API 调用创建要使用的有效事件。 为了允许用户一次跟踪多个事件,可以将事件创建为事件队列的一部分,事件队列基本上是可以一起进行和轮询的事件的集合。 事件队列还在内部为所有 DAOS 任务创建一个单独的任务调度程序以及一个新的网络上下文。 在某些网络提供商上,网络上下文创建是一项昂贵的操作,因此用户应尝试限制在 DAOS 之上的应用程序或 IO 中间件库中创建的事件队列的数量。 或者,可以在没有事件队列的情况下创建事件,并单独跟踪。 在这种情况下,对于阻塞操作,将使用内部全局任务调度程序和网络上下文来代替为事件队列创建的独立任务调度程序和网络上下文。 事件完成后,它可以重新用于另一个 DAOS API 调用,以最大限度地减少 DAOS 库内事件创建和分配的需要

DAOS Task API 提供了一种以非阻塞方式使用 DAOS API 的替代方法,同时在 DAOS API 操作之间构建任务依赖树。 这对于使用 DAOS 并需要构建彼此之间具有依赖关系(N-1、1-N、N-N)的 DAOS 操作计划的应用程序和中间件库非常有用

要利用任务 API,用户需要创建一个调度程序,其中可以创建 DAOS 任务作为其中的一部分。 任务 API 足够通用,允许用户混合 DAOS 特定任务(通过 DAOS 任务 API)和其他用户定义的任务,并在这些任务之间添加依赖关系

有关如何在客户端库中使用 TSE 的更多详细信息,请参阅 TSE 内部文档(https://github.com/ssbandjl/daos/blob/master/src/common/README.md)以获取更多详细信息

事件与事件队列及任务调度引擎流程图

流程说明(dfuse为例)

以DAOS用户态文件系统dfuse为例

  • 在初始化客户端库中初始化事件队列, 关联全局网络上下文, 设置调度器
  • 启动文件系统中注册了SLAB, 绑定事件队列和事件, 参考: daos_event_init
  • 开启轮训线程dfuse_progress_thread, 参考daos_eq_poll
  • 文件系统执行写
代码语言:c
复制
客户端写数据:xb/write.c -> write(fd, direct_write_buf, BUF_SIZE)
write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3
  • 封装ev, 并将ev传下去: dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev)
代码语言:c
复制
DAOS用户态文件系统, 写流程
master -> src/client/dfuse/ops/write.c -> dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t position, struct fuse_file_info *fi)
struct dfuse_projection_info *fs_handle = fuse_req_userdata(req)
eqt_idx = atomic_fetch_add_relaxed(&fs_handle->di_eqt_idx, 1) -> 原子递增,每次返回+1前的值, 比如: eqt_idx=7
eqt = &fs_handle->di_eqt[eqt_idx % fs_handle->di_eq_count] -> 取余打散到eq
ev = d_slab_acquire(eqt->de_write_slab) -> 分配EV, 需要提前注册: d_slab_register(&fs_handle->di_slab, &write_slab, eqt, &eqt->de_write_slab)
ev->de_complete_cb = dfuse_cb_write_complete
dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev)
  daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) -> daos_array_write(daos_handle_t oh, daos_handle_t th, daos_array_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev)
    dc_task_create(dc_array_write, NULL, ev, &task) -> 关联EV和task
      sched = daos_ev2sched(ev) -> 拿到调度器指针, 初始化调度器
    return dc_task_schedule(task, true)
sem_post(&eqt->de_sem) -> 唤醒EQ
d_slab_restock(eqt->de_write_slab) -> 重用slab

  • 与tse结合构造task, 调度task
  • 网络回复后, 在轮训线程中trigger到, 拿到ev和task, 逐层向上级执行回调函数, 最终执行业务回调

源码分析

代码语言:javascript
复制
客户端mount, master, gdb --args /opt/daos/bin/dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb -f -> 默认后台启动
dfuse -m /mnt/sxb --pool sxb --cont sxb | dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb
dfuse_main.c -> main
  daos_debug_init(DAOS_LOG_DEFAULT)
    d_log_init_adv 高级日志初始化, 客户端日志文件
      log_file = getenv(D_LOG_FILE_ENV) export D_LOG_FILE=/tmp/daos_client.log
      debug_prio_err_load_env
      d_log_open
        freopen(mst.log_file 重新关联标准输出或错误输出
        setlinebuf(stderr) 设置错误输出为行缓冲
    d_log_sync_mask
  dfuse_info->di_eq_count = 1
  daos_init -> 初始化客户端库
    daos_debug_init
    daos_hhash_init_feats
    dc_agent_init
    dc_job_init
    dc_mgmt_net_cfg
    daos_eq_lib_init -> 初始化事件队列库 -> static tse_sched_t daos_sched_g -> 指向不属于 EQ 一部分的事件的全局调度程序的指针。 作为 EQ 一部分初始化的事件将在该 EQ 调度程序中进行跟踪
      crt_init_opt
      crt_context_create(&daos_eq_ctx) -> 全局共享网络上下文, 所有事件队列(eq)共享使用这个上下文
      tse_sched_init(&daos_sched_g, NULL, daos_eq_ctx) -> 初始化调度器(无事件队列), 为无eq事件设置调度器
    dc_mgmt_net_cfg_check
    pl_init
    dc_mgmt_init
    dc_pool_init
    dc_cont_init
    dc_obj_init
  dfuse_fs_init(dfuse_info) -> daos用户态文件信息=文件系统控制器
    D_ALLOC_ARRAY(fs_handle->di_eqt, fs_handle->di_eq_count) -> eq数组
    d_hash_table_create_inplace dpi_pool_table 打开的池表, 创建hash表 大小=power2(n次方), 操作方法
    dpi_iet open inodes
    for (i = 0; i < fs_handle->di_eq_count; i++)
      struct dfuse_eq *eqt = &fs_handle->di_eqt[i] -> 根据传入的EQ数量, 将eq与文件系统句柄中的eq表绑定
      eqt->de_handle = fs_handle -> 互存指针,双向绑定
    sem_init(&eqt->de_sem, 0, 0) -> 在 eq 之前创建信号量,因为无法检查 sem_init() 是否已被调用,如果没有调用 sem_destroy 也是无效的。 这样我们就可以避免添加额外的内存来跟踪信号量的状态
    daos_eq_create(&eqt->de_eq) -> 一个事件队列关联一个网络上下文, 跟踪池的多个事件 -> 创建事件队列。 事件队列用于保存和池化多个事件。 创建的每个事件队列都将创建一个与事件队列关联的网络(cart)上下文。 网络上下文创建是一项昂贵的操作,并且在某些系统上网络上下文的数量可能受到限制。 因此,建议不要在用户应用程序或中间件中创建大量事件队列
      eq = daos_eq_alloc() -> 分配eq
        D_INIT_LIST_HEAD(&eq->eq_running)
        D_INIT_LIST_HEAD(&eq->eq_comp)
        daos_hhash_hlink_init(&eqx->eqx_hlink, &eq_h_ops)
        return eq
      crt_context_create(&eqx->eqx_ctx)
        crt_contpext_provider_create
          crt_context_init
      daos_eq_insert(eqx)
        daos_hhash_link_insert(&eqx->eqx_hlink, DAOS_HTYPE_EQ) -> 插入全局hash表(struct daos_hhash_table daos_ht)
      daos_eq_handle(eqx, eqh)
        daos_hhash_link_key(&eqx->eqx_hlink, &h->cookie) -> 关联key
      tse_sched_init(&eqx->eqx_sched, NULL, eqx->eqx_ctx) -> 初始化调度器 -> struct tse_sched_private -> 调度器及队列,属性等
        struct tse_sched_private  *dsp = tse_sched2priv(sched) -> 设置调度器私有指针dsp
        D_INIT_LIST_HEAD(&dsp->dsp_init_list); -> 初始队列
        D_INIT_LIST_HEAD(&dsp->dsp_running_list); -> 运行队列
        D_INIT_LIST_HEAD(&dsp->dsp_complete_list) -> 完成队列
        D_INIT_LIST_HEAD(&dsp->dsp_sleeping_list) -> 睡眠队列
        D_INIT_LIST_HEAD(&dsp->dsp_comp_cb_list); -> 完成回调队列
        tse_sched_register_comp_cb(sched, comp_cb, udata) -> 初始回调为空
          dsc->dsc_comp_cb = comp_cb -> 设置调度器的完成回调和回调参数(udata)
          d_list_add(&dsc->dsc_list, &dsp->dsp_comp_cb_list) -> 将调度器的完成回调 -> 调度器的完成回调队列
        sched->ds_udata = udata -> 将网络上下文 daos_eq_ctx 设置到调度器的用户数据指针上(也可以是回调数据等)
      daos_eq_putref(eqx) -> 减一次引用计数(ch_rec_decref)
  duns_resolve_path
  dfuse_pool_connect
  dfuse_cont_open
  dfuse_fs_start 启动文件系统
    d_hash_rec_insert(&fs_handle->dpi_iet 将根插入hash表, 在 dfuse_reply_entry 中也会插入: d_hash_rec_find_insert(&fs_handle->dpi_iet
    d_slab_init(&fs_handle->di_slab, fs_handle)
    for (i = 0; i < fs_handle->di_eq_count; i++)
      d_slab_register(&fs_handle->di_slab, &read_slab, eqt, &eqt->de_read_slab)
        create_many(type)
          ptr   = create(type) -> create(struct d_slab_type *type)
            type->st_reg.sr_init(ptr, type->st_arg) -> dfuse_event_init -> ev->de_eqt = handle -> 为ev绑定daos_event_t
            if (!type->st_reg.sr_reset(ptr)) -> dfuse_read_event_reset(void *arg) -> 重置读事件ev
              D_ALLOC(ev->de_iov.iov_buf, DFUSE_MAX_READ) -> 读最大1MB
              ev->de_sgl.sg_nr       = 1
              daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL) -> 父事件为空, 也支持父事件, 如: daos_event_init(child_events[i], DAOS_HDL_INVAL, &event)
                evx->evx_status = DAOS_EVS_READY
                if (daos_handle_is_valid(eqh)) -> 句柄有效
                  eqx = daos_eq_lookup(eqh)
                  evx->evx_ctx = eqx->eqx_ctx
                  evx->evx_sched = &eqx->eqx_sched -> 继承EQ的网络和调度器
          entry = ptr + type->st_reg.sr_offset
          d_list_add_tail(entry, &type->st_free_list) -> 将对象加入空闲列表,计数器+1
          type->st_free_count++
        d_list_add_tail(&type->st_type_list, &slab->slab_list) -> 将slab放入列表备用
    for (i = 0; i < fs_handle->di_eq_count; i++) 
    dfuse_progress_thread pthread_create(&fs_handle->dpi_thread, NULL, dfuse_progress_thread, fs_handle) 异步进度线程,该线程在启动时使用事件队列启动,并阻塞在信号量上,直到创建异步事件,此时线程唤醒并在 daos_eq_poll() 中忙于轮询直到完成
      sem_wait
      daos_eq_poll  从 EQ 中检索完成事件
        daos_eq_lookup 查找私有事件队列
          daos_hhash_link_lookup
        crt_progress_cond(epa.eqx->eqx_ctx, timeout, eq_progress_cb, &epa)
          eq_progress_cb
        dfuse_launch_fuse(fs_handle, &args) 创建fuse文件系统
          fuse_session_new(args, &dfuse_ops, sizeof(dfuse_ops), fs_handle)
          fuse_session_mount
          dfuse_send_to_fg
          dfuse_loop
  dfuse_fs_fini

总结

  • DAOS的任务调度引擎结合事件队列和事件, 与网络上下文绑定完成抽象封装, 可作为项目第三方组件引入, 结合业务, 完成同步和异步任务调度(依赖任务处理,如多副本写, EC), 事件, 事件队列, 任务, 调度器, HASH表, SLAB, 各种运行队列, 完成队列, 完成回调队列, 延迟队列..., 可应对复杂的业务调度和管理需求
  • 一个文件系统绑定多个事件队列, IO打散到每个事件队列, 负载均衡
  • 全局HASH表结合cookie作为key, 快速捞回事件队列

参考

DAOS客户端API_事件和事件队列及任务调度引擎: https://github.com/ssbandjl/daos/tree/master/src/client/api

DAOS项目简介视频

晓兵

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | /developer/user/5060293/articles

公众号: 云原生云

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 事件与事件队列及任务调度引擎流程图
  • 流程说明(dfuse为例)
  • 源码分析
  • 总结
  • 参考
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com