前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【云原生进阶之PaaS中间件】第四章RabbitMQ-2-AMQP协议

【云原生进阶之PaaS中间件】第四章RabbitMQ-2-AMQP协议

作者头像
江中散人_Jun
发布2024-02-19 08:52:37
1170
发布2024-02-19 08:52:37
举报

1 AMQP背景简介

1.1 概述

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制。该协议是一种二进制协议,提供客户端应用于消息中间件之间异步、安全、高效的交互。相对于我们常见的REST API,AMQP更容易实现,可以降低开销,同时灵活性高,可以轻松的添加负载平衡和高可用性的功能,并保证消息传递,在性能上AMQP协议也相对更好一些。

通俗来说,在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接收的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。

1.2 AMQP与JMS

JMS是早期消息中间件进行标准化的一个尝试,它仅仅是在API级进行了规范,离创建互操作能力还差很远。

与JMS不同,AMQP是一个Wire级的协议,它描述了在网络上传输的数据的格式,以字节为流。因此任何遵守此数据格式的工具,其创建和解释消息,都能与其他兼容工具进行互操作。

1.3 核心组件术语

  • 消息(Message):即客户端与消息中间件传送的数据。
  • 生产者(Producer):消息生产者。
  • 消费者(Consumer):消息消费者。
  • 连接(Connection):一个网络连接,比如TCP/IP连接。AMQP连接通常是长连接,当一个应用不再需要连接到AMQP代理的时候,需要释放掉 AMQP 连接,而不是直接将TCP连接关闭。
  • 信道(Channel):网络信道,是建立在Connection连接之上的一种轻量级的连接,一种基于连接复用的设计,同一个连接中可以创建多个信道。同一应用内一次TCP连接共享成为多个轻量级的线程通道,用通道号标识。详细可参考 JAVA 的NIO实现。
    • 几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
    • 如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤。一个Connection上可以创建任意数量的Channel。
  • 交换机(Exchange):接收消息,并将消息路由转发给消息队列。
  • 虚拟主机(Virtual Host):进行逻辑隔离,一个虚拟主机可以创建若干个交换机和队列,同一个虚拟主机里面不能有相同名字的Exchange。
    • 为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。
  • 绑定(Binding):交换机和队列之间的虚拟连接。
  • 路由键(Routing Key):路由规则,虚拟机可以用来确定如何路由一个特定的消息。
  • 队列(Queue):存储即将被消费者消费掉的消息。
  • 中间件(Broker ):实现AMQP实体服务,比如常见的RabbitMQ、Azure Service Bus等。

1.4 Exchange交换机

AMQP协议提供了四种交换机的实现方式,上面提到的AMQP default就是一种直连交换机,每个队列默认绑定该交换机且路由为其队列名。四种交换机如下:

1.4.1 Direct Exchange直连交换机(RoutingKey 严格topic/空)
1.4.2 Fanout Exchange扇形交换机(广播)
1.4.3 Topic Exchange主题交换机(RoutingKey 模式匹配topic)
1.4.4 Headers Exchange头交换机(消息头信息,x-match: all/any决定是否完全匹配)

Exchange的一些基本属性:

    • Name 交换机名称
    • Durability 重启消息队列服务器,交换机是否持久化
    • Auto-delete 使用后是否自动删除
    • Arguments 其他参数

1.5 Queue队列

队列一般由客户端在消费者消费前创建。一般再次声明队列时如果队列的属性不一致,会有异常抛出。队列需要绑定(订阅)关注的交换机来接收消息。有些设置了过期时间(TTL)的消息经过一定的配置在到达过期时间或被丢弃时会进入一个持久化的队列(死信队列)。死信队列一般被用来实现延时队列的功能。

1.5.1 队列属性

队列的一些基本属性:

  • Name 队列名称
  • Durable 消息代理重启后,队列是否持久化
  • Exclusive 只被一个连接(connection)使用,而且当连接关闭后队列即被删除
  • Auto-delete 当最后一个消费者退订后即被删除
  • Arguments 一些消息代理用他来完成类似与 TTL 的某些额外功能
