前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ中msg&tag的生命周期

RocketMQ中msg&tag的生命周期

作者头像
CBeann
发布2023-12-25 19:37:51
1910
发布2023-12-25 19:37:51
举报
文章被收录于专栏:CBeann的博客CBeann的博客

1 写作目的

?

最近发现项目内部和外部沟通频繁使用MQ,并通过tag进行消息过滤和隔离,因此想搞清楚tag在源码中使用的地方,毕竟消息中间件这块还是有很多该学习的地方。

2 版本及说明

RocketMQ-4.9.1

3 初识ConsumeQueue及tag

首先要RocketMQ的文件存储设计,本文主要关注CommitLog文件和ConsumeQueue文件,如下图所示(图片引自该处)。当消息生产者生产消息时,所有topic的消息都会顺序的保存在CommitLog文件里,如果只从CommitLog一个文件看,是没有办法快速定位到某个topic的消息,那么此时就需要ConsumeQueue登场了。

ConsumeQueue在不同的文件夹下,根据不同的文件夹可以区分不同的队列,而ConsumeQueue文件存储的是消息的索引信息

如上图所示消息生产者每生产一条消息就对应这下图的一条索引记录。其中消息的真实内容存储在commitLog中。

  • CommitLog Offset:指向commitLog中文件的偏移量。
  • Size:该条消息的大小。
  • Message Tag Hashcode:生产消息时指定的 tag 的hash 值。

4 tag跟踪及定位

整个流程为:

  1. producer生产消息
  2. broker存储消息
  3. conusmer启动流程
  4. broker给consumer消息(过滤tag)
  5. consumer消费消息(过滤&消费)

其中

topic = TopicTest

tag = TagA

4.1 producer生产消息

一般producer生产消息时候会使用如下代码,其中消息要包含topic、tag和msg消息体。

代码语言:javascript
复制
 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
Message msg =
            new Message(
                "TopicTest", // topic
                "TagA", // tag
                "OrderID188", // key
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
SendResult sendResult = producer.send(msg);

其中上面的tag是存在哪呢?跟Message的构造方法可以看到tag其实是放在msg的properties里,MessageConst.PROPERTY_TAGS = TAGS

代码语言:javascript
复制
    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }

跟上面的send方法中间会跟到

MQClientAPIImpl#sendMessage方法,方法中的一行代码如下图所示,创建Request,因为本次发送为单条消息,所以代码中的三元表达式中选择RequestCode.SEND_MESSAGE_V2(310)。

代码语言:javascript
复制
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);

在往下跟其实就是通过Netty给borker发送消息了(非本次内容关注重点,忽略)。

总结:

tag放在msg的properties里

发送请求的code = RequestCode.SEND_MESSAGE_V2(310)

4.2 broker存储消息

本文关注的有两个文件,一个是存储消息的CommitLog文件和存储topic索引的ConsumeQueue文件。

CommitLog是对外暴露的是一个逻辑日志(而真正对应的物理日志是多个MappedFile文件组成的)。该逻辑日志有一个最大偏移量maxOffset(DefaultMessageStore.this.commitLog.getMaxOffset())。当有新消息发到broker时消息会写到CommitLog里并且maxOffset就会增加。

ConsumeQueue的构成是由另一个类ReputMessageService异步线程进行处理,异步构建Consumequeue

ReputMessageService是Runnable实现类,run方法会每隔1秒执行doReput方法,如下面代码所示。

代码语言:javascript
复制
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

ReputMessageService里有一个属性是reputFromOffset,该属性表示同步CommLog到Consumequeue的进度

如果

代码语言:javascript
复制
this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset()

说明有新的消息要从CommLog构建索引到Consumequeue。

而Consumequeue中的三个属性(commitlog offset、size、tag hashcode)是怎么来的?

本身我们是有一个CommitLog的偏移量(reputFromOffset),从这个偏移量开始往后解析我们是可以解析出整条消息的,消息格式如下图所示。

解析出整条消息后可以获取到

  • commitlog offset :从消息中解析到
  • size:解析消息后计算的
  • tag hashcode :从消息中解析到msg的properties并获取到tags(字符串)然后获取hashcode。

那么就可以构建一条Consumequeue索引了。

总结:

broker收到消息后同步放在CommitLog中(本文没讲)

ReputMessageService通过异步不断扫描reputFromOffset和commitLog.getMaxOffset关系从而获取需要构建的通知

解析消息获取Consumequeue参数并构建

4.3 consumer启动流程

1、获取订阅的topic和Queue信息

2、通过Reblace获取被分配的Queue,开始拉取消息

4.3.1 consumer获取topic和Queue信息

消费者启动会调用

MQClientInstance#start()方法,start()方法里有会调用

MQClientInstance#startScheduledTask()方法,里面的一段代码如下,会每隔一段时间更新一下topic路由信息

