【README】

本文po出 mq的发布订阅模式,及代码示例;

【1】intro

1) 角色: 有4个角色, 包括 生产者,消费者, 交换机 exchange(X), 队列;

2)交换机: 一方面,接收生产者的消息,另一方面,处理消息,如发送给队列,或丢弃;这取决于 exchange类型;
3)exchange类型有如下3种:
fanout 广播, 把消费转发给所有 绑定到该交换机的所有队列;
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
topic 通配符, 把消息交给 routing pattern(路由模式)的队列;
4)exchange 交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;

5)发布订阅模式:
5.1-每个消费者监听自己的队列;
5.2-生产者把消息发送给 broker, 由交换机把消息转发到绑定此交换机的所有队列;

6)交换机需要与队列绑定, 绑定之后,一个消息可以被多个消费者收到;

【2】代码(生产者1个,交换机exchange1个,但对应到2个队列,即消息有2个replication)

生产者


/*** 发布订阅模式生产者* 本文发布订阅模式使用的交换机类型为广播 fanout * @author tang rong */
public class PSProduer {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";/** 队列名称1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接Channel channel = conn.createChannel();  // 创建频道/*** 声明交换机* 参数1-交换机名称 * 参数2-交换机类型(fanout, topic, direct, headers)*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);/*** 创建队列* @param1 队列名称* @param2  是否持久化队列* @param3 是否独占本次连接 * @param4 是否在不使用的时候自动删除队列 * @param5 队列其他参数  */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 队列绑定交换机 */channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE,    "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE,    "");/*** 发送消息 */long temp = 1; for (int i = 0; i < 1000; i++) { String msg = "发布订阅模式消息,序号=" + (temp+i) + "时间=" + MyDateUtil.getNow();/*** 参数1 交换机名称,没有指定则使用默认交换机 Default change * 参数2 路由key,简单模式可以传递队列名称 * 参数3 消息其他属性 * 参数4 消息内容 */channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8")); System.out.println("生产者发送消息" + msg);  }  System.out.println("=== 生产者消息发送完成");/* 关闭资源 */channel.close();conn.close(); }
}

消费者1


/*** 发布订阅模式消费者1* @author tang rong */
public class PSConsumer1 {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接 Channel channel = conn.createChannel();  // 创建队列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机/*** 创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);/*** 队列绑定交换机*/channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组  */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息* 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(FANOUT_QUEUE_1, true, consumer); }}

消费者2

/*** 发布订阅模式消费者* @author tang rong */
public class PSConsumer2 {/** 交换机类型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 队列名称1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 创建连接 Channel channel = conn.createChannel();  // 创建队列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机/*** 创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 队列绑定交换机*/channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组  */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者2 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者2 end ===\n"); } };/*** 监听消息* 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(FANOUT_QUEUE_2, true, consumer); }}

【3】小结

1)发布订阅模式与工作模式的区别;
区别1)工作队列模式不需要定义交换机, 发布订阅模式需要;
区别2)工作队列模式的生产者向队列发送消息(底层使用默认交换机),  发布订阅模式的生产者向交换机发送消息;
区别3)工作队列模式的队列不需要与交换机绑定(底层与默认交换机绑定), 发布订阅模式中的队列需要与交换机绑定;

2)默认交换机

AMQP default

rabbitmq-发布订阅模式相关推荐

  1. RabbitMQ:发布订阅模式

    ✨ RabbitMQ:发布订阅模式 1.订阅模式基本介绍 2.交换机 3.发布订阅模式 3.1基本介绍 3.2生产者 3.3消费者 3.4测试

  2. RabbitMQ发布/订阅模式(Publish/Subscribe)

    工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有 ...

  3. RabbitMQ六种队列模式-发布订阅模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  4. 【转】RabbitMQ六种队列模式-3.发布订阅模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  5. RabbitMQ入门学习系列(四) 发布订阅模式

    什么时发布订阅模式 把消息发送给多个订阅者.也就是有多个消费端都完整的接收生产者的消息 换句话说 把消息广播给多个消费者 消息模型的核心 RabbitMQ不发送消息给队列,生产者也不知道消息发送到队列 ...

  6. RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)

    在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者.本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式. 为了阐述这个 ...

  7. RabbitMq之发布订阅模式

    这里写了一个简单的springboot的demo来处理RabbitMq的发布订阅 添加pom依赖 <dependency><groupId>com.rabbitmq</g ...

  8. RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器

    文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...

  9. RabbitMQ 之发布订阅模式

    publish/subscribe 发布订阅模式中,生产者不再直接与队列绑定,而是将数据发送至交换机Exchange 交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用. ...

  10. 【夏目鬼鬼分享】RabbitMQ发布/订阅广播模式

    消息发送流程说明 可以有多个消费者 每个消费者都有自己的队列(queue) 每个队列都要绑定到交换机(Exchange)(都是一些临时队列) 生产者发送的消息只能发送到交换机,交换机来决定要发给那个队 ...

最新文章

  1. Codeforces Round #228 (Div. 1)B
  2. 【Windows系统】基于vscode搭建go语言开发环境
  3. Elasticsearch——Templates 模板
  4. 【c++算法刷题笔记】——洛谷2
  5. java安装找不到uri,【找不到与请求 URI匹配的 HTTP 资源】(转)
  6. java闰年满五换行_CoreJava练习题
  7. html鼠标滑过带音效,HTML5带音效的交互式日食动画
  8. 6. Browser 对象 - Screen 对象(2)
  9. win10正常上网但是网络图标显示无连接,无法开启热点
  10. 三极管工作原理及测定
  11. 通达信股本变迁文件(gbbq)解密方法
  12. js 混合排序(同时存在数字、字母、汉字等)
  13. 工作站 桌面 服务器,图形工作站也虚拟化,立即让你的工作站也可以远程访问
  14. K2 重磅出击,构建财务共享中心方案,促进企业标准化
  15. c语言循环写回合制小游戏_告别黑框框——用C语言Easyx图形库实现图形界面
  16. python提供数字类型包括_Python 语言提供的 3 个基本数字类型是( )_学小易找答案...
  17. 华为od机考真题(JAVA)
  18. 加薪申请表要填这四个方面
  19. 快播画上句号,王欣转身区块链和AI,能否实现王者归来?
  20. win7, gtx750ti 2g tensorflow 安装GPU版本,个人总结,步骤比较详细

热门文章

  1. 【UOJ575】光伏元件【网络流建图】【上下界网络流】【费用流】
  2. acwing 327. 玉米田
  3. 2020牛客国庆集训派对day4 What Goes Up Must Come Down
  4. 牛客每日一题3.31 城市网络 树上倍增
  5. [骗分技巧——随机化Ⅱ] [Poi2014]Couriers,CodeChef - TKCONVEX
  6. P5074-Eat the Trees【插头dp】
  7. P5591-小猪佩奇学数学【单位根反演】
  8. 2020牛客NOIP赛前集训营-提高组(第六场)A-袜子分配【组合数学,结论】
  9. P4827-[国家集训队]Crash 的文明世界【树形dp,换根法,斯特林数】
  10. P3327-[SDOI2015]约数个数和【莫比乌斯反演】