【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)
【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)
- 1. 订阅模式
- 2. 发布与订阅模式说明
- 3. 代码示例
- 3.1 生产者
- 3.2 消费者
- 3.3 测试
- 4. 总结
1. 订阅模式
订阅模式示例图:
前面2个案例中,只有3个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
**Exchange(交换机)只负责转发消息,不具备存储消息的能力,**因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
2. 发布与订阅模式说明
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。
3. 代码示例
3.1 生产者
package com.siyi.simple;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机地址;默认为localhostconnectionFactory.setHost("localhost");//连接端口;默认为 5672connectionFactory.setPort(5672);//虚拟主机名称;默认为/connectionFactory.setVirtualHost("/siyi");//连接用户名;默认为guestconnectionFactory.setUsername("siyi");//连接密码;默认为guestconnectionFactory.setPassword("siyi");//返回连接return connectionFactory.newConnection();}
}
package com.siyi.ps;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.siyi.simple.ConnectionUtil;public class Producer {//交换机名称static final String FANOUT_EXCHANGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout,topic,direct,headers*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");for(int i=0;i<=10;i++){//发送消息String message = "你好,世界~ 发布订阅模式:"+i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其他属性* 参数4:消息内容*/channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());System.out.println("已发送消息:"+message);}//关闭资源channel.close();connection.close();}
}
3.2 消费者
消费者1
package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,* mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1,false,consumer);}
}
消费者2
package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,* mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_2,false,consumer);}
}
3.3 测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到 Exchanges 选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
4. 总结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。
【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)相关推荐
- RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
2019独角兽企业重金招聘Python工程师标准>>> 发布/订阅 在上篇教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全 ...
- AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster
Distributed Publish Subscribe in Cluster 基本定义 在单机环境下订阅与发布是很常用的,然而在集群环境是比较麻烦和不好实现的: AKKA已经提供了相应的实现,集群 ...
- 知方可补不足~SQL2008中的发布与订阅模式~续
上一回介绍了如何在sql2008中建立一个数据库的发布者,今天来说一下如何建立一个订阅者,其实订阅者也是一个数据库,而这个数据库是和发布者的数据结构相同的库,它们之间通过SQL代理进行数据上的同步. ...
- RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码
RabbitMQ有以下几种工作模式 : 1.Work queues 工作队列 2.Publish/Subscribe 发布订阅 3.Routing 路由 4.Topics 通 ...
- RabbitMQ入门:发布/订阅(Publish/Subscribe)
在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...
- Publish/Subscribe 发布与订阅模式
Publish/Subscribe 发布与订阅: 通过交换机来实现,一个生产者可以让不同队列的消费者同时得到消息 生产者: package Fanout; import com.rabbitmq.cl ...
- RabbitMQ消息队列:发布/订阅(Publish/Subscribe)
2019独角兽企业重金招聘Python工程师标准>>> 前面我们把每个Message都是deliver到某个单一的Consumer.今天我们将了解如何把同一个Message deli ...
- redis基础教程 --发布与订阅
redis 发布订阅 redis发布 与订阅是一种信息通信模式,发送者(pub)发送信息,订阅者(sub)接收信息 客户端订阅消息 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传 ...
最新文章
- 近十年和近三年智能车竞赛国赛奖项在各赛区的分布分析
- 安卓开源项目周报0215
- jremind V0.1.3.0添加透明
- WebStorm 格式化代码 - 快捷键
- 本地下载mysql数据库_本地Windows上安装 MySQL数据库
- 苹果xsmax怎么开机_苹果iPhone12怎么关机和开机 iPhone12开关机方式快捷键
- 补充总结:现代控制理论
- Android APK反编译就这么简单 详解(附图)
- 阿里云 CentOS 7.4 下部署基于 Node.js 的微信小程序商城
- tp5阿里云短信验证码
- pwnable.kr之mistake
- c语言在电路设计作用,ds1307怎么使用(ds1307引脚图及功能_c语言程序及典型应用电路)...
- 2022届浙江工业大学考研计算机技术专硕上岸经验 初试复试经验
- royer推挽自激电路
- windows word2010 PPT
- 三年级计算机绘画第二课堂教案,美术第二课堂计划讲解.docx
- 论文中文翻译——Vulnerability Dataset Construction Methods Applied To Vulnerability Detection A Survey
- jsp调试java_[求助]jsp+javaBean调试环境问题!
- R语言 | 关联规则
- 16 部必看 AI 电影以及AI 电影脱颖而出的原因
热门文章
- 订单数额总数实现_纺织业行情久违火爆?大量印度订单转到中国!
- 实现凹凸多边形外扩与内缩, 无内缩外扩混乱问题
- 反欺诈的核心是人,教你如何用知识图谱识别欺诈行为
- Mac安装Hadoop(超级无敌宇宙爆炸详细)
- 百度地图语音播报集成
- 家庭整理-《怦然心动的人生整理魔法》书中的精髓:如何利用怦然心动家庭整理法,拥有整洁、高效、全新的人生?
- 传统企业为什么要开发H5?
- [深度学习论文笔记DoDNet: Learning to segment multi-organ and tumors from multiple partially labeled datasets
- Git 里面的 origin 到底代表啥意思?【转】
- [生存志] 第69节 孙武吴宫授兵法