要了解 RabbitMQ 的交换机发布订阅模型,先来了解下 RabbitMQ 消息传递模型的核心思想:生产者从不直接向队列发送任何消息。实际上,通常情况下,生产者甚至根本不知道消息是否会被传递到任何队列。相反,生产者只能向交换机发送消息。交换机一边接收来自生产者的消息,另一边将消息推送到队列。交换机是否将消息推送到队列,是否推送到多个队列,或抛弃消息,这些规则由 exchange 类型定义。

首先定义一个 RabbitMQ 工具类:

public class ConnectionUtil {public static Connection getConnection() throws IOException, TimeoutException {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置账号,用户名、密码、vhost,使用默认设置// factory.setVirtualHost("/dev");//factory.setUsername("guest");// factory.setPassword("guest");//通过工厂获取连接Connection connection = factory.newConnection();return connection;}

Fanout 交换机发布订阅模型

RabbitMQ 中的交换机模型之一,生产者发送消息的目标对象不再是消息队列,而是和消息队列绑定的交换机,可以说是生产者和消息队列之间的交流中间站,生产者不必关心消息需要传递到哪一个队列,唯一关心的是,生产者只向交换机发送消息。

工作原理图:

官方模型:

生产者发送消息

public class Producer {private static final String Exchange_Name = "rabbit:exchange:01";public static void main(String[] args) {try {// 连接 RabbitMQConnection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//TODO: fanout-exchange 无意识分发消息模型channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.FANOUT);//指定发送的消息体String message = "有新的订单消息!";// 消息发送到 Exchange 交互机channel.basicPublish(Exchange_Name, "", null, message.getBytes("UTF-8"));System.out.println("生产者发送消息成功--->");channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}}

消费者1:

public class MultiConsumerOne {private static final String Exchange_Name = "rabbit:exchange:01";private static final String Queue_Name_01 = "rabbit:queue:01";public static void main(String[] args) {try{//连接 RabbitMQConnection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//TODO: fanout-exchange无意识分发消息模型-绑定// 声明绑定 Exchange 交换机channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.FANOUT);// 声明队列绑定channel.queueDeclare(Queue_Name_01, true, false, false, null);// 队列绑定交换机channel.queueBind(Queue_Name_01, Exchange_Name, "");// 消费者接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1成功接收到消息--->" + message);}};channel.basicConsume(Queue_Name_01, true, consumer);}catch (Exception e) {e.printStackTrace();}}}

消费者2:

public class MultiConsumerTwo {private static final String Exchange_Name = "rabbit:exchange:01";private static final String Queue_Name_02 = "rabbit:queue:02";public static void main(String[] args) {try{// 连接 RabbitMQConnection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//TODO: fanout-exchange 无意识分发消息模型-绑定// 声明绑定 Exchange 交换机类型channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.FANOUT);// 队列绑定交换机channel.queueDeclare(Queue_Name_02, true, false, false, null);// 队列绑定交换机channel.queueBind(Queue_Name_02, Exchange_Name, "");// 消费者接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2成功接收到消息--->" + message);}};channel.basicConsume(Queue_Name_02, true, consumer);}catch (Exception e) {e.printStackTrace();}}}

消费者1运行结果:

消费者2运行结果:

所有绑定了跟 Exchange 绑定的队列的消费者都会接收到来自生产者发送的消息,这就相当于一个广播式的消息模型。

Direct 交换机发布订阅模型

Direct交换机 + 路由模式:生产者将消息推送到交换机,交换机根据路由来转发消息到指定的路由队列。因此,当消费者绑定到指定路由队列的时候,则能接收到生产者发送的消息。

DirectExchange + Routing 模型和 FanoutExchange 模型对比:

Fanout 交换机模式,并没有提供过多的灵活性,只能进行无脑的广播,只要消费者绑定了由 Fanout 交换机绑定的队列,都能接收到生产者发送的消息。而 Direct 交换机 + 路由模型,消费者绑定的队列不仅需要指定交换机,还需要队列绑定相应的路由才能进行接收生产者的消息,在一定程度上提高了接收消息的灵活性。

工作原理图:

生产者:

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** direct-exchange+routingKey: 队列与交换机的绑定-ps有针对性的订阅监听消费* @Author: liwei* @Date: 2019/10/2 9:28*/
public class RoutingProducer {private static final String Exchange_Name = "rabbit:exchange:routing:e01";private static final String Queue_Name_01="rabbit:queue:routing:q01";private static final String Queue_Name_02="rabbit:queue:routing:q02";private static final String Routing_Key_01="rabbit:routing:key:r01";private static final String Routing_Key_02="rabbit:routing:key:r02";private static final String Routing_Key_03="rabbit:routing:key:r03";public static void main(String[] args) {try {//连接 RabbitMQConnection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//TODO:direct-exchange+routingKey分发消息模型//声明交换机模型channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);//声明队列、交换机、路由,绑定交换机路由channel.queueDeclare(Queue_Name_01, true, false, false, null);channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);channel.queueDeclare(Queue_Name_02, true, false, false, null);channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_02);channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_03);//消息体String message01 = "directExchange-publish我的消息-r01";String message02 = "directExchange-publish我的消息-r02";String message03 = "directExchange-publish我的消息-r03";//分发消息channel.basicPublish(Exchange_Name, Routing_Key_01, null, message01.getBytes("UTF-8"));channel.basicPublish(Exchange_Name, Routing_Key_02, null, message02.getBytes("UTF-8"));channel.basicPublish(Exchange_Name, Routing_Key_03, null, message03.getBytes("UTF-8"));System.out.println("生产者发送消息成功--->");channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}}

消费者1:

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class MultiRoutingConsumerOne {private static final String Exchange_Name = "rabbit:exchange:routing:e01";private static final String Queue_Name_01 = "rabbit:queue:routing:q01";private static final String Routing_Key_01 = "rabbit:routing:key:r01";public static void main(String[] args) {try{//连接 RabbitMQConnection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//TODO:direct-exchange+routingKey分发消息模型//指定交换机模型channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);//声明队列channel.queueDeclare(Queue_Name_01, true, false, false, null);//绑定队列,交换机、路由channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);//接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1成功接收到消息--->" + message);}};//自动确认消费channel.basicConsume(Queue_Name_01, true, consumer);}catch (Exception e) {e.printStackTrace();}}}

消费者2:

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class MultiRoutingConsumerTwo {private static final String Exchange_Name = "rabbit:exchange:routing:01";private static final String Queue_Name_02 = "rabbit:queue:routing:q02";private static final String Routing_Key_02 = "rabbit:routing:key:r02";private static final String Routing_Key_03 = "rabbit:routing:key:r03";public static void main(String[] args) {try{//连接 RabbitMQConnection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//指定交换机模型channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);//声明队列channel.queueDeclare(Queue_Name_02, true, false, false, null);//绑定队列、交换机、路由。可绑定多个channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_02);channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_03);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2成功接收到消息--->" + message);}};//自动确认消费channel.basicConsume(Queue_Name_02, true, consumer);}catch (Exception e) {e.printStackTrace();}}}

(先启动消费者在启动生产者)

消费者1运行结果:

消费者2运行结果:

生产者分发的消息不仅仅是指定了exchange,还指定了交换机需要根据路由进行传递消息,channel.basicPublish 的第 2 的参数就是 exchange 需要指定的路由。

来看下消费者1绑定的队列核心代码:

//绑定队列,交换机、路由
channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);

消费者2绑定的队列核心代码:

//绑定队列、交换机、路由。可绑定多个
channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_02);
channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_03);

由代码的传参可知,消费者1,2绑定的队列都绑定了交换机,但由于两个消费者绑定的队列负责监听的交换机路由不同,所以接收的消息导致不同,不再是 Exchange-Fanout 模型那样,只要队列绑定了交换机,就无脑地进行接收消息。消费者1只指定了路由 Routing_Key_01,则只接收 Routing_Key_01 路由路线的消息,消费者同时指定Routing_Key_02,Routing_Key_03 则消费者同时监听接收 Routing_Key_02,Routing_Key_03 的消息。

Topic 交换机发布订阅模型

Topic 交换机 + 路由匹配模式:和 DirectExchange + Routing 模式相似,只是路由可以通过模糊匹配来获取生产者发送给消费者的消息。这样,消费者则可以对某一类或标准的路由队列进行绑定,从而实现订阅消息的功能。

Topic 交换机 + 路由匹配模式和 Direct 交换机 + 路由模式对比:前者改善了后者不能对标准路由进行统一绑定以获取消息的局限性,更大程度地提高了接收消息的灵活性。

注意:

  1. 发送到 Topic 交换机的路由器也不是随便任意的声明绑定的,它必须是由点分割的单词,如 "stock.usd","quick.orange.rabbit",最大长度 255 字节。
  2. 匹配符号 * 和 # : * 匹配符只能匹配一个单词, # 能匹配多个单词

工作原理图:

官方模型:

生产者:

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class RoutingTopicProducer {private static final String Exchange_Name = "rabbit:exchange:topic:routing:01";public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String message = "有新的订单消息!";channel.basicPublish(Exchange_Name, "rabbit:routing:topic:key:r.orange", null, message.getBytes("UTF-8"));channel.basicPublish(Exchange_Name, "rabbit:routing:topic:key:r.orange.apple", null, message.getBytes("UTF-8"));System.out.println("生产者发送消息成功--->");channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消费者1:

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class MultiRoutingTopicConsumerOne {private static final String Exchange_Name = "rabbit:exchange:routing:01";private static final String Queue_Name_01 = "rabbit:queue:routing:01";//只能匹配 r. 后的一个单词 private static final String Routing_Key_01 = "rabbit:routing:key:r.*";public static void main(String[] args) {try{Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);channel.queueDeclare(Queue_Name_01, true, false, false, null);channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1成功接收到消息--->" + message);}};channel.basicConsume(Queue_Name_01, true, consumer);}catch (Exception e) {e.printStackTrace();}}}

消费者2:

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class MultiRoutingTopicConsumerTwo {private static final String Exchange_Name = "rabbit:exchange:topic:routing:01";private static final String Queue_Name_02 = "rabbit:queue:topic:routing:02";// 能匹配r. 后面多个单词 private static final String Routing_Key_02 = "rabbit:routing:topic:key:r.#";public static void main(String[] args) {try{Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);channel.queueDeclare(Queue_Name_02, true, false, false, null);channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_02);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2成功接收到消息--->" + message);}};channel.basicConsume(Queue_Name_02, true, consumer);}catch (Exception e) {e.printStackTrace();}}}

(先启动消费者在启动生产者)

消费者1运行结果:

消费者2运行结果:

生产者发送消息的核心代码,其实和 Direct-Exchange 模型绑定交换机路由并没有什么太多不一样,只是路由具有些相同的特征,前面能找到几个相同的单词,后面由几个不用的单词组成一个路由

核心代码:

channel.basicPublish(Exchange_Name, "rabbit:routing:topic:key:r.orange", null, message.getBytes("UTF-8"));
channel.basicPublish(Exchange_Name, "rabbit:routing:topic:key:r.orange.apple", null, message.getBytes("UTF-8"));

绑定的路由只是后面部分的单词不一样而已。

消费者1绑定路由核心代码:

// 声明一个路由,并由一个带 * 的符号表示模糊匹配
//只能匹配 r. 后的一个单词
private static final String Routing_Key_01 = "rabbit:routing:key:r.*";//绑定队列,交换机、路由
channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);

消费者2绑定路由核心代码:

// 声明一个路由,并由一个带 # 的符号表示模糊匹配
//只能匹配 r. 后的一个单词
private static final String Routing_Key_02 = "rabbit:routing:key:r.#";//绑定队列,交换机、路由
channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_02);

由于 RabbitMQ 路由匹配模型规则: * 匹配符只能匹配一个单词, # 能匹配多个单词所以消费者1只能接收到

channel.basicPublish(Exchange_Name,"rabbit:routing:topic:key:r.orange",null, message.getBytes("UTF-8"));

生产者发送的路由 rabbit:routing:topic:key:r.orange 的消息

而消息者2同时能接收到

channel.basicPublish(Exchange_Name, "rabbit:routing:topic:key:r.orange", null, message.getBytes("UTF-8"));
channel.basicPublish(Exchange_Name, "rabbit:routing:topic:key:r.orange.apple", null, message.getBytes("UTF-8"));

生产者发送的路由 rabbit:routing:topic:key:r.orange 和路由 rabbit:routing:topic:key:r.orange.apple 这两个路由的消息。

还不知道 RabbitMQ 常用的几种交换机模式?这篇小白都能看懂的 RabbitMQ 交换机模式相关推荐

  1. 所有人都能看懂的华为交换机vlan配置

    [vlan小故事]为什么会有vlan?   早期以太网是一种基于CSMA/CD1(Carrier Sense Multiple Access/Collision Detection)的共享通讯介质的数 ...

  2. 零基础小白都能看懂的Java处理高并发的3种解决方案

    高并发是互联网应用的一大特色,也是互联网应用不可避免的问题;比如淘宝双11.京东618.12306春节火车票,促销.秒杀等等. 一.什么是高并发 高并发(High Concurrency)是互联网分布 ...

  3. yolov5使用2080ti显卡训练是一种什么样的体验我通过vscode搭建linux服务器对python-yolov5-4.0项目进行训练,零基础小白都能看得懂的教程。>>>>>>>>>第二章番外篇

    第二章番外篇:yolov5通过vscode搭建linux服务器对python-yolov5-4.0项目进行训练,零基础小白都能看得懂的教程.YOLOv5搭建的最快搭建方式,踩坑经历详谈 前期准备: 2 ...

  4. RabbitMQ入门篇、介绍RabbitMQ常用的五种模式

    RabbitMQ 认识RabbitMQ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为 ...

  5. rabbitmq常用的五种模型

    5种常用模型 一.基本消息模型 二丶work消息模型 三丶fanout广播模式/发布/订阅模式 四丶Routing路由模式(direct) 五丶Topics(主题模型) 第一种:简单模式 Simple ...

  6. 23 种设计模式的#羞羞#解释,听说能看懂的人都是泡妞高手

    作者:HChan 博客:zhuanlan.zhihu.com/p/100746724 01 工厂方法追 MM 少不了请吃饭了,麦当劳的鸡翅和肯德基的鸡翅都是 MM 爱吃的东西,虽然口味有所不同,但不管 ...

  7. RabbitMQ最全使用教程-小白也能看懂

    文章标题 前言 一: 消息中间件是什么?使用它有什么作用? 二: 常见的消息中间件有哪些? 三: 为什么选择RabbitMQ? 四: RabbitMQ中核心的内容有那些? 五: Spring如何整合r ...

  8. 【外行也能看懂的RabbitMQ系列(一)】—— RabbitMQ快速入门篇(内含丰富实例)

    系列文章目录 准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶 ...

  9. 【外行也能看懂的RabbitMQ系列(二)】—— RabbitMQ的Web管理界面(rabbitmq_management)详解(内含Topic模式通配符实操)

    系列文章目录 准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶 ...

最新文章

  1. Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享
  2. 计算机类|期刊】SCI期刊专刊截稿信息4条
  3. POJ 1364 King (差分约束系统)
  4. 技术面试的时候应该了解公司点什么
  5. qt调用mysql加密接口_Qt实现客户端/服务器端登录验证|数据传输使用md5加密 | 学步园...
  6. 【Python】Matplotlib 可视化必备神书,附pdf下载
  7. Spark笔记:复杂RDD的API的理解(上)
  8. oracle time model,通过案例学调优之--OracleTimeModel(时间模型)
  9. 经典排序算法(三)--插入排序Insertion Sort
  10. pojo类中list存储其他字段_如何从其他包含pojo类对象的数组列表中删除数组列表记录...
  11. 如何看懂一个c语言项目,初学者怎样看懂代码 学习代码编程的注意事项
  12. WiFi技术安全调研报告
  13. JS获取浏览器高度和宽度
  14. 使对话框的最大化、最小化和关闭按钮变灰以及对其重载的方法
  15. 深析Vue双向数据绑定(MVVM模型)
  16. windows开机老显示请选择要启动的操作系统
  17. IOS视频编辑,视频裁剪,视频拼接,音频处理,视频处理
  18. 建站的原型图是什么意思?
  19. Android 解决帧动画卡顿问题
  20. 关于买房的后的人生感悟

热门文章

  1. Linux入门+环境搭建云服务器
  2. 小白鼠再排队 TreeMap实现
  3. 关于Centos使用wget下载: 无法解析主机地址问题
  4. 修改SQL Server 2005 数据库文件名字
  5. 一个小巧的C++Log输出到文件类
  6. 数据类型与字节(字符)
  7. [内存管理]linux X86_64处理器的内存布局图(转自:http://blog.csdn.net/younger_china/article/details/16829687)
  8. Python 爬虫 数据提取
  9. 推荐系统在美团综合业务中的应用及实践
  10. vue 中使用 cesium