代码语言:javascript
复制
//MQClientInstance###startScheduledTask()
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

会把路由信息保存到本地的一个HashMap里,这样消费者就拿到了topic的信息并且会把broker的信息保存下来

代码语言:javascript
复制
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//根据主题从nameserver获取topic信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
代码语言:javascript
复制
//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//把主题和主题队列相关的broker保存下来
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

总结:

消费者拿到主题的队列列表和broker信息

4.3.2 consumer拉取消息

consumer怎么开始拉取消息?这里其实是一个reblance的过程

MQClientInstance的start的方法里会开启一个rebalance的线程,如下面代码所示

代码语言:javascript
复制
//MQClientInstance###start()
public void start() throws MQClientException {
 //省略
 // Start rebalance service
 this.rebalanceService.start();
 //省略
}

跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法。如下面代码所示。根据主题队列列表和消费者组集合去做一个Rebalance,最后的返回结果是当前消费者需要消费的主题队列。

代码语言:javascript
复制
//RebalanceImpl##rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
                //获取订阅的主题的队列
                //获取订阅的主题的队列
                //获取订阅的主题的队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //获取同消费者组的ClientID集合
                //获取同消费者组的ClientID集合
                //获取同消费者组的ClientID集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    //排序
                    //排序
                    //排序
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                       
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        allocateResultSet.addAll(allocateResult);
                    }

                    //此处看下面的消费者怎么去拉消息
                    //此处看下面的消费者怎么去拉消息
                    //此处看下面的消费者怎么去拉消息
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                   
           
        }
    }

上面代码中allocateResultSet就是该consumerGroup被分配的Queue。后面会把每一个Queue包装成一个Task去对应的Broker中拉取消息。

总结:

如下图所示,RebalanceService线程会根据情况把请求放在PullMessageService的pullRequestQueue阻塞队列队列里,队列的每一个节点就是拉请求;PullMessageService线程就是不断去pullRequestQueue里拿任务然后去看一下broker中有没有数据,如果有数据就消费。

4.4 broker响应consumer请求(过滤tag)

首先Consumer给broker发送消息,请求code是 RequestCode.PULL_MESSAGE ,因此我们可以跟borker里对这个请求码的处理的processor,最后定位到

PullMessageProcessor#processRequest方法,方法里有如下的代码

代码语言:javascript
复制
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

跟DefaultMessageStore#getMessage方法

代码语言:javascript
复制
 public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        //省略        
        
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //获取消息的偏移量
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                             //获取消息的大小
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            //获取消息的tag的hashcode
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                            maxPhyOffsetPulling = offsetPy;

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }


                            //省略
                            //省略
                            //省略
                            

                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }
                              

                          //省略
                          //省略
                          //省略
        return getResult;
    }

跟进匹配方法,此时能发现过滤方法是看subscriptionData里是否有包含tagsCode

代码语言:javascript
复制
//ExpressionMessageFilter#isMatchedByConsumeQueue
 @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
            //省略
            //省略
            //省略

            //订阅主题里是否包含这个hashcode
            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        } else {
           //省略
    }

总结:broker是根据subscriptionData里的tag的hashcode列表去过滤消息,判断从ConsumeQueue中读取的tag的hashcode是否在subscriptionData里的tag的hashcode列表中。

4.5consumer消费消息(过滤tag&消费)

Consumer端在DefaultMQPushConsumerImpl#pullMessage方法里有一个PullCallback,此方法是一个给broker发送拉取消息后的一个回调方法

代码语言:javascript
复制
PullCallback pullCallback = new PullCallback() {
@Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

         //省略
}

跟一下PullAPIWrapper#processPullResult方法

代码语言:javascript
复制
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
       
              //省略


          
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            
           
           //省略

        return pullResult;
    }

总结:broker端的消息过滤是通过看subscriptionData里的tag列表是否含有当前消息的tag

5 参考

RocketMQ消费者启动流程

/developer/article/2374328

深入剖析 RocketMQ 源码 - 消息存储模块

https://blog.csdn.net/vivo_tech/article/details/121221880

7张图揭晓RocketMQ存储设计的奥妙

https://codingw.blog.csdn.net/article/details/121050549

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-09-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 写作目的
  • 2 版本及说明
  • 3 初识ConsumeQueue及tag
  • 4 tag跟踪及定位
    • 4.1 producer生产消息
      • 4.2 broker存储消息
        • 4.3 consumer启动流程
          • 4.3.1 consumer获取topic和Queue信息
          • 4.3.2 consumer拉取消息
        • 4.4 broker响应consumer请求(过滤tag)
          • 4.5consumer消费消息(过滤tag&消费)
          • 5 参考
          相关产品与服务
          对象存储
          对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
          http://www.vxiaotou.com