【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)

  • 1. 订阅模式
  • 2. 发布与订阅模式说明
  • 3. 代码示例
    • 3.1 生产者
    • 3.2 消费者
    • 3.3 测试
  • 4. 总结

1. 订阅模式

订阅模式示例图:

前面2个案例中,只有3个角色:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

**Exchange(交换机)只负责转发消息,不具备存储消息的能力,**因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

2. 发布与订阅模式说明


发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。

3. 代码示例

3.1 生产者

package com.siyi.simple;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机地址;默认为localhostconnectionFactory.setHost("localhost");//连接端口;默认为 5672connectionFactory.setPort(5672);//虚拟主机名称;默认为/connectionFactory.setVirtualHost("/siyi");//连接用户名;默认为guestconnectionFactory.setUsername("siyi");//连接密码;默认为guestconnectionFactory.setPassword("siyi");//返回连接return connectionFactory.newConnection();}
}
package com.siyi.ps;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.siyi.simple.ConnectionUtil;public class Producer {//交换机名称static final String FANOUT_EXCHANGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout,topic,direct,headers*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");for(int i=0;i<=10;i++){//发送消息String message = "你好,世界~ 发布订阅模式:"+i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其他属性* 参数4:消息内容*/channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());System.out.println("已发送消息:"+message);}//关闭资源channel.close();connection.close();}
}

3.2 消费者

消费者1

package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,*       mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1,false,consumer);}
}

消费者2

package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,*       mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_2,false,consumer);}
}

3.3 测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

在执行完测试代码后,其实到RabbitMQ的管理后台找到 Exchanges 选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

4. 总结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  3. 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。

【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)相关推荐

  1. RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

    发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...

  2. RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

    2019独角兽企业重金招聘Python工程师标准>>> 发布/订阅 在上篇教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全 ...

  3. AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster

    Distributed Publish Subscribe in Cluster 基本定义 在单机环境下订阅与发布是很常用的,然而在集群环境是比较麻烦和不好实现的: AKKA已经提供了相应的实现,集群 ...

  4. 知方可补不足~SQL2008中的发布与订阅模式~续

    上一回介绍了如何在sql2008中建立一个数据库的发布者,今天来说一下如何建立一个订阅者,其实订阅者也是一个数据库,而这个数据库是和发布者的数据结构相同的库,它们之间通过SQL代理进行数据上的同步. ...

  5. RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码

    RabbitMQ有以下几种工作模式 : 1.Work queues  工作队列 2.Publish/Subscribe 发布订阅 3.Routing      路由 4.Topics        通 ...

  6. RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...

  7. Publish/Subscribe 发布与订阅模式

    Publish/Subscribe 发布与订阅: 通过交换机来实现,一个生产者可以让不同队列的消费者同时得到消息 生产者: package Fanout; import com.rabbitmq.cl ...

  8. RabbitMQ消息队列:发布/订阅(Publish/Subscribe)

    2019独角兽企业重金招聘Python工程师标准>>> 前面我们把每个Message都是deliver到某个单一的Consumer.今天我们将了解如何把同一个Message deli ...

  9. redis基础教程 --发布与订阅

    redis 发布订阅 redis发布 与订阅是一种信息通信模式,发送者(pub)发送信息,订阅者(sub)接收信息 客户端订阅消息 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传 ...

最新文章

  1. 近十年和近三年智能车竞赛国赛奖项在各赛区的分布分析
  2. 安卓开源项目周报0215
  3. jremind V0.1.3.0添加透明
  4. WebStorm 格式化代码 - 快捷键
  5. 本地下载mysql数据库_本地Windows上安装 MySQL数据库
  6. 苹果xsmax怎么开机_苹果iPhone12怎么关机和开机 iPhone12开关机方式快捷键
  7. 补充总结:现代控制理论
  8. Android APK反编译就这么简单 详解(附图)
  9. 阿里云 CentOS 7.4 下部署基于 Node.js 的微信小程序商城
  10. tp5阿里云短信验证码
  11. pwnable.kr之mistake
  12. c语言在电路设计作用,ds1307怎么使用(ds1307引脚图及功能_c语言程序及典型应用电路)...
  13. 2022届浙江工业大学考研计算机技术专硕上岸经验 初试复试经验
  14. royer推挽自激电路
  15. windows word2010 PPT
  16. 三年级计算机绘画第二课堂教案,美术第二课堂计划讲解.docx
  17. 论文中文翻译——Vulnerability Dataset Construction Methods Applied To Vulnerability Detection A Survey
  18. jsp调试java_[求助]jsp+javaBean调试环境问题!
  19. R语言 | 关联规则
  20. 16 部必看 AI 电影以及AI 电影脱颖而出的原因

热门文章

  1. 订单数额总数实现_纺织业行情久违火爆?大量印度订单转到中国!
  2. 实现凹凸多边形外扩与内缩, 无内缩外扩混乱问题
  3. 反欺诈的核心是人,教你如何用知识图谱识别欺诈行为
  4. Mac安装Hadoop(超级无敌宇宙爆炸详细)
  5. 百度地图语音播报集成
  6. 家庭整理-《怦然心动的人生整理魔法》书中的精髓:如何利用怦然心动家庭整理法,拥有整洁、高效、全新的人生?
  7. 传统企业为什么要开发H5?
  8. [深度学习论文笔记DoDNet: Learning to segment multi-organ and tumors from multiple partially labeled datasets
  9. Git 里面的 origin 到底代表啥意思?【转】
  10. [生存志] 第69节 孙武吴宫授兵法