1.5.2 队列创建

队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。

1.5.3 队列持久化

持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。

持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。

1.6 Consumer消费者

消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在 AMQP 0-9-1 模型中,有两种途径可以达到此目的:

1)将消息投递给应用 (“push API”)

2)应用根据需要主动获取消息 (“pull API”)

使用 push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。

每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。

1.7 消息机制

1.7.1 消息确认

消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:

1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))

2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)

如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。

1.7.2 拒绝消息

当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于 “拒绝消息(Rejecting Messages)” 的原因处理失败了(或者未能在此时完成)。

当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。

当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

在 AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。

1.7.3 预取消息

在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生产应用每分钟才发送一条消息,这说明处理工作尚在运行。)

注意,RabbitMQ 只支持通道级的预取计数,而不是连接级的或者基于大小的预取。

1.7.4 消息属性

AMQP 模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于 AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:

  • Content type(内容类型)
  • Content encoding(内容编码)
  • Routing key(路由键)
  • Delivery mode (persistent or not)
  • 投递模式(持久化 或 非持久化)
  • Message priority(消息优先权)
  • Message publishing timestamp(消息发布的时间戳)
  • Expiration period(消息有效期)
  • Publisher application id(发布应用的 ID)

有些属性是被 AMQP 代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。他们跟 HTTP 协议的 X-Headers 很相似。消息属性需要在消息被发布的时候定义。

1.7.5 消息主体

AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。

消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。

1.7.6 消息持久化

消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。

简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。

2 工作原理剖析

2.1AMQP层次模型

三层结构模型,自下而上依次为传输层、会话层和模型层。传输层为TCP / UDP(RabbitMQ使用TCP连接),会话层负责将客户端与消息队列的通信,模型层提供协议的基本架构。

    • Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。

例如,客户端可以使用 Queue.Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。

    • Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
    • Transport Layer:位于最底层,主要传输二进制数据流 ,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 说到底还是一个通信协议,通信协议都会涉及报文交互。从low-level 举例来说,AMQP 本身是应用层的协议,其填充于 TCP 协议层的数据部分。从 high-level 来说, AMQP是通过协议命令进行交互的。AMQP 协议可以看作一系列结构化命令的集合,这里的命令代表一种操作,类似于 HTTP 中的方法(GET、 POST 、 PUT、 DELETE 等) 。

2.2 工作过程

AMQP协议的基本工作过程采用发布-订阅模式。发布者经由交换机发布消息,交换机根据对应的路由规则将收到的消息分发给该交换机绑定的队列,队列直接面向客户端,客户端可以选择使用阻塞、轮询等形式从队列获得想要的消息进行处理(发布者不会直接向队列中投递消息,RabbitMQ提供了一个默认交换机AMQP default,当发布者直接向队列中投递消息时,实际是经由该默认交换机进行了分发)。

EXTRA : 1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。 2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。 3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。 4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

2.2.1 AMQP 生产者流转过程

当客户端与 Broker 建立连接的时候,会调用 factory.newConnection 方法,这个方法会进一步封装成 Protocol Header 0-9-1 的报文头发送给 Broker,以此通知 Broker 本次交互采用的是 AMQP0-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OK 、 Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这 6 个命令的交互。

当客户端调用 connection.createChannel 方法准备开启信道的时候,其包装Channel.Open 命令发送给 Broker,等待 Channel.Open-Ok 命令。

当客户端发送消息的时候,需要调用 channel.basicPublish 方法,对应的AQMP命令为Basic.Publish,注意这个命令和前面涉及的命令略有不同,这个命令还包含了ContentHeader和Content Body。

Content Header里面包含的是消息体的属性,例如,投递模式 、优先级等。而 Content Body 包含消息体本身。

当客户端发送完消息需要关闭资源时,涉及 Channel.Close/.Close-Ok 与Connection.Close/.Close-Ok 的命令交互。

2.2.2 AMQP 消费者流转过程

消费者客户端同样需要与Broker 建立连接。

