新建表
CREATE TABLE msg_dedup (
id int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
application_name varchar(255) NOT NULL COMMENT '消费的应用名(可以用消费者组名称)',
topic varchar(255) NOT NULL COMMENT '消息来源的topic(不同topic消息不会认为重复)',
tag varchar(16) NOT NULL COMMENT '消息的tag(同一个topic不同的tag,就算去重键一样也不会认为重复),没有tag则存""字符串',
msg_uniq_key varchar(255) NOT NULL COMMENT '消息的唯一键(建议使用业务主键)',
status varchar(16) NOT NULL COMMENT '这条消息的消费状态',
expire_time bigint(20) NOT NULL COMMENT '这个去重记录的过期时间(时间戳)',
PRIMARY KEY (id),
UNIQUE KEY uniq_key (application_name,topic,tag,msg_uniq_key) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
@Slf4j
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
/**
*
* mandatory /?m?nd?t?ri/ 强制性的
* true 找不到队列时 broker会调用basic.return方法将消息返还给生产者
* false 找不到队列时 直接丢弃消息
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("ConfirmCallback 消息已经到达Exchange");
} else {
log.info("ConfirmCallback 消息没有到达Exchange");
}
if (correlationData != null) {
log.info("ConfirmCallback 相关数据:" + correlationData);
}
if (cause != null) {
log.info("ConfirmCallback 原因:" + cause);
}
});
//
/**
* 1.是否通过事物实现 或者对应 channel.txSelect()??
* 2.消息到达队列了,还未持久化,rabbitmq挂掉了.returnCallback会调用吗 ??
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息无法到达队列时触发");
log.info("ReturnCallback: " + "消息:" + message);
log.info("ReturnCallback: " + "回应码:" + replyCode);
log.info("ReturnCallback: " + "回应信息:" + replyText);
log.info("ReturnCallback: " + "交换机:" + exchange);
log.info("ReturnCallback: " + "路由键:" + routingKey);
});
return rabbitTemplate;
}
}
@Configuration
public class ReliableQueueConfig {
@Bean
public Queue reliableQueue() {
return new Queue(Constant.RELIABLE_QUEUE, true);
}
@Bean
DirectExchange reliableExchange() {
return new DirectExchange(Constant.RELIABLE_EXCHANGE, true, false);
}
@Bean
Binding reliable() {
return BindingBuilder.bind(reliableQueue()).to(reliableExchange()).with(Constant.RELIABLE_ROUTING_KEY);
}
}
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/reliable")
public void reliable() {
Map<String, Object> msg = new HashMap<>();
msg.put("msg", "消息的可靠传输就是不多不少");
for (int i = 0; i < 10; i++) {
msg.put("id", i + 1);
rabbitTemplate.convertAndSend(Constant.RELIABLE_EXCHANGE, Constant.RELIABLE_ROUTING_KEY, msg);
}
}
@Slf4j
@Component
public class ReliableReceiver {
@Autowired
MsgDedupService msgDedupService;
String applicationName = "测试消息可靠传输项目";
@RabbitListener(queues = Constant.RELIABLE_QUEUE, ackMode = "MANUAL")
@RabbitHandler
public void reliableReceiver(@NotNull Message message, Channel channel) throws IOException, ClassNotFoundException {
byte[] body = message.getBody();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));
Map<String, Object> msgMap = (Map<String, Object>) ois.readObject();
// 业务主键
int id = (int) msgMap.get("id");
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
MsgDedup msgDedup = insert(id);
// 消费消息...
log.info("reliableReceiver 收到的消息为: {}", msgMap);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 消息消费完成后手动ack
channel.basicAck(deliveryTag, false);
log.info("basicAck ack完毕 deliveryTag:{},时间:{}:", deliveryTag, DateUtil.Date2LongtString(new Date()));
// 修改消息接受状态
updateStatus(msgDedup.getId());
} catch (DuplicateKeyException e) {
log.error("消息已存在数据库");
MsgDedup msgDedup = selectOne(id);
if ("已消费".equals(msgDedup.getStatus())) {
channel.basicAck(deliveryTag, false);
return;
}
// 删除消息记录 删除超过有效期的
Long expireTime = msgDedup.getExpireTime();
if (expireTime - System.currentTimeMillis() > 0) {
msgDedupService.deleteByPrimaryKey(id);
}
// 重新入队
channel.basicReject(deliveryTag, true);
}
}
private MsgDedup selectOne(int id) {
MsgDedup msgDedup = new MsgDedup();
msgDedup.setApplicationName(applicationName);
msgDedup.setTopic(Constant.RELIABLE_ROUTING_KEY);
msgDedup.setTag("");
msgDedup.setMsgUniqKey(id + "");
return msgDedupService.selectOne(msgDedup);
}
private MsgDedup insert(int id) {
MsgDedup msgDedup = new MsgDedup();
msgDedup.setApplicationName(applicationName);
msgDedup.setTopic(Constant.RELIABLE_ROUTING_KEY);
msgDedup.setTag("");
msgDedup.setMsgUniqKey(id + "");
msgDedup.setStatus("未消费");
// 有效期1小时
msgDedup.setExpireTime(System.currentTimeMillis() + 1 * 60 * 60 * 1000);
msgDedupService.insert(msgDedup);
return msgDedup;
}
private void updateStatus(int id) {
MsgDedup msgDedup = new MsgDedup();
msgDedup.setId(id);
msgDedup.setStatus("已消费");
msgDedupService.update(msgDedup);
}
}