前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >disruptor笔记之二:Disruptor类分析

disruptor笔记之二:Disruptor类分析

作者头像
程序员欣宸
发布2021-12-07 08:41:03
2650
发布2021-12-07 08:41:03
举报
文章被收录于专栏:实战docker实战docker

《disruptor笔记》系列链接

  1. 快速入门
  2. Disruptor类分析
  3. 环形队列的基础操作(不用Disruptor类)
  4. 事件消费知识点小结
  5. 事件消费实战
  6. 常见场景
  7. 等待策略
  8. 知识点补充(终篇)

本篇概览

  • 通过前文的实战,咱们对Disruptor有了初步认识,借助com.lmax.disruptor.dsl.Disruptor类可以轻松完成以下操作:
  • 环形队列初始化
  • 指定事件消费者
  • 启动消费者线程
  • 接下来要面对两个问题:
  • 深入了解Disruptor类是如何完成上述操作的;
  • 对Disruptor类有了足够了解时,尝试不用Disruptor,自己动手操作环形队列,实现消息的生产和消费,这样做的目的是加深对Disruptor内部的认识,做到知其所以然;
  • 接下来咱们先解决第一个问题吧,结合Disruptor对象的源码来看看上述三个操作到底做了什么;

环形队列初始化

  • 环形队列初始化发生在实例化Disruptor对象的时候,即Disruptor的构造方法:
代码语言:javascript
复制
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
    {
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
    }
  • RingBuffer.createMultiProducer方法内部实例化了RingBuffer,如下图红框:
  • 记下第一个重要知识点:创建RingBuffer对象;

指定事件消费者

  • 在前文中,下面这行代码指定了事件由StringEventHandler消费:
代码语言:javascript
复制
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
  • 查看handleEventsWith方法的内部:
代码语言:javascript
复制
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
	return createEventProcessors(new Sequence[0], handlers);
}
  • 展开createEventProcessors方法,如下图,请重点关注创建SequenceBarrier和BatchEventProcessor等操作:
  • 展开上图红框四中的updateGatingSequencesForNextInChain方法,如下图,红框中的ringBuffer.addGatingSequences需要重点关注:
  • 小结一下,disruptor.handleEventsWith方法涉及到四个重要知识点:
  • 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
  • 创建BatchEventProcessor,负责消费事件
  • 绑定BatchEventProcessor对象的异常处理类
  • 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer

启动消费者线程

  • 前文已通过日志确定了消费事件的逻辑是在一个独立的线程中执行的,启动消费者线程的代码如下:
代码语言:javascript
复制
disruptor.start();
  • 展开start方法,如下可见,关键代码是consumerInfo.start(executor):
代码语言:javascript
复制
    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }
  • ConsumerInfo是接口,对应的实现类有EventProcessorInfo和WorkerPoolInfo两种,这里应该是哪种呢?既然来源是consumerRepository,这就要看当初是怎么存入consumerRepository的,前面在分析createEventProcessors方法时,下图红框中的consumerRepository.add被忽略了,现在需要进去看看:
  • 进去后一目了然,可见ConsumerInfo的实现是EventProcessorInfo:
  • 所以,回到前面对consumerInfo.start(executor)方法的分析,这里要看的就是EventProcessorInfo的start方法了,如下图,非常简单,就是启动一个线程执行eventprocessor(这个eventprocessor是BatchEventProcessor对象):
  • 小结一下,disruptor.start方法涉及到一个重要知识点:
  • 启动独立线程,用来执行消费事件的业务逻辑;

消费事件的逻辑

  • 为了理解消息处理逻辑,还要重点关注BatchEventProcessor.processEvents方法,如下图所示,其实也很简单,就是不停的从环形队列取出可用的事件,然后再更新自己的Sequence,相当于标记已经消费到哪里了:

总结

最后总结Disruptor类的重要功能:

  1. 创建环形队列(RingBuffer对象)
  2. 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
  3. 创建BatchEventProcessor,负责消费事件
  4. 绑定BatchEventProcessor对象的异常处理类
  5. 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
  6. 启动独立线程,用来执行消费事件的业务逻辑
  7. 聪明的您一定会发现,本文并没有全面分析Disruptor类的源码,例如after、shutdown等方法都没有提到,确实如此,欣宸在此给您道歉了,本篇的重点是找出那些与基本功能有关代码,为后面的实战提供理论指导(不用Disruptor类实现消息生产消费的实战),因此很多高级功能都跳过了;

理解官方流程图

  • 此时再看官方流程图,聪明的您应该很快就能理解此图表达的意思:每个消费者都有自己的Sequence,通过此Sequence取得自己在环形队列中消费的位置,再通过SequenceBarrier来等待可用事件的出现,等到事件出现了就用get方法取出具体的事件,给EventHandler来处理:

后续预告

  • 此时,咱们对Disruptor类已经有了比较深入的理解,接下来的文章,咱们会尝试不用Disruptor类,仅凭着对RingBuffer对象的操作来实现以下三种功能:
  • 100个事件,单个消费者消费;
  • 100个事件,三个消费者,每个都独自消费这个100个事件;
  • 100个事件,三个消费者共同消费这个100个事件;
本文参与?腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-05-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体同步曝光计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 《disruptor笔记》系列链接
  • 本篇概览
  • 环形队列初始化
  • 指定事件消费者
  • 启动消费者线程
  • 消费事件的逻辑
  • 总结
  • 理解官方流程图
  • 后续预告
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com