# 消息到达交换机后会回调发送者
spring.rabbitmq.publisher-confirm-type=correlated
# 消息无法路由到队列时回调大宋这
spring.rabbitmq.publisher-returns=true
public enum ConfirmType {
/**
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
* within scoped operations.
*/
SIMPLE,
/**
* Use with {@code CorrelationData} to correlate confirmations with sent
* messsages.
*/
CORRELATED,
/**
* Publisher confirms are disabled (default).
*/
NONE
}
@Slf4j
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
/**
*
* mandatory /?m?nd?t?ri/ 强制性的
* true 找不到队列时 broker会调用basic.return方法将消息返还给生产者
* false 找不到队列时 直接丢弃消息
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息已经到达Exchange");
} else {
log.info("消息没有到达Exchange");
}
if (correlationData != null) {
log.info("相关数据:" + correlationData);
}
if (cause != null) {
log.info("原因:" + cause);
}
});
/**
* 1.是否通过事物实现 或者对应 channel.txSelect()??
* 2.消息到达队列了,还未持久化,rabbitmq挂掉了.returnCallback会调用吗 ??
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息无法到达队列时触发");
log.info("ReturnCallback: " + "消息:" + message);
log.info("ReturnCallback: " + "回应码:" + replyCode);
log.info("ReturnCallback: " + "回应信息:" + replyText);
log.info("ReturnCallback: " + "交换机:" + exchange);
log.info("ReturnCallback: " + "路由键:" + routingKey);
});
return rabbitTemplate;
}
}
@Slf4j
@Configuration
public class ConfirmConfig {
@Bean
public Queue confirmQueue() {
return new Queue(Constant.CONFIRM_QUEUE, false);
}
@Bean
DirectExchange confirmExchange() {
DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, false, false);
// 设置备份交换机 当消息无法到达队列时会进入备份交换机
directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
return directExchange;
}
@Bean
Binding bindingConfirm() {
return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY);
}
@Bean
FanoutExchange backupExchange() {
return new FanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE, false, false);
}
@Bean
public Queue backupQueue() {
return new Queue(Constant.CONFIRM_BACKUP_QUEUE, false);
}
@Bean
public Queue warningQueue() {
return new Queue(Constant.CONFIRM_WARNING_QUEUE, false);
}
@Bean
Binding bindingConfirmBackup() {
return BindingBuilder.bind(backupQueue()).to(backupExchange());
}
@Bean
Binding bindingConfirmWarning() {
return BindingBuilder.bind(warningQueue()).to(backupExchange());
}
}
@Autowired
RabbitTemplate rabbitTemplate;
String msg = "一条用于发布确认的消息";
@GetMapping("/noExchange")
public void noExchange() {
// 找不到交换机
rabbitTemplate.convertAndSend("noExchange", "noExchange", msg);
}
说明:由于 rabbitTemplate.setMandatory(true),所以当无法到达交换机的时候也会回调
ConfirmCallback 消息没有到达Exchange
ConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)
@GetMapping("/toExchange")
public void toExchange() {
// 到达交换机,找不到队列
rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, "xxx.xxx.xxx", msg);
}
ConfirmCallback 消息已经到达Exchange
没有收到无法到达队列的消息,why?
因为配置了备份队列,消息可以成功到达备份队列
注掉备份队列再试
@Bean
DirectExchange confirmExchange() {
DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, true, false);
// 设置备份交换机 当消息无法到达队列时会进入备份交换机
// directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
return directExchange;
}
消息无法到达队列时触发
ReturnCallback: 消息:(Body:'一条用于发布确认的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 回应码:312
ReturnCallback: 回应信息:NO_ROUTE
ReturnCallback: 交换机:myConfirmExchange
ReturnCallback: 路由键:xxx.xxx.xxx
ConfirmCallback 消息已经到达Exchange
ConfirmCallback和ReturnCallback都被调用了
@GetMapping("/toQueue")
public void toQueue() {
// 正常到达队列
rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, Constant.CONFIRM_ROUTING_KEY, msg);
}
ConfirmCallback 消息已经到达Exchange