本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记。

RabbitMQ 中重要概念

1. 生产者

生产者(producer)创建消息,然后发送到代理服务器(RabbitMQ Server),消息包括两部分:有效载荷(payload)和标签(label)。

有效载荷是要传输的数据,可以是任何内容,比如 JSON串、二进制、自定义的数据协议等;标签描述了有效载荷,并且 RabbitMQ 用它来决定谁将获得消息的投递。

总结:生产者会创建消息并设置标签。

2. 消费者

消费者会订阅到队列(queue)上,每当有消息到达 RabbitMQ 服务器时,会发送给消费者,消费者收到消息时,会进行处理。

消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时, 只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃, 存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。

3. 信道

应用程序和 RabbitMQ 建立 TCP 连接后,应用程序就可以创建一条 AMQP 信道,信道是建立在真实的 TCP 连接内的虚拟连接,AMQP 命令都是通过信道发出去的。每条信道都会被指派成唯一的一个 ID

无论是发布消息、订阅队列或者接收消息,这些动作都是通过信道完成的。为什么要通过信道而不是 TCP 连接发送 AMQP 命令呢? 原因是对于操作系统来说建立和销毁 TCP 会话是非常昂贵的开销,每秒成百上千次地创建信道是不会影响操作系统的,在一条 TCP 连接上创建多少条信道是没有限制的。

4. 代理

代理(Broker): 消息中间件的服务节点。

对于 RabbitMQ 来说, 一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

首先生产者将业务方数据进行可能的包装, 之后封装成消息, 发送(AMQP 协议里这个动作对应的命令为 Basic.Publish) 到 Broker 中。消费者订阅并接收消息(AMQP 协议里这个动作对应的命令为 Basic.Consume 或者 Basic. Get),经过可能的解包处理得到原始的数据,之后再进行业务处理逻辑。这个业务处理逻辑并不一定需要和接收消息的逻辑使用同一个线程。

消费者进程可以使用一个线程去接收消息,存入到内存中,业务处理逻辑使用另一个线程从内存中读取数据,这样可以将应用进一步解稿,提高整个应用的处理效率。

5. 队列

队列是 RabbitMQ 的内部对象,用于存储消息。

消费者通过以下两种方式从特定的队列中获取消息:

  • 通过 AMQP 的 basic.consume 命令订阅(持续订阅)

消费者在获取到消息并处理后,会自动地从队列中获取下一条信息。

  • 通过 AMQP 的 basic.get 命令订阅(单条订阅)

消费者只会获取单条信息,并取消订阅,如果要循环一直获取消息则需要使用 basic.consume 命令而不是将 basic.get 命令放到循环中处理,因为这样会严重影响 RabbitMQ 的性能。

消息发送到队列中后,如果有消费者订阅了该队列,那么 RabbitMQ 会立即将该消息发送到订阅的消费者,如果没有消费者订阅该消息队列,那么消息将一直在队列中等待,直到有消费者订阅该队列。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(即这次给A 发送,下次给 B 发送)给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。

消费者接收到每一条消息都必须进行确认,消费者必须通过 AMQP 的 basic.ack 命令显式地向 RabbitMQ 发送一个确认,或者在订阅队列的时候将 auto_ack 参数设置为 True。当设置了 auto_ack 后,一旦消费者接收到消息,那么 RabbitMQ 会自动视其确认了消息,消费者通过命令告诉 RabbitMQ 它已经正确接收了消息,同时 RabbitMQ 才能安全地把消息从队列中删除。

消费者在接收消息之后,在确认之前,如果从 RabbitMQ 断开连接或者取消队列,那么 RabbitMQ 会认为这条消息没有分发,然后重新分发给下一个订阅的消费者,确保消息被另一个消费者处理。RabbitMQ 后续将不会给当前订阅者发送消息,因为 RabbitMQ 认为当前消费者并没有准备好接收下一条消息。

在接收到消息后,如果发现消息格式有问题,想要明确拒绝,在消息尚未确认之前有以下两个选择:

  • 把消费者从 RabbitMQ 断开连接

这会导致 RabbitMQ 自动重新把消息入队并发送给另一个消费者

  • 使用 AMQP 的 basic.reject 命令

