java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列
发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定。交换机会把消息发送到绑定过的所有队列,实现一对多,一条消息被多个消费者消费。
可以看到,这种模型需要用到交换机模块,我们在后台管理界面可以看到许多交换机可供使用,当然,也可以自己声明需要的交换机。
每个虚拟主机默认生成了多个类型的交换机,这里我们选择一个fanout类型的,名为amqp.fanout的交换机测试。
- 生产者
public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();// 定义通道对应的交换机 参数一:交换机名称 参数二:教会及类型 fanout 广播模型channel.exchangeDeclare("amqp.fanout","fanout");// 发送消息channel.basicPublish("amqp.fanout","",null,("fanout message " + System.currentTimeMillis()).getBytes());}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}
}
通过连接获取通道后,使用channel.exchangeDeclare(“amqp.fanout”,“fanout”);声明交换机,如果交换机不存在,rabbitmq会自动创建,发送消息时指定exchange为amqp.fanout,第二个参数为空,使用临时队列。
- 消费者
public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare("amqp.fanout","fanout");//创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定交换机和队列channel.queueBind(queueName,"amqp.fanout","");// 每次只能消费一个消息channel.basicQos(1);// 消费消息channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//参数一:确认队列中的那个消息 参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}
}
此处为了减少篇幅,读者可将Consumer01的代码复制生成Consumer02或多个消费者类
- 测试
启动所有消费者,通过临时队列监听交换机。此时,运行生产者发布消息,多个消费者可同时接收到交换机发送的消息。
消费者一:
消费者二:
多个消费者同时接收到了相同的消息,再运行一次生产者也一样,消息已广播的形式发送至绑定到交换机的多个消费者,其中,消费者与交换机通过临时队列连接。
java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列相关推荐
- java实现rabbitmq路由模型(routing/topic queues), 生产者 消费者 交换机 消息队列
在fanout模型中,一条消息会被所有订阅的队列消费,即绑定了对应交换机的消费者,都能收到消息.但在某些场景下,我们希望不同的消息发送到不同的队列,被不同的消费者消费,此时就要用到Direct类型的交 ...
- java实现rabbitmq动态路由/话题模型(topic queues), 生产者 消费者 交换机 消息队列
在routing路由模型中,我们实现了可以根据routingKey来选择性地将消息发送到对应的消息队列中,但是,这种模型不够灵活,比如最开始只有warn.info.error.三种类型的日志,但后面如 ...
- RabbitMQ入门:发布/订阅(Publish/Subscribe)
在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...
- RabbitMQ消息队列:发布/订阅(Publish/Subscribe)
2019独角兽企业重金招聘Python工程师标准>>> 前面我们把每个Message都是deliver到某个单一的Consumer.今天我们将了解如何把同一个Message deli ...
- RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)
在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...
- RabbitMQ发布/订阅模式(Publish/Subscribe)
工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有 ...
- kafka 发布订阅_在Kafka中发布订阅模型
kafka 发布订阅 这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 ...
- 在Kafka中发布订阅模型
这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 : Kafka是一种分 ...
- ros 单向通讯 talker,listener 发布订阅模型
原文链接: ros 单向通讯 talker,listener 发布订阅模型 上一篇: VirtualBox 端口转发(端口映射) 主机和虚拟机相互访问 下一篇: python 串口编程 发布订阅模型 ...
最新文章
- 南京大学「自然指数」超越清华北大,位列全国高校第一、世界第七,突显学术实力...
- uni-app读取html缓存,uni-app同步缓存值 设置 读取 删除(示例代码)
- springmvc教程--整合mybatis开发(spring+springMVC+mybatis整合开发)
- SAP CRM 中间件Request download里,遇到/SAPPSPRO/S_MAT_ENHANC_COMM 错误的解决办法
- angular封装富文本编辑器指令
- js预览本地word文档_怎么免费下载百度文库付费文档?
- 仿网易/QQ空间视频列表滚动连播炫酷效果
- Leetcode 242. 有效的字母异位
- 用户管理系统_河北会计管理系统个人用户 使用手册
- IS-IS详解(十六)——IS-IS 分片扩展
- PO、VO、BO、POJO、DAO、DTO都是什么对象
- Mac如何删除python Python cannot be opened because of a problem
- qpython 3h怎么使用_不思议迷宫M14怎么玩 不思议迷宫M14攻略
- 飘云阁内存补丁工具使用
- 原生JS(JavaScript)
- vue2中监听watch的写法汇总
- 英语四级口语测试软件,2021年大学英语四级口语测试题
- 工业设计公司如何选择
- 朋友圈内容防折叠不折叠小技巧
- k8s-client-go源码剖析(一)