订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

RabbitMQ 单生产单消费模型主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

文章目录

  • 一、RabbitMQ 订阅模型-消息订阅(Fanout)模式
    • 1、RabbitMQ 消息订阅(Fanout)模式
    • 2、消息订阅(Fanout)模式组成
    • 3、消息订阅(Fanout)模式流程
  • 二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现
    • 1、添加 Maven 依赖
    • 2、封装工具类 ConnectionUtil
    • 3、生产者实现
    • 4、消费者-1 实现
    • 5、消费者-2 实现
    • 6、消费者-3 实现
  • 三、订阅模型 三种模式区别
    • 1、RabbitMQ 消息订阅(Fanout)模式
    • 2、RabbitMQ 路由(direct)模式
    • 3、RabbitMQ 主题(topic)模式

一、RabbitMQ 订阅模型-消息订阅(Fanout)模式

1、RabbitMQ 消息订阅(Fanout)模式

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

2、消息订阅(Fanout)模式组成

RabbitMQ 订阅模型-消息订阅(Fanout)模式主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

3、消息订阅(Fanout)模式流程

消息订阅(Fanout)模式流程:

  1. 消息订阅(Fanout)模式 可以有多个消费者
  2. 每个消费者有自己的 queue(队列)
  3. 每个队列都要绑定到 Exchange(交换机)
  4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给那个队列,生产者无法决定
  5. 交换机把消息发送给绑定过的所有队列
  6. 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency>...
</dependencies>

2、封装工具类 ConnectionUtil

package com.lizhengi.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author liziheng* @version 1.0.0* @description 连接工具类封装* @date 2022-12-26 11:39 上午**/
public class ConnectionUtil {/*** 创建 MQ 连接工厂对象*/private static final ConnectionFactory CONNECTION_FACTORY;// 类加载时,重量级资源加载static {CONNECTION_FACTORY = new ConnectionFactory();//设置连接MQ主机CONNECTION_FACTORY.setHost("127.0.0.1");//设置连接MQ端口号CONNECTION_FACTORY.setPort(5672);//设置连接MQ虚拟主机CONNECTION_FACTORY.setVirtualHost("/test");//设置连接MQ虚拟主机的用户名密码CONNECTION_FACTORY.setUsername("admin");CONNECTION_FACTORY.setPassword("123456");}/*** 建立与RabbitMQ连接 工具方法** @return Connection*/public static Connection getConnection() {try {//返回连接对象return CONNECTION_FACTORY.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}/*** 关闭通道和连接 工具方法** @param channel    Channel* @param connection Connection*/public static void closeConnectionAndChanel(Channel channel, Connection connection) {try {if (channel != null) {channel.close();}if (connection != null) {connection.close();}} catch (Exception e) {e.printStackTrace();}}
}

3、生产者实现

package com.lizhengi.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 生产者* @date 2022-12-26 11:44 上午**/
public class Producer {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();/*  为通道声明交换机*  参数1:交换机名称;参数2:交换机类型*/channel.exchangeDeclare("logs", "fanout");//发送消息for (int i = 0; i < 100; i++) {channel.basicPublish("logs", "", null, ("fanout type message" + i).getBytes());}//释放资源ConnectionUtil.closeConnectionAndChanel(channel, connection);}
}

4、消费者-1 实现

package com.lizhengi.fanout;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 消费者* @date 2022-12-26 11:45 上午**/
public class Customer1 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs", "fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义channel.queueBind(queueName, "logs", "");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者1:" + new String(body));}});}
}

5、消费者-2 实现

package com.lizhengi.fanout;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 消费者* @date 2022-12-26 11:45 上午**/
public class Customer2 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs", "fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义channel.queueBind(queueName, "logs", "");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者2:" + new String(body));}});}
}

6、消费者-3 实现

package com.lizhengi.fanout;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 消费者* @date 2022-12-26 11:45 上午**/
public class Customer3 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs", "fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义channel.queueBind(queueName, "logs", "");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者3:" + new String(body));}});}
}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。

