前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何从RocketMQ企业版迁移Apache RocketMQ (一)

如何从RocketMQ企业版迁移Apache RocketMQ (一)

原创
作者头像
Yagr Xu
修改2021-12-04 21:29:56
1.1K0
修改2021-12-04 21:29:56
举报

近期很多客户在咨询如何从RocketMQ企业版迁移到标准的Apache RocketMQ。基于此,我做了一下的第一版的Java代码Demo,来尝试总结一些迁移的注意事项和两者在客户端的主要差别。后期再逐步整理其他语言的Demo案例,比如我自己很喜欢的Scala和常见语言 Python/Golang/Nodejs。

第一篇文章会针对最基础的代码做迁移对比,之后会逐步增加高阶功能的迁移。

迁移动机

  1. 黑盒 vs 白盒 对于小部分客户来说,一个Demo也许就足以解决问题了。但是不知道同学们有没有想过,我们当前学语言除了helloworld是在用demo程序以外,大部分场景还需要Java SDK Doc。这里的Doc对于API的说明更多的是一个“合同”,来承诺一个API能做啥,不能做啥。而黑盒的demo,最多只能告诉你在一个很狭窄的场景里这个jar包能做啥,超过一点儿你就会感受到一片漆黑。就像买房子,你不能看过样板房就住进去吧?你要签合同,并且很细致的定义每一个花钱的地方是什么样的质量,承诺了什么功能和保质期。
  2. 绑定 vs 开源 企业版封装的API也许有一些地方是为了方便用户,更好用一些。但是大家想没想过,如果只是API层面的改进,为啥不贡献进社区?妥妥的绑定模式。Kafka/Kubernetes/Istio各种开源社区的项目逛一逛,有哪个是企业版和社区版面目全非的?面目全非一种什么样的开源?
  3. 自主 vs 被动 开源的目的就是,使用者发现问题,讨论出解决方案,立刻贡献给社区。而闭源版本就是你只能等着,接收或者不接受。

开始迁移之旅

说明了动机,也许有些人觉得我说的有些主观,至少这是我真实的看法。我不会喜欢用只有demo的SDK。

使用社区的客户端

我们在项目里选择使用org.apache.rocketmq的客户端。当前比较新的版本是4.8.0,完全没有问题。

代码语言:javascript
复制
    <dependency>
		<groupId>org.apache.rocketmq</groupId>
		<artifactId>rocketmq-client</artifactId>
		<version>4.8.0</version>
	</dependency>

基础层面的代码区别

1. Apache RocketMQ的默认消费是Batch模式,源代码如下:

代码语言:javascript
复制
public interface MessageListenerConcurrently extends MessageListener {
    /**
     * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if
     * consumption failure
     *
     * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
     * @return The consume status
     */
    ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
        final ConsumeConcurrentlyContext context);
}

这个场景下如果想达到针对每个message 消费的回掉接口,需要确认下面这个配置:

代码语言:javascript
复制
consumeMessageBatchMaxSize = 1

之后你会发现,在List<MessageExt> msgs里只会有1个message,这样就确保了回掉Listener每次只处理一个信息,并且返回的Status只针对这个message。

2. Apache RocketMQ的每个consumer只能对应一个MessageListener。所以在使用下面的代码的时候你会发现MessageListener会被覆盖。

代码语言:javascript
复制
for(int i = 0 ; i < x ; i++){
  // 这里topic会被累计叠加subscribe
  consumer.subscribe(topic, tagsString); // tagString in format "tagA||tagB||tagC"
  //Listener会被覆盖掉
  consumer.registerMessageListener(new UnifiedConcurrentlyMessageListener(listenerMap));
}

源代码如下:

代码语言:javascript
复制
    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
            // 针对每个topic 做一个subscriptionData,然后put到一个Map当中去,所以之后可以被轮巡
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

而MessageListener就不一样了