reject 命令的 requeue 参数为 True 时,RabbitMQ 会将消息发送给下一个订阅的消费者;参数为 False 时,RabbitMQ 会立即把消息从队列中删除,而不会把它发送给新的消费者。

队列设置参数:

  • exclusive

该参数为 True 时,队列将变成私有的。该参数可以用于限制一个队列只能有一个消费者使用。

  • auto-delete

最后一个消费者取消订阅的时候,队列就会自动移除。

尝试声明一个已经存在的队列时,只要声明参数完全匹配现存队列的话,RabbitMQ 就什么也不做,并成功返回。

一般情况下,为了避免消息丢失,生产者和消费者都应该尝试去创建队列。

6. 交换器

Exchange: 交换器,它指定消息按什么规则,路由到哪个队列。

生产者将消息发送到 Exchange (交换器,通常也可以用大写的"X" 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。

一个 Exchange 可以 binding 多个 Queue,一个 Queue 可以同多个 Exchange 进行 binding。交换器与队列之间的关系是多对多的。

交换器的具体示意图:

7. 路由键

RoutingKey : 路由键。用于把生产者的数据分配到交换器上,Exchange 根据这个关键字进行消息投递。

生产者将消息发给交换器的时候,一般会指定一个 RoutingKey ,用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。

在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定 RoutingKey 来决定消息流向哪里。

8. 绑定键

BindingKey: 绑定,用于把交换器的消息绑定到队列上,它的作用就是把 ExchangeQueue 按照路由规则绑定起来。

RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键( BindingKey ) ,这样 RabbitMQ 就知道如何正确地将消息路由到队列了。

如下代码的 routing_key=‘world’ 其实就是绑定键。

channel.queue_bind(queue='hello', exchange='hello', routing_key='world')

绑定键示意图

生产者将消息发送给交换器时, 需要一个 RoutingKey , 当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候, 这些绑定允许使用相同的 BindingKey 。

BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型, 比如 fanout 类型的交换器就会无视 BindingKey ,而是将消息路由到所有绑定到该交换器的队列中。

在direct 交换器类型下,RoutingKey 和BindingKey 需要完全匹配才能使用;但是在topic 交换器类型下, RoutingKey 和 BindingKey 之间需要做模糊匹配,两者并不是相同的。

BindingKey 其实也属于路由键中的一种,官方解释为: the routing key to use for the binding。

可以翻译为:在绑定的时候使用的路由键。可以这么理解:

  • 在使用绑定的时候,其中需要的路由键是 BindingKey,涉及的客户端方法如:
    channel.exchangeBind 、channel .queueBind ,对应的AMQP 命令为Exchange.Bind 、Queue.Bind 。

  • 在发送消息的时候,其中需要的路由键是 RoutingKey,涉及的客户端方法如channel.basicPublish,对应的 AMQP 命令为Basic.Publish。

9. 交换器类型

RabbitMQ 常用的交换器类型有 fanout 、direct、topic 、headers 这四种。

  • fanout

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。类似广播消息。

fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。

  • direct

direct 类型的交换器路由规则也很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。


以图2-7 为例,交换器的类型为direct,

如果我们发送一条消息,并在发送消息的时候设置路由键为" warning" ,则消息会路由到 Queuel 和Queue2 。

如果在发送消息的时候设置路由键为 “info” 或者 “debug” ,消息只会路由到Queue2 。

如果以其他的路由键发送消息,则消息不会路由到这两个队列中。


请注意以上 Exchange 用的是虚线,为什么用虚线呢?是因为在用 Direct 模式的时候不需要指定对应的交换器,只需要指定对应的 Queue 就可以。那你可能又会问那为什么还把 Exchange 画上去呢,原因是因为当你创建了对应的 vhost 之后 RabbitMQ 就会为我们创建对应的没有名字的一个默认的 Exchange

需要说明,AMQP 提供了 “默认交换器”:类型为 direct,名称为空字符串。任何的队列被创建时,即以队列名称作为绑定键,绑定到 “默认交换器”。

使用代码:

channel.basicPublish("", QueueName, null, message)

推送 direct 交换器消息到对于的队列,空字符为默认的 direct 交换器,用队列名称当做路由键。

  • 持续消息获取使用:basic.consume
  • 单个消息获取使用:basic.get

当接收端订阅者有多个的时候,direct 会轮询公平的分发给每个订阅者(订阅者消息确认正常)。

  • topic

与direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

  1. RoutingKey 为一个点号 ". " 分隔的字符串(被点号 “.” 分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client, java.util.concurrent, com.hidden.client”;
  1. BindingKey 和RoutingKey 一样也是点号"."分隔的字符串;
  1. BindingKey 中可以存在两种特殊字符串 * 和 # ,用于做模糊匹配,其中 * 用于匹配一个单词, # 用于 0 个或者多个词)。

以图2-8 中的配置为例:

  • 路由键为 “com.rabbitmq.client” 的消息会同时路由到Queuel 和Queue2;
  • 路由键为 “com.hidden.client” 的消息只会路由到Queue2 中:
  • 路由键为 “com.hidden.demo” 的消息只会路由到Queue2 中:
  • 路由键为 “java.rabbitmq.demo” 的消息只会路由到Queuel 中:
  • 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。

  • headers

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

在绑定队列和交换器时制定一组键值对, 当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers (也是一个键值对的形式) ,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。

headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

10. RabbitMQ 总流程

10.1 生产者

(1)生产者连接到RabbitMQ Broker , 建立一个连接( Connection) ,开启一个信道(Channel)

(2)生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等

(3)生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等

(4)生产者通过路由键将交换器和队列绑定起来

(5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息

(6)相应的交换器根据接收到的路由键查找相匹配的队列

(7)如果找到,则将从生产者发送过来的消息存入相应的队列中

(8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

(9)关闭信道

(10)关闭连接

10.2 消费者

(1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel)

(2)消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作

(3)等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息

(4)消费者确认( ack) 接收到的消息

(5)RabbitMQ 从队列中删除相应己经被确认的消息

(6)关闭信道

(7)关闭连接

每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。

当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。

此时就需要开辟多个 Connection ,将这些信道均摊到这些Connection 中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

11. 虚拟主机和隔离

每一个 RabbitMQ 服务器都能创建一个消息服务器,我们称之为虚拟主机(vhost), 每一个 vhost 实际上是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定以及自己的权限机制。

多个应用程序可以只运行一个 RabbitMQ 服务器的不同虚拟主机 vhost 上,这样可以避免队列和交换器的命名冲突,各个 vhost 之间逻辑是分离的。

RabbitMQ 包含了开箱即用的默认 vhost:’/’, 在连接时如果没有指定 vhost 就会使用默认的 vhost。

vhost 之间是绝对隔离的。当在 RabbitMQ 里面创建一个用户时,用户通常会被指派给至少一个 vhost,并且只能访问被指派 vhost 内的队列、交换器和绑定。

所以多个用户可以共用一个 RabbitMQ,其中只需要将不同的用户使用不同的 vhost 进行分开,这样大家就只需要搭建一个 MQ 服务器共同使用,从而互补影响。

  • 创建 vhost
rabbitmqctl add_vhost [vhost_name]
  • 删除 vhost
rabbitmqctl delete_vhost [vhost_name]
  • 查询运行哪些 vhost
rabbitmqctl list_vhosts

12. 死信队列

在某些情况下,例如当一个消息无法被成功路由时,消息或许会被返回给生产者并被丢弃。或者,如果我们为消息设置了有效期,延期后消息会被放入一个所谓的死信队列中。此时,消息生产者可以选择配置死信队列参数来处理这些特殊情况。

一般来讲呢,死信队列都是一些过期的或者不需要处理的消息,我们这边其实就是故意利用了消息过期之后进入死信队列这个特性来处理延迟任务,为消息设置需要延迟的时间的等长有效期,等消息过期之后从死信队列里面拿出消息处理。

一般来说以下情况会导致消息进入死信队列:
1.消息被拒绝 basic.reject/basic.nack 并且设置 requeuefalse(不重回队列)的时候,消息就会进入死信队列。
2. 消息队列 TTL 过期或者消息有效期过期。
3. 队列达到最大的长度,并且我们没有设置自动拒绝消息的时候,队首的消息就会进入死信队列。

RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器相关推荐

  1. RabbitMQ 入门系列(10)— RabbitMQ 消息持久化、不丢失消息

    消息要保持"持久化",即不丢失,必须要使得消息.交换器.队列,必须全部 "持久化". 1. 生产者怎么确认 RabbitMQ 已经收到了消息? # 打开通道的确 ...

  2. 生产者/消费者模式(阻塞队列)

    生产消费者模式  貌似也是阻塞的问题  花了一些时间终于弄明白这个鸟东东,以前还以为是不复杂的一个东西的,以前一直以为和观察者模式差不多(其实也是差不多的,呵呵),生产消费者模式应该是可以通过观察者模 ...

  3. RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现

    生产者消费者代码示例 上一章节中对消息通信概念做了详细的说明,本章节我们对 RabbitMQ 生产者和消费者代码分别做一示例说明. 1. 生产者代码 #!/usr/bin/env python # c ...

  4. RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;

    说明: (1)内容说明: ● 这儿我们会创建一个项目,演示RabbitMQ最基础的内容: 通过,这个最简单的例子,先了解:如何使用RabbitMQ,如何连接RabbitMQ,如何发送消息,如何接收消息 ...

  5. RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失

    1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...

  6. RabbitMQ 入门系列(9)— Python 的 pika 库常用函数及参数说明

    1. pika.PlainCredentials(username, password, erase_on_connect) 功能:创建连接时的登录凭证 参数: username: MQ 账号 pas ...

  7. RabbitMQ 入门系列(11)— RabbitMQ 常用的工作模式(simple模式、work模式、publish/subscribe模式、routing模式、topic模式)

    1. simple 模式 simple 模式是最简单最常用的模式 2. work 模式 work 模式有多个消费者 消息产生者将消息放入队列.生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到 ...

  8. RabbitMQ 入门系列(1)— Ubuntu 安装 RabbitMQ 及配置

    1. RabbitMQ 简介 消息 (Message) 是指在应用间传送的数据.消息可以非常简单,比如只包含文本字符串.JSON等,也可以很复杂,比如内嵌对象. 消息队列中间件(Message Que ...

  9. java实现rabbitmq路由模型(routing/topic queues), 生产者 消费者 交换机 消息队列

    在fanout模型中,一条消息会被所有订阅的队列消费,即绑定了对应交换机的消费者,都能收到消息.但在某些场景下,我们希望不同的消息发送到不同的队列,被不同的消费者消费,此时就要用到Direct类型的交 ...

最新文章

  1. js中为什么你不敢用 “==”
  2. GNS3错误7200:无法开始Dynamips于端口7200
  3. 两数相乘结果溢出的判断
  4. 转载:售前十年,你在第几年
  5. LNMP服务跨省迁移的解决方案
  6. PHP - PDO 之 mysql 参数绑定
  7. linux多线程编程和linux 2.6下的nptl,Linux多線程編程和Linux 2.6下的NPTL
  8. 我们真的需要统一的编程规范?
  9. Spark的三种运行模式
  10. react native webview 百度地图_react-native-baidu-map使用及注意问题
  11. 计算机地质制图CAD,CAD地质制图线型
  12. 【FPGA基础】四位二进制--格雷码转换器(vivado)
  13. O2O营销模式(Online To Offline)
  14. 免费试用腾讯云服务器 + nginx建网站
  15. CTF supersqli
  16. 【R语言数据科学】(十三):有趣的概率学(下)
  17. 2022-2028年中国淄博房地产行业市场发展潜力及投资策略研究报告
  18. vue模块给模块传参_Vue店面的Paypal支付模块
  19. xtend怎么使用_Java替代Xtend又推出了另一个块摇摆更新
  20. saas php7框架开源,HRM SAAS v2.5.7 – PHP人力资源管理系统SaaS平台版

热门文章

  1. 2022-2028年中国橡胶带行业市场运营格局及未来前景分析报告
  2. 利用c语言找出输入文本最长的一行
  3. 正视自己的所想所求,活出真实的自己
  4. 增强型固态硬盘支持人工智能工作负载
  5. 未来几年自动驾驶预测(下)
  6. h264和h265多维度区别
  7. 2021年大数据常用语言Scala(六):基础语法学习 数据类型与操作符
  8. Android shape 画的圆角带四个黑变 问题
  9. MSDN 教程短片 WPF 16(Path路径)
  10. jquery 实现Json节点的增删改查