1、削峰:在某个模块接收到超过最大承受的并发量时,可以通过 MQ 排队来使这些削减同一时刻处理的消息量。减小并发量。
2、解耦:在发送 MQ 处理业务时,可以使业务代码与当前的代码解耦,便于维护和拓展。
3、异步:异步使得在调用 MQ 后可以去处理其他操作,在 MQ 执行完后会自动反馈结果。
1、复杂性提高,引入了其他问题。如消息丢失、重复消费、消息顺序执行等。这些解决方案下面会说到。
2、宕机后不可用。可以创建集群来解决。
ActiveMQ:老牌的 MQ,可靠性高,但支持的并发量低,目前维护也比较少。适用于并发量低的项目。
Kafka:支持超高并发场景,但是消息可靠性较差(消费失败后不支持重试)。适用于产生大量数据的数据收集服务。
RocketMQ:支持超高并发场景,可靠性高。但支持的客户端语言不多。适用于高并发的互联网项目。
RabbitMQ:支持一般的高并发场景(万级),可靠性高。支持客户端语言较多。但是其实现是通过 Erlang 语言,不方便学习。适用于中小型项目。
Publisher:生产者,生产消息的组件。
Exchange:交换机,对生产者传来的消息进行解析并传给队列,交换机类型分为 fanout、direct、Topic、headers,其中headers交换机是根据消息对象的 headers 属性值进行匹配的,性能较差,一般不使用。
Queue:队列,因为其是 FIFO 结构,所以消息会按先进先出的顺序被发送给消费者消费。
Binding:交换机与队列的绑定关系,生产者在发送消息时会携带一个 RoutingKey ,在消息到达交换机后,交换机会根据 RoutingKey 匹配对应 BindingKey 的队列,然后把消息发送给该队列。
Virtual Host:又称为 vhost,相当于一个文件夹,包含一个或多个 Exchange 与 Queue 的组合。
Broker:表示消息队列服务器实体。
Consumer:消费者,专门消费消息的组件。
Connection:队列与消费者之间的组件,在由队列向消费者发送消息时,需要先建立连接,创建连接对象。
Channel:通道。消息由队列发送至消费者时,用于传输的通道对象。
四大核心概念指的是生产者、交换机、队列、消费者。
不配置交换机,生产者直接发送给队列(实际使用了默认的交换机),消费者监听队列,队列与消费者是1对1的关系。
和简单模式差不多,同样是不配置交换机,不同的是工作模式多个消费者监听一个队列。
公平分发:在工作模式中,默认情况下多个消费者会依次接收并消费队列中的消息。
不公平分发:在工作模式中,可以在消费者端获取消息时将 channel 的参数 basicQos 设为1(默认0),那么就会在消息分发时优先选择空闲的消费者分发。如果不存在空闲队列,那么还是按公平分发。
预取值:可以看作是规定的消费者等待消费队列内部期望的队列长度。比如消费 C1 是 2,C2 是 3,那么开始的消息会先分配给 C1,直到 C1 中等待消息的消息队列长度为2时,下一个消息才会分配给 C2,然后C2也积累了3个消息后,继续C1、C2轮流分配。预期值默认为0,所以默认情况就是消费者轮流被分配消息。
配置方式也是设置消费者端的 channel 对象的 basicQos 参数。
交换机是 fanout 类型。交换机会将接收的消息发送给所有与其绑定的队列。
交换机是 direct 类型。交换机会根据接收消息的 RoutingKey 寻找匹配的 BindingKey,然后发送给对应的队列。BindingKey 是和 RoutingKey 完全匹配的,一对一关系。
交换机是 topic 类型。交换机会根据接收消息的 RoutingKey 寻找匹配的 BindingKey,与 routing 模式不同的是,topic 模式消息携带的 BindingKey 可以是一个通配符。交换机会匹配与通配符匹配的 BindingKey 对应的队列。* 表示任意一个单次,# 表示0个或多个单次。如果 RoutingKey 不包括通配符,那么就相当于路由模式,如果 RoutingKey 是 #,那么就相当于发布订阅模式。
RPC,也就是远程调用, RabbitMQ 的 RPC 模式可以实现 RPC 的异步调用。客户端既是发送者也是消费者,在请求发送给队列 rpc_queue 后,服务器会监听这个队列,获取后处理,处理完成将返回数据消息发给队列 reply_to,而客户端也会监听这个队列,最终实现得到结果数据。
死信队列,是指消息在变成死信消息后会被发给与其绑定好的死信交换机,然后重新被死信交换机发送至新的队列,最后被消费者消费。而消息在变成死信消息的过程消耗的时间就成为了延期时间,所以常常用于实现延时队列。
1、消息TTL过期。
2、队列内等待消费的消息达到最大长度(默认队列无长度限制)
3、消息在消费者被拒绝(Nack 或 reject),且不重新加入队列
这种方式就是在生产者发送消息时指定消息的过期时间,等到消息在死信队列中过期后会被发送给死信交换机。
配置队列:
发送方:
1、为消息设置过期时间会有一个缺陷,因为队列是先进先出结构,所以如果为消息设置过期时间,那么先进的消息一定会先被执行,后面的一定会先等到前面的消息执行完成后才被执行,如果前面的消息过期时间长于后面的,那么后面的消息即使到达过期时间后也不会被执行,必须等到前面的消息发送完才能执行。所以只适用于发送的延时消息按过期时间递增顺序的场景。
2、直接为队列设置过期时间,因为是进入队列的消息都会被分配相同的过期时间,所以不会产生上面的问题,所以也存在弊端。如果需要配置多个过期时间,那么每次都需要重新声明一个死信交换机、死信队列以及绑定关系。这样会造成配置臃肿。所以只适用于配置过期时间种类数较少的场景。
3、可以看出这两种方式都存在不足之处,有没有一种完美的方案呢?在 1 中,可以将消息按过期时间发送放在交换机里执行。因为交换机并不存在顺序执行,所以就避免了 1 的问题。
实现:
@Configurationpublic class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定义交换机 我们在这里定义的是一个延迟交换机 @Bean public CustomExchange delayedExchange() { Map args = new HashMap<>(); //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
使用 RabbitMQ 来进行部分业务的执行,尤其是一些重要的业务,如果消息在 MQ 中丢失,就会对整个系统造成比较严重的影响。保证消息可靠性主要分为保证各组件的持久化以及避免消息的丢失。
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
消息可靠性
使用 RabbitMQ 来进行部分业务的执行,尤其是一些重要的业务,如果消息在 MQ 中丢失,就会对整个系统造成比较严重的影响。保证消息可靠性主要分为保证各组件的持久化以及避免消息的丢失。
保证交换机、队列、消息的持久化。针对于交换机、队列,在声明时就可以将交换机、队列声明为持久化的。
而消息的持久化,需要在已开启交换机、队列的持久化后,在发送消息时将消息的 BasicProperties 参数的 deliveryMode 设为 2,就可以实现持久化
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
而在 SpringBoot 封装好的 RabbitTemplate 的 convertAndSend 中,默认就已经将 deliveryMode 设为了2。
配置:
1)配置文件打开配置 spring.rabbitmq.publisher-confirm-type=correlated(老版本是spring.rabbitmq.publisher-confirms=true)。
? NONE
禁用发布确认模式,是默认值
? CORRELATED
发布消息成功到交换器后会触发回调方法
? SIMPLE
经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
2)配置回调接口并加入到容器
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback {
/** * 交换机不管是否收到消息的一个回调方法 * CorrelationData * 消息相关数据 * ack * 交换机是否收到消息 */
@Override public void confirm(CorrelationData correlationData, Boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
} else{
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
}
}
}
3)配置生产者(在生产者发送时定义的 CorrelationData 对象可以在回调接口中获取到,如果没有定义回调接口接收的就是空对象)。以及将回调接口注册到 rabbitTemplate 对象中
@RestController@RequestMapping("/confirm")@Slf4jpublic class Producer {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private MyCallBack myCallBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(myCallBack); } @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message){ //指定消息 id 为 1 CorrelationData correlationData1=new CorrelationData("1"); String routingKey="key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1); CorrelationData correlationData2=new CorrelationData("2"); routingKey="key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2); log.info("发送消息内容:{}",message); }} 3、交换机到队列(消息未找到匹配队列触发)
如果消息传到交换机后,没有找到对应的队列,那么这个消息默认会丢失,而如果配置了 Mandatory 参数可以在消息在交换机丢失时触发回调方法。
#开启回调函数spring.rabbitmq.publisher-returns=true#是否在交换机没有匹配合适的队列后返回给生产者,false表示丢弃spring.rabbitmq.template.mandatory=true
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
/** * 交换机不管是否收到消息的一个回调方法 * CorrelationData * 消息相关数据 * ack * 交换机是否收到消息 */
@Override public void confirm(CorrelationData correlationData, Boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
} else{
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
}
}
//当消息无法路由的时候的回调方法 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}",new String(message.getBody()),exchange,replyText,routingKey); } }
默认情况下,消息发送到消费者后会立刻返回给队列一个确认标识,显示签收。而如果消费者在确认标识返回成功后,执行业务到一半时发生异常,那么这条消息就没有执行完,所以需要关闭自动确认,等到业务执行完毕后才进行手动的确认。在SpringBoot 对 RabbitMQ 封装的依赖中,提供了队列的补偿机制,如果队列在一段时间没有收到消费者的确认消息,那么就会重新发送消息。
手动确认又分为三种方式,单个确认、批量确认和异步确认。
而在 SpringBoot 的继承中,单个确认与批量确认都是使用 channel的 basicAck 方法。
multiple表示签收是否批量,也就是是否包括前面未签收的消息。deleveryTag 是一个自增的消息唯一标识
此外,如果发生异常,可以取消这次确认,并选择是否重新加入队列。拒绝确认有两种方式。一种Nack,一种是 Reject。区别是 Nack 会将当前消息之前的所有未确认的消息也取消确认,而 Reject 只针对于当前消息。(未确认/取消确认的消息会被标记为 unacked 状态,即使宕机也不会丢失,发出的消息如果没有接收到返回信息每隔一段时间会重新发送一次)。
channel.txSelect()声明启动事务模式;
channel.txComment()提交事务;
channel.txRollback()回滚事务。
try {
channel.txSelect();
channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
channel.txCommit();
}
catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
使用事务可以有效生产者端发送消息的可靠性,但是其不适用于多线程执行的场景,多个线程执行效率会很低。所以一般不推荐。
消息补偿机制
在 SpringBoot 为 RabbitMQ 封装的依赖中,提供一种补偿机制,如果发出的消息在一段时间内没有响应(签收或者拒绝),那么该消息就会进行重发。默认情况下会隔5秒一直进行重发,直到消费者响应。
我们可以通过自定义配置参数来修改默认的补偿机制
spring: rabbitmq: listener: simple: retry: enabled: true # 自动触发补偿机制 max-attempts: 5 # 补偿机制尝试次数 max-interval: 10000 # 重试最大间隔时间 initial-interval: 2000 # 重试初始间隔时间 multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
对于封装的补偿机制存在一些不足,因为其是无差别补偿,也就是只要消费者没有响应就会重发,但是对于一些异常导致没有响应即使重发几次都会导致没有响应(如数据计算异常,数据类型转换异常),这样的补偿机制就会消耗 CPU 资源。所以对于这些异常可以捕获然后直接处理。对于其他异常(如调用第三方接口失败)则可以进行补偿重试。
对于MQ 整个模块的补偿机制,可以参考下面的架构图
1、发生业务操作,业务数据写入数据库
2、生产者将消息发送给MQ的队列Q1
3、发送了一条与step2中一模一样的延迟消息到了Q3
4、消费者监听Q1,获取到了step2中发送的业务消息
5、消费者在收到生产者的业务消息后,发送了一条确认消息(记录收到的消息信息)到Q2
6、回调检查服务监听了Q2,获取到了消费者发送的确认消息
7、回调检查服务将这条确认消息写入数据库等待之后的比对
8、Q3中的延迟消息延迟时间已到,被回调检查服务接收到,之后就拿着这条延迟消息在数据库中比对,如果比对成功,证明消费者接收到了生产者的业务消息并处理成功(如果不处理成功谁会傻了吧唧发送确认消息呢);如果比对失败,证明消费者没有接收到生产者的业务消息,或者说消费者接收到了业务消息之后始终没有处理成功相关的业务并发送确认消息。这时回调检查服务就会调用生产者的相关业务接口,让生产者再次发送这条失败的消息
9、有一种最极端的情况,step2和step3的消息都发送失败了或者说在消息传递过程中发生意外丢失了!定时检查服务会一直轮询保存确认消息的数据库中的消息数据,并于生产者的业务数据库中的业务数据进行比对,如果两者比对数量一致,则代表业务执行没有问题;如果比对不一致,确认消息数据库的数据量小于生产者业务数据量的话,就证明消费者没有接收到生产者发送的消息。这时定时检查服务会通知生产者再次发送消息到MQ的队列Q1
由于消息补偿机制的存在,可以更加有效保证消息可以被消费,但是带来的问题是可能某个消息执行的比较久,导致同一条消息再次被发送给了消费者,而前一条消息顺利执行完,这样一条消息就会被多次执行,所以消费者端的方法需要涉及成幂等性,也就是对于一条消息,无论被消费者消费几次,效果都是一样的。实现方案主要有两种。
唯一ID指的是使用 UUID、或者操作数据的主键,而指纹码是与业务相关的ID,比如雪花算法就是根据当前时间戳生成的,生成的ID就属于指纹码。
在生产者发送时创建 Messgae 对象,将业务数据以及唯一ID+指纹码保存到Meaasge对象中进行发送
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();amqpTemplate.convertAndSend(queueName, message);
然后在消费者端接收,获取ID,在消费者消费最后以其为主键添加到mysql中,在业务开始时检查是否存在,不存在继续执行。
缺点是高并发场景下会受到性能瓶颈限制。可以通过分库分表解决。
在消费者方法开始使用 redis 的 setnx 方法来处理判断数据可以一步到位,是实现幂等性的最佳方案。
消息顺序执行
如果多个消费者监听同一个队列,那么默认下消息会依次顺序分配给消费者。
上面提到预取值概念,通过配置消费者端的 channel 的 basicQos 参数来修改,但是这会受到消费者执行快慢、生产者发送消息到队列的顺序等因素影响,所以并不可靠。
所以实现消息顺序执行的方式就是增加队列,拆分消费者,使每个消费者只监听一个队列。
如果发现队列中积压了很多消息没有处理,那么该如何解决。
1、对于积压的消息,首先需要先检查对应的消费者端,解决其执行慢导致阻塞的问题后,增加临时队列和消费者来处理积压的消息,等到恢复后再将 MQ 改成原来架构。
2、对于设置了 TTL 的消息,在其因消息积压过期丢失后,在 MQ 空闲时将过期丢失的消息进行重发。
备用交换机
在上面说到过,在消息发给交换机后,如果交换机没有找到匹配的队列,那么这个消息默认会丢失,可以配置消息在交换机上没有匹配到队列后的回调消息,以及将此条消息重新发回生产者。但是也可以配置一个备用交换机,在没有匹配到队列后发给备用交换机。
配置案例:
在同时配置备用交换机、returnCallBack 回调接口下,如果消息没有匹配到对应的消息,那么会优先采用备用交换机。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。