RabbitMQ:订阅模型-消息订阅模式相关推荐

  1. .NetCore Cap 结合 RabbitMQ 实现消息订阅

    开源分布式消息框架 Cap 可以在GitHub上拉也可以通过nuget添加 上一篇博文写了 Windows RabbitMQ的安装使用 Cap支持事务,通过捕获数据库上下文连接对象实现 消息事务,消息 ...

  2. 深入biztalk消息以及消息订阅发布路由机制(二)-消息订阅【转】

    一.消息订阅 订阅消息的主体叫订阅服务器,订阅服务器是可以订阅并消费消息的服务,可以作为订阅服务器的服务类型目前有四类,在BizTalkMgmtDb管理数据库中的adm_ServiceClass的Na ...

  3. java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列

    发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定. ...

  4. RabbitMQ下的生产消费者模式与订阅发布模式

    所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入    假 ...

  5. java基础巩固-宇宙第一AiYWM:为了维持生计,Redis基础Part6(Redis的应用场景、Redis是单线程的速度还快、Redis线程模型:Reactor模式、事件、发布订阅、管道)~整起

    PART1-1:为什么Redis是单线程的 Redis单线程是指: Redis的网络IO和键值对读写是由一个线程来完成的.这也是 Redis 对外提供键值存储服务的主要流程.Redis的其他功能,比如 ...

  6. 消息队列模式(点对点发布订阅)

    Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. ...

  7. 【EventBus】事件通信框架 ( 实现几个关键的封装类 | 消息中心 | 订阅注解 | 订阅方法封装 | 订阅对象-方法封装 | 线程模式 )

    文章目录 一.消息中心 二.订阅方法时的注解 三.订阅方法封装 四.订阅对象-方法封装 五.线程模式 一.消息中心 此处暂时只实现一个单例类 , 后续 注册订阅者 , 处理事件传递 , 取消注册订阅者 ...

  8. RabbitMQ消息订阅与轮询

    使用向队列注册消费者,当RabbitMQ服务端的队列接收到消息后推送给客户端,这种方式为消息订阅模式.  RabbitMQ客户端也可通过主动查询的方式,从服务端获取消息.使用主动查询的消费者示例如下. ...

  9. 嵌入式消息订阅发布模式软件框架

    文章目录 一.总体框架 二.基于RT-Thread的SoftBus 2.1 SoftBus的由来 2.2 消息订阅者模式 2.3 静态订阅关系与动态订阅关系 2.4 C/S模式 2.5 消息订阅者模式 ...

最新文章

  1. matlab 基础知识class lt; superclass_name
  2. adb 由于目标计算机积极拒绝,无法连接
  3. matlab画一个局部放大的图中图
  4. CRMEB v4二开文档
  5. 一个物理学家学习Windows编程的感受
  6. c++ linux下读取指定目录的所有文件名字
  7. P6085-[JSOI2013]吃货JYY【状压dp,欧拉回路】
  8. 怎么画韦布尔分布_手机按键寿命测试的样本数量怎么定?一文看懂 简述威布尔分布及其应用...
  9. Vue + Spring Boot 项目实战(二):使用 CLI 搭建 Vue.js 项目
  10. 用户权限 英文_伴鱼绘本终身VIP卡+自然拼读精品课,完课返461元!英文原版绘本...
  11. 聊一聊 java8 中的 Optional
  12. 金蝶k3财务接口_记录用友T+接口对接的心酸历程
  13. 软件项目开发报价指南
  14. STP协议:生成树协议(二层防环机制:防止网桥网络中冗余链路形成环路工作)
  15. 百旺税控服务器维护,税控盘是百旺的,但航天信息发信息要交维护费,是什么意思?...
  16. 申请苹果个人开发者经历
  17. MacOS上符号执行模块angr和z3-solver模块的安装
  18. Webservice CXF 调用报错 Could not find conduit initiator for address:
  19. 东北大学秦皇岛分校通信工程中外合作2020级C语言实验3
  20. 贵州兴义电大学计算机培训,黔西南州机电职业技术学校

热门文章

  1. VB编程:SelectCase多分支选择结构判断成绩优良-11_彭世瑜_新浪博客
  2. linux dup作用,dup 与 dup2 的作用
  3. 【扩展lucas】LOJ#2023. 「AHOI / HNOI2017」抛硬币
  4. 程序员电脑最少得16G才够用啊!
  5. 多用户网上购物系统主要功能_免费搭建的流程步骤_OctShop
  6. enclosing type java_No enclosing instance of type 异常详解
  7. 自己的java框架_手把手教你如何设计一个简单的Java框架
  8. 频率学派和贝叶斯学派区别浅谈
  9. matlab整除怎么,matlab整除取整
  10. 微信小程序使用地图map (详细)