代码语言:javascript
复制
    public void registerMessageListener(MessageListener messageListener) {
        //直接覆盖了~ 好像也可以改名叫setMessageListener
        this.messageListenerInner = messageListener;
    }

所以这里如果需要监听多个topic和使用不同MessageListener的场景里,需要用类似如下的代码实现:

代码语言:javascript
复制
    List<DefaultMQPushConsumer> consumers = new ArrayList<>();
    public void init() {
        try {

            for (String topic : topicList) {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(namespace, groupId);
                consumer.setNamesrvAddr(nameSrvAddr);

                // Optional, default value is CONSUME_FROM_LAST_OFFSET
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.setConsumeThreadMin(consumeThreadNums);
                // default max thread number is 20
                consumer.setConsumeThreadMax(consumeThreadNums * 2);

                // max retry times
                consumer.setMaxReconsumeTimes(3);
                consumer.setMessageModel(MessageModel.CLUSTERING);
                consumer.setConsumeMessageBatchMaxSize(1);
                // subscribe to a topic and append a topic message listener.
                consumer.subscribe(topic, tagsStr);
                consumer.registerMessageListener(new MyMessageListener());
              
                // store the consumer to the list
                consumers.add(consumer);
                // start consumer
                consumer.start();
            }

        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException("canot initialize consumer");
        }
    }

3. 有一些命名上的差别:

  • 比如Apache RocketMQ的Consumer在注册Listener时决定是顺序消费还是并行消费,MessageListener分为MessageListenerConcurrentlyMessageListenerOrderly。在企业版里,创建Consumer的时候就定了,Consumer分为ConsumerBatchConsumerOrderConsumer
  • 比如Apache RocketMQ的ConsumeOrderlyStatusConsumeConcurrentlyStatus,分别对应企业版的ActionOrderAction

4. 生产者Async调用

Apache RocketMQ 的生产者函数send(msg, callback) vs 企业版生产者函数sendAsync(msg, callback)

源码如下:

代码语言:javascript
复制
@Override
    public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
    }

    public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        final long beginStartTime = System.currentTimeMillis();
        ExecutorService executor = this.getAsyncSenderExecutor();
        try {
            // 这里用异步的线程调用 - 非阻塞
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    long costTime = System.currentTimeMillis() - beginStartTime;
                    if (timeout > costTime) {
                        try {
                            sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                        } catch (Exception e) {
                            sendCallback.onException(e);
                        }
                    } else {
                        sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                    }
                }

            });
        } catch (RejectedExecutionException e) {
            throw new MQClientException("executor rejected ", e);
        }
    }

// 最终调用的是
// MQClientAPIImpl::sendMessageAsync

代码调用如下:

代码语言:javascript
复制
//异步模式
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%s%n", sendResult);
      }
    @Override
    public void onException(Throwable e) {
      	e.printStackTrace();
    }
});
//或者同步模式
SendResult sendResult = producer.send(msg);

运维

当前腾讯云TDMQ的Pulsar已经支持了RocketMQ的协议兼容,并且贡献给了社区 - RoP

当前产品还在内测期,开白可以使用。

想要享受开源便利,又不希望自己运维的同学们可以开始试用了~

下期预告: Apache RocketMQ 在RoP上如何做延迟消息和事物消息。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 迁移动机
  • 开始迁移之旅
    • 使用社区的客户端
    • 基础层面的代码区别
      • 1. Apache RocketMQ的默认消费是Batch模式,源代码如下:
        • 2. Apache RocketMQ的每个consumer只能对应一个MessageListener。所以在使用下面的代码的时候你会发现MessageListener会被覆盖。
          • 3. 有一些命名上的差别:
            • 4. 生产者Async调用
            • 运维
              • 下期预告: Apache RocketMQ 在RoP上如何做延迟消息和事物消息。
              相关产品与服务
              消息队列 TDMQ
              消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
              http://www.vxiaotou.com