前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbitmq消息的发布确认

rabbitmq消息的发布确认

作者头像
九转成圣
发布2024-04-10 17:00:41
730
发布2024-04-10 17:00:41
举报
文章被收录于专栏:csdncsdn

rabbitmq消息的发布确认

配置文件添加相关配置

代码语言:javascript
复制
# 消息到达交换机后会回调发送者
spring.rabbitmq.publisher-confirm-type=correlated
# 消息无法路由到队列时回调大宋这
spring.rabbitmq.publisher-returns=true

发布确认类型

代码语言:javascript
复制
public enum ConfirmType {

   /**
    * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
    * within scoped operations.
    */
   SIMPLE,

   /**
    * Use with {@code CorrelationData} to correlate confirmations with sent
    * messsages.
    */
   CORRELATED,

   /**
    * Publisher confirms are disabled (default).
    */
   NONE

}

配置RabbitTemplate

代码语言:javascript
复制
@Slf4j
@Configuration
public class RabbitTemplateConfig {
    
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        /**
         *
         * mandatory /?m?nd?t?ri/ 强制性的
         * true 找不到队列时 broker会调用basic.return方法将消息返还给生产者
         * false 找不到队列时 直接丢弃消息
         */
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息已经到达Exchange");
            } else {
                log.info("消息没有到达Exchange");
            }
            if (correlationData != null) {
                log.info("相关数据:" + correlationData);
            }
            if (cause != null) {
                log.info("原因:" + cause);
            }
        });

        /**
         * 1.是否通过事物实现 或者对应 channel.txSelect()??
         * 2.消息到达队列了,还未持久化,rabbitmq挂掉了.returnCallback会调用吗 ??
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息无法到达队列时触发");
            log.info("ReturnCallback:     " + "消息:" + message);
            log.info("ReturnCallback:     " + "回应码:" + replyCode);
            log.info("ReturnCallback:     " + "回应信息:" + replyText);
            log.info("ReturnCallback:     " + "交换机:" + exchange);
            log.info("ReturnCallback:     " + "路由键:" + routingKey);
        });

        return rabbitTemplate;
    }
}

配置测试交换机和队列

代码语言:javascript
复制
@Slf4j
@Configuration
public class ConfirmConfig {
    @Bean
    public Queue confirmQueue() {
        return new Queue(Constant.CONFIRM_QUEUE, false);
    }

    @Bean
    DirectExchange confirmExchange() {
        DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, false, false);
        // 设置备份交换机 当消息无法到达队列时会进入备份交换机
        directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
        return directExchange;
    }

    @Bean
    Binding bindingConfirm() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY);
    }

    @Bean
    FanoutExchange backupExchange() {
        return new FanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE, false, false);
    }

    @Bean
    public Queue backupQueue() {
        return new Queue(Constant.CONFIRM_BACKUP_QUEUE, false);
    }

    @Bean
    public Queue warningQueue() {
        return new Queue(Constant.CONFIRM_WARNING_QUEUE, false);
    }

    @Bean
    Binding bindingConfirmBackup() {
        return BindingBuilder.bind(backupQueue()).to(backupExchange());
    }

    @Bean
    Binding bindingConfirmWarning() {
        return BindingBuilder.bind(warningQueue()).to(backupExchange());
    }
    
}

发送消息

消息无法到达交换机

代码语言:javascript
复制
@Autowired
RabbitTemplate rabbitTemplate;
String msg = "一条用于发布确认的消息";

@GetMapping("/noExchange")
public void noExchange() {
    // 找不到交换机
    rabbitTemplate.convertAndSend("noExchange", "noExchange", msg);
}

说明:由于 rabbitTemplate.setMandatory(true),所以当无法到达交换机的时候也会回调

代码语言:javascript
复制
ConfirmCallback 消息没有到达Exchange
ConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)

消息到达交换机无法到达队列

代码语言:javascript
复制
@GetMapping("/toExchange")
public void toExchange() {
    // 到达交换机,找不到队列
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, "xxx.xxx.xxx", msg);
}
代码语言:javascript
复制
ConfirmCallback 消息已经到达Exchange

没有收到无法到达队列的消息,why?

因为配置了备份队列,消息可以成功到达备份队列

注掉备份队列再试

代码语言:javascript
复制
@Bean
DirectExchange confirmExchange() {
    DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, true, false);
    // 设置备份交换机 当消息无法到达队列时会进入备份交换机
    // directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
    return directExchange;
}
代码语言:javascript
复制
消息无法到达队列时触发
ReturnCallback:     消息:(Body:'一条用于发布确认的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     回应码:312
ReturnCallback:     回应信息:NO_ROUTE
ReturnCallback:     交换机:myConfirmExchange
ReturnCallback:     路由键:xxx.xxx.xxx
ConfirmCallback 消息已经到达Exchange

ConfirmCallback和ReturnCallback都被调用了

成功到达队列

代码语言:javascript
复制
@GetMapping("/toQueue")
public void toQueue() {
    // 正常到达队列
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, Constant.CONFIRM_ROUTING_KEY, msg);
}
代码语言:javascript
复制
ConfirmCallback 消息已经到达Exchange

发布确认流程

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-04-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • rabbitmq消息的发布确认
  • 配置文件添加相关配置
  • 发布确认类型
  • 配置RabbitTemplate
  • 配置测试交换机和队列
  • 发送消息
    • 消息无法到达交换机
      • 消息到达交换机无法到达队列
        • 成功到达队列
        • 发布确认流程
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
        http://www.vxiaotou.com