前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ延迟消息源码分析

RocketMQ延迟消息源码分析

作者头像
CBeann
发布2023-12-25 19:47:24
1690
发布2023-12-25 19:47:24
举报
文章被收录于专栏:CBeann的博客CBeann的博客

写作目的

?

第一个原因:最近玩哔哩哔哩遇到一个RocketMQ的Contributor,一开始不知道他是Contributor,后来问到延迟消息的时候这块还不是很了解,他告诉我学习要系统,你既然了解事务消息那我理解应该也了解延迟消息,事实我不了解,所以这块想通过看源码的方式了解一下。

第二个原因:好久没写文章了,需要水一篇,也需要不断学习,所以搞一下。

源码分析

延迟消息配置

消息的延时级别level一共有18级,分别为:

代码语言:javascript
复制
 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

延迟消息发送

生产延迟消息的代码如下

代码语言:javascript
复制
  public static void main(String[] args) throws Exception {
    // Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    System.out.println("----------------");

    String topic = "DelayDemo";
    Message msg =
            new Message(
                    topic /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ ")/* Message body */ 
                            .getBytes(RemotingHelper.DEFAULT_CHARSET) );
    //private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    msg.setDelayTimeLevel(3);//下标有0开始

    SendResult sendResult = producer.send(msg, 1000000000);
    System.out.printf("%s%n", sendResult);
    // Shut down once the producer instance is not longer in use.
    producer.shutdown();
  }

核心的话就是设置延迟消息等级的参数

代码语言:javascript
复制
    //private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    msg.setDelayTimeLevel(3);//下标有0开始

那么设置的这个参数的意思是什么呢?可以跟进源码看一下。原来是给这个消息设置了一个KV,仅仅是打一个tag,后面会用到。

代码语言:javascript
复制
 public void setDelayTimeLevel(int level) {
 //String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

broker消息接收

目的:我们要定位是到消息生产者Clinet生产的消息被broker接收到走的哪块源码。

首先我们要找到程序的入口代码,如下图所示,就是一个broker的main方法启动。

跟进start方法,然后一步一步往下跟,会跟到NettyRemotingServer的start方法

NettyRemotingServer的start方法里会就是Netty创建ServerBootstrap了,那么很自然的就会想到自定义的handler,也就是NettyServerHandler。

NettyServerHandler#channelRead0方法是处理接收到的程序,进一步跟到NettyRemotingAbstract#processRequestCommand方法,我们可以推断出根据消息的code找到具体的NettyRequestProcessor,就可以知道具体的消息存储逻辑了。

那么发现消息的code是什么呢?

消息生产者发送消息的时候一直跟源码,就会跟到下图的这个地方

就可以拿到

代码语言:javascript
复制
 public static final int SEND_MESSAGE_V2 = 310;

根据这个code就可以定位到处理消息的processor,即SendMessageProcessor

延迟消息存储到CommitLog

从上面的逻辑中我们已经定位到SendMessageProcessor,那么接下来看一下消息存储的粗略逻辑

从SendMessageProcessor#processRequest方法开始跟,如下图所示

最后跟到CommitLog的asyncPutMessage方法,其中里面有一个分支如下图所示

接下来就是正常的存储了

延迟消息构建Consumequeue

Consumequeue的构建在RocketMQ中msg&tag的生命周期4.2小节有讲过。

接下来看一下延迟消息构建过程。

核心在ReputMessageService#doReput方法里的构建DispatchRequest方法

代码语言:javascript
复制
 DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

checkMessageAndReturnSize就是构建DispatchRequest ,当然也会构建tagsCode。如下面的代码所示,如果是延迟消息,则tagsCode=存储时间+延迟时间

延迟消息定时任务

源码剖析RocketMQ延时消息原理第3小节中讲的很详细。

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-12-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写作目的
  • 源码分析
    • 延迟消息配置
      • 延迟消息发送
        • broker消息接收
          • 延迟消息存储到CommitLog
          • 延迟消息构建Consumequeue
          • 延迟消息定时任务
      相关产品与服务
      对象存储
      对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
      http://www.vxiaotou.com