前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式基础概念-消息中间件[RabbitMQ]_2

分布式基础概念-消息中间件[RabbitMQ]_2

作者头像
@派大星
发布2023-12-04 20:26:02
1740
发布2023-12-04 20:26:02
举报
文章被收录于专栏:码上遇见你码上遇见你

Rabbit MQ可以直连队列吗

生产者和消费者使用相同的参数声明队列。重复声明不会改变队列

代码语言:javascript
复制
//生产者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送10条消息,依次在消息后面附加1-10个点
for (int i = 6; i > 0; i--){
    String message = "helloworld";
    channel.basicPublish("", QUEUE_NAME,null, message.getBytes());
}
//消费者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    doWork(message);
}

channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments) ; queue:队列名字 durable:队列持久化标志,true为持久化队列 exclusive:exclusive:排他队列,仅对创建的链接可见、链接中的channel都可见,其他链接不能重复声明,链接关闭队列会被自动删除 autoDelete:自动删除,如果该队列没有任何订阅的消>费者的话,该队列会被自动删除。这种队列适用于临时队列。 arguments:Map类型,队列参数设置 x-message-ttl:数字,消息队列中消息的存活时间,超过会被删除x-expires:数字,队列自身的空闲存活时间,指定时间内没有被访问,就会被删除 x-max-length和x-max-length-bytes:队列最大长度和空间,超出会删除老的数据 x-dead-letter-exchange和x-dead-letter-routing-key:设置死信 x-max-priority:队列支持的优先级别,需要生产者在发送消息时指定,消息按照优先级从高到底分发给消费者 channel.basicPublish(exchange, routingKey, mandatory, immediate,basicProperties, body); exchange: 交换机名 routingKey: 路由键 mandatory:为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者, channel.addReturnListener添加一个监听器,当broker执行basic.return方法时,会回调handleReturn方法,这样就可以处理变为死信的消息了;设为false时,出现上述情形broker会直接将消息扔掉; immediate: 3.0以前这个标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如 果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。3.0之后取消了该 参数 basicProperties:消息的详细属性,优先级别、持久化、到期时间等,headers类型的exchange要用到的是其中的headers字段。 body:消息实体,字节数组。 QueueingConsumer:一个已经实现好了的Consumer,相比于自己实现Consumer接口,这是个比较安全快捷的方式。该类基于jdk的BlockingQueue实现,handleDelivery方法中将收到的消息封装成Delivery对象,并存放到BlockingQueue中,这相当于消费者本地存放了一个消息缓存队列。nextDelivery()方法底层调用的BlockingQueue的阻塞方法take()。channel.basicConsume(queue, autoAck, consumer); queue:队列名。 autoAck:自动应答标志,true为自动应答。 consumer:消费者对象,可以自己实现Consumer接口,建议使用QueueingConsumer。

RabbitMQ的持久化机制

  1. 交换机持久化:exchange_declare创建交互机时通过参数指定
  2. 队列持久化:queue_declare创建队列时通过参数指定
  3. 消息持久化:new AMQPMessage创建消息时通过参数指定

append的方式写文件,会根据大小自动生成新的文件,rabbitmq启动时会创建两个进程,一个负责持久化消息的存储,另一个负责非持久化消息的存储(内存不够时)

消息存储时会在ets表中记录消息在文件中的映射以及相关信息(包括id、偏移量,有效数据,左边文件,右边文件),消息读取时根据该信息到文件中读取、同时更新信息

消息删除时只从ets删除,变为垃圾数据,当垃圾数据超出比例(默认50%),并且文件数达到3个,触发垃圾回收,锁定左右两个文件,整理左边文件有效数据、将右边文件有效数据写入左边,更新文件信息,删除右边,完成合并。当一个文件的有用数据等于0时,删除该文件。

写入文件前先写buffer缓冲区,如果buffer已满,则写入文件(此时只是操作系统的页存)每隔25ms刷一次磁盘,不管buffer满没满,都将buffer和页存中的数据落盘每次消息写入后,如果没有后续写入请求,则直接刷盘

RabbitMQ事务消息

通过对信道设置实现

  1. channel.txSelect();通知服务器开启事务模式;服务端会返回Tx.Select-Ok
  2. channel.basicPublish;发送消息,可以是多条,可以是消费消息提交ack
  3. channel.txCommit()提交事务;
  4. channel.txRollback()回滚事务;

消费者使用事务:

  1. autoAck=false,手动提交ack,以事务提交或回滚为准;
  2. autoAck=true,不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了

如果其中任意一个环节出现问题,就会抛出IoException异常,用户可以拦截异常进行事务回滚,或决定要不要重复消息。事务消息会降低rabbitmq的性能.

RabbitMQ如何保证消息的可靠性传输

死信消息:

  1. 消息被消费方否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为 false 。
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为死信消息。如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃

为每个需要使用死信的业务队列配置一个死信交换机,同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的routeKey,死信队列只不过是绑定在死信交换机上的队列,死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】

TTL:一条消息或者该队列中的所有消息的最大存活时间

如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。只需要消费者一直消费死信队列里的消息

代码语言:javascript
复制
agruments.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");

RabbitMQ的普通集群原理

image.png

元数据:

  • 队列元数据:队列名称和它的属性
  • 交换器元数据:交换器名称、类型和属性
  • 绑定元数据:一张简单的表格展示了如何将消息路由到队列
  • vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性

为什么只同步元数据:

  • 存储空间,每一个节点都保存全量数据,影响消息堆积能力
  • 性能,消息的发布者需要将消息复制到每一个集群节点

客户端连接的是非队列数据所在节点:则该节点会进行路由转发,包括发送和消费

集群节点类型:

  • 磁盘节点:将配置信息和元信息存储在磁盘上。
  • 内存节点:将配置信息和元信息存储在内存中。性能优于磁盘节点。依赖磁盘节点进行持久化

RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃了,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个可以,就能正常操作。

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-12-04,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 码上遇见你 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Rabbit MQ可以直连队列吗
  • RabbitMQ的持久化机制
  • RabbitMQ事务消息
  • RabbitMQ如何保证消息的可靠性传输
  • RabbitMQ的普通集群原理
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com