前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbitmq消息的可靠传递

rabbitmq消息的可靠传递

作者头像
九转成圣
发布2024-04-10 18:21:20
530
发布2024-04-10 18:21:20
举报
文章被收录于专栏:csdncsdn

rabbitmq消息的可靠传递

不少

生产者使用发布确认模式

交换机队列消息持久化

消费者手动ack

不多

新建表

代码语言:javascript
复制
CREATE TABLE msg_dedup (
  id int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  application_name varchar(255) NOT NULL COMMENT '消费的应用名(可以用消费者组名称)',
  topic varchar(255) NOT NULL COMMENT '消息来源的topic(不同topic消息不会认为重复)',
  tag varchar(16) NOT NULL COMMENT '消息的tag(同一个topic不同的tag,就算去重键一样也不会认为重复),没有tag则存""字符串',
  msg_uniq_key varchar(255) NOT NULL COMMENT '消息的唯一键(建议使用业务主键)',
  status varchar(16) NOT NULL COMMENT '这条消息的消费状态',
  expire_time bigint(20) NOT NULL COMMENT '这个去重记录的过期时间(时间戳)',
  PRIMARY KEY (id),
  UNIQUE KEY uniq_key (application_name,topic,tag,msg_uniq_key) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

生产者端代码

RabbitTemplateConfig

代码语言:javascript
复制
@Slf4j
@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        /**
         *
         * mandatory /?m?nd?t?ri/ 强制性的
         * true 找不到队列时 broker会调用basic.return方法将消息返还给生产者
         * false 找不到队列时 直接丢弃消息
         */
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("ConfirmCallback 消息已经到达Exchange");
            } else {
                log.info("ConfirmCallback 消息没有到达Exchange");
            }
            if (correlationData != null) {
                log.info("ConfirmCallback 相关数据:" + correlationData);
            }
            if (cause != null) {
                log.info("ConfirmCallback 原因:" + cause);
            }
        });

        //
        /**
         * 1.是否通过事物实现 或者对应 channel.txSelect()??
         * 2.消息到达队列了,还未持久化,rabbitmq挂掉了.returnCallback会调用吗 ??
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息无法到达队列时触发");
            log.info("ReturnCallback:     " + "消息:" + message);
            log.info("ReturnCallback:     " + "回应码:" + replyCode);
            log.info("ReturnCallback:     " + "回应信息:" + replyText);
            log.info("ReturnCallback:     " + "交换机:" + exchange);
            log.info("ReturnCallback:     " + "路由键:" + routingKey);
        });

        return rabbitTemplate;
    }
}

队列配置

代码语言:javascript
复制
@Configuration
public class ReliableQueueConfig {
    @Bean
    public Queue reliableQueue() {
        return new Queue(Constant.RELIABLE_QUEUE, true);
    }

    @Bean
    DirectExchange reliableExchange() {
        return new DirectExchange(Constant.RELIABLE_EXCHANGE, true, false);
    }

    @Bean
    Binding reliable() {
        return BindingBuilder.bind(reliableQueue()).to(reliableExchange()).with(Constant.RELIABLE_ROUTING_KEY);
    }
}

发送消息

代码语言:javascript
复制
@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/reliable")
public void reliable() {
    Map<String, Object> msg = new HashMap<>();
    msg.put("msg", "消息的可靠传输就是不多不少");
    for (int i = 0; i < 10; i++) {
        msg.put("id", i + 1);
        rabbitTemplate.convertAndSend(Constant.RELIABLE_EXCHANGE, Constant.RELIABLE_ROUTING_KEY, msg);
    }
}

消费者端代码

代码语言:javascript
复制
@Slf4j
@Component
public class ReliableReceiver {
    @Autowired
    MsgDedupService msgDedupService;
    String applicationName = "测试消息可靠传输项目";

    @RabbitListener(queues = Constant.RELIABLE_QUEUE, ackMode = "MANUAL")
    @RabbitHandler
    public void reliableReceiver(@NotNull Message message, Channel channel) throws IOException, ClassNotFoundException {
        byte[] body = message.getBody();
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));
        Map<String, Object> msgMap = (Map<String, Object>) ois.readObject();
        // 业务主键
        int id = (int) msgMap.get("id");
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            MsgDedup msgDedup = insert(id);
            // 消费消息...
            log.info("reliableReceiver 收到的消息为: {}", msgMap);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 消息消费完成后手动ack
            channel.basicAck(deliveryTag, false);
            log.info("basicAck ack完毕 deliveryTag:{},时间:{}:", deliveryTag, DateUtil.Date2LongtString(new Date()));
            // 修改消息接受状态
            updateStatus(msgDedup.getId());
        } catch (DuplicateKeyException e) {
            log.error("消息已存在数据库");
            MsgDedup msgDedup = selectOne(id);
            if ("已消费".equals(msgDedup.getStatus())) {
                channel.basicAck(deliveryTag, false);
                return;
            }
            // 删除消息记录 删除超过有效期的
            Long expireTime = msgDedup.getExpireTime();
            if (expireTime - System.currentTimeMillis() > 0) {
                msgDedupService.deleteByPrimaryKey(id);
            }
            // 重新入队
            channel.basicReject(deliveryTag, true);
        }
    }

    private MsgDedup selectOne(int id) {
        MsgDedup msgDedup = new MsgDedup();
        msgDedup.setApplicationName(applicationName);
        msgDedup.setTopic(Constant.RELIABLE_ROUTING_KEY);
        msgDedup.setTag("");
        msgDedup.setMsgUniqKey(id + "");
        return msgDedupService.selectOne(msgDedup);
    }

    private MsgDedup insert(int id) {
        MsgDedup msgDedup = new MsgDedup();
        msgDedup.setApplicationName(applicationName);
        msgDedup.setTopic(Constant.RELIABLE_ROUTING_KEY);
        msgDedup.setTag("");
        msgDedup.setMsgUniqKey(id + "");
        msgDedup.setStatus("未消费");
        // 有效期1小时
        msgDedup.setExpireTime(System.currentTimeMillis() + 1 * 60 * 60 * 1000);
        msgDedupService.insert(msgDedup);
        return msgDedup;
    }

    private void updateStatus(int id) {
        MsgDedup msgDedup = new MsgDedup();
        msgDedup.setId(id);
        msgDedup.setStatus("已消费");
        msgDedupService.update(msgDedup);
    }
}
本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-04-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • rabbitmq消息的可靠传递
  • 不少
    • 生产者使用发布确认模式
      • 交换机队列消息持久化
        • 消费者手动ack
        • 不多
        • 生产者端代码
          • RabbitTemplateConfig
            • 队列配置
              • 发送消息
              • 消费者端代码
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
              http://www.vxiaotou.com