与生产者客户端一样,协议交互同样涉及Connection.Start/ . Start-Ok 、Connection.Tune/.Tune-Ok 和 Connection.Open/ . Open-Ok 等,这里中省略了这些步骤紧接着也少不了在 Connection 之上建立 Channel,和生产者客户端一样,协议涉及Channel .Open/Open-Ok。

如果在消费之前调用了 channel.basicQos(int prefetchCount) 的方法来设置消费者客户端最大能“保持”的未确认的消息数,那么协议流转会涉及 Basic.Qos/.Qos-Ok 这两个 AMQP 命令。

在真正消费之前,消费者客户端需要向 Broker 发送 Basic.Consume 命令(即调用channel.basicConsume 方法〉将 Channel 置为接收模式,之后 Broker 回执Basic.Consume-Ok以告诉消费者客户端准备好消费消息。紧接着 Broker 向消费者客户端推送 (Push) 消息,即Basic.Deliver 命令,有意思的是这个和 Basic.Publish 命令一样会携带 Content Header 和Content Body 。

消费者接收到消息并正确消费之后,向 Broker 发送确认,即 Basic.Ack 命令。在消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及Channel . Close/ . Close-Ok Connection.Close/ . Close-Ok 。

2.2.3 消息的生命周期
  1. 消息由生产者产生。生产者把内容放到消息里,并设置一些属性以及消息的路由。然后生产者把消息发给服务端。
  2. 服务端收到消息,交换器(大部分情况)把消息路由到若干个该服务器上的消息队列中。如果这个消息找不到路由,则会丢弃或者退回给生产者(生产者可自行决定)。
  3. 一条消息可以存在于许多消息队列中。 服务器可以通过复制消息,引用计数等方式来实现。这不会影响互操作性。 但是,将一条消息路由到多个消息队列时,每个消息队列上的消息都是相同的。 没有可以区分各种副本的唯一标识符。
  4. 消息到达消息队列。消息队列会立即尝试通过AMQP将其传递给消费者。 如果做不到,消息队列将消息存储(按生产者的要求存储在内存中或磁盘上),并等待消费者准备就绪。 如果没有消费者,则消息队列可以通过AMQP将消息返回给生产者(同样,如果生产者要求这样做)。
  5. 当消息队列可以将消息传递给消费者时,它将消息从其内部缓冲区中删除。 可以立即删除,也可以在使用者确认其已成功处理消息之后删除(ack)。 由消费者选择“确认”消息的方式和时间。消费者也可以拒绝消息(否定确认)。
  6. 生产者发消息与消费者确认,被分组成一个事务。当一个应用同时扮演多个角色时:发消息,发ack,commit或者回滚事务。消息从服务端投递给消费者这个过程不是事务的。消费者对消息进行确认就够了。

在这个过程中,生产者只能把所有消息发到一个单点(交换器),而不能直接把消息发到某个消息队列(message-queue)中。

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-02-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 AMQP背景简介
    • 1.1 概述
      • 1.2 AMQP与JMS
        • 1.3 核心组件术语
          • 1.4 Exchange交换机
            • 1.4.1 Direct Exchange直连交换机(RoutingKey 严格topic/空)
            • 1.4.2 Fanout Exchange扇形交换机(广播)
            • 1.4.3 Topic Exchange主题交换机(RoutingKey 模式匹配topic)
            • 1.4.4 Headers Exchange头交换机(消息头信息,x-match: all/any决定是否完全匹配)
          • 1.5 Queue队列
            • 1.5.1 队列属性
            • 1.5.2 队列创建
            • 1.5.3 队列持久化
          • 1.6 Consumer消费者
            • 1.7 消息机制
              • 1.7.1 消息确认
              • 1.7.2 拒绝消息
              • 1.7.3 预取消息
              • 1.7.4 消息属性
              • 1.7.5 消息主体
              • 1.7.6 消息持久化
          • 2 工作原理剖析
            • 2.1AMQP层次模型
              • 2.2 工作过程
                • 2.2.1 AMQP 生产者流转过程
                • 2.2.2 AMQP 消费者流转过程
                • 2.2.3 消息的生命周期
            相关产品与服务
            容器服务
            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
            http://www.vxiaotou.com