在前面的两篇博客中

  • RabbitMQ入门:Hello RabbitMQ 代码实例
  • RabbitMQ入门:工作队列(Work Queue)

遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息模型分别为(P代表生产者,C代表消费者,红色代表队列):

这次我们来看下将一个消息发送给多个消费者(工作者),这种模式一般被称为“发布/订阅”模式。其工作模型为(P代表生产者,X代表Exchange(路由器/交换机),C代表消费者,红色代表队列):

我们发现,工作模型中首次出现路由器,并且每个消费者有单独的队列。生产者生成消息后将其发送给路由器,然后路由器转送到队列,消费者各自到自己的队列里面获取消息进行消费。在实际的应用场景中,生产者一般不会直接将消息发送给队列,而是发送给路由器进行中转,Exchange必须清楚的知道怎么处理收到的消息:是将消息发送到一个特定队列还是多有队列,或者直接废弃消息。这种才符合RabbitMQ消息模型的核心思想

接下来我们详细展开今天的话题:

一、Exchange

Exchange在我们的工作模型中首次出现,因此需要详细介绍下。

Exchange分为4种类型:

Direct:完全根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
Fanout:不需要key,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
Headers:我们可以不考虑它。

今天我们的实例采用fanout类型的exchange。

尽管首次出现,但是其实我们前面的案例中也有用到exchange,只是我们没有给他名字,用的是RabbitMQ默认的,比如下面这段代码,我们将路由器名这个参数传入了“”,如果我们需要自己声明exchange的话,这个就不能传入“”了,而是传入自己定义好的值。

二、临时队列

前面两篇博客中,我们都在使用队列的时候给出了定义好的名字,这在生产者和消费者共用相同队列的时候很有必要,但是我们有了exchange,生产者不需要知道有哪些队列,因此队列名字可以不用指定了,而是通过RabbitMQ 接口自己去生成临时队列,队列名字也由RabbitMQ自动生成。通过

可以声明一个非持久的、通道独占的、自动删除的队列,getQueue()方法可以获取随机队列名字。这个名字用来在队列和exchange之间建立binding关系的时候使用:

三、代码实现

基于上面exchange和临时队列的知识铺垫,可以展开今天的代码实现了。

  1. 生产者

    public class Product {//exchange名字public static String EXCHANGE_NAME = "exchange";public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = null;Channel channel = null;try {// 1.创建连接和通道connection = factory.newConnection();channel = connection.createChannel();// 2.为通道声明exchange和exchange的类型
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String msg = " hello rabbitmq, this is publish/subscribe mode";// 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());System.out.println("product send a msg: " + msg);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 4.关闭连接if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
    }

  2. 消费者1
    public class Consumer1 {public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = null;Channel channel = null;try {// 1.创建连接和通道connection = factory.newConnection();channel = connection.createChannel();// 2.为通道声明exchange以及exchange类型
                channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 3.创建随机名字的队列String queueName = channel.queueDeclare().getQueue();// 4.建立exchange和队列的绑定关系channel.queueBind(queueName, Product.EXCHANGE_NAME, "");System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");// 5.通过回调生成消费者并进行监听Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {// 获取消息内容然后处理String msg = new String(body, "UTF-8");System.out.println("*********** Consumer1" + " get message :[" + msg + "]");}};// 6.消费消息channel.basicConsume(queueName, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
    }

  3. 消费者2,核心代码同消费者1一样,只是在日志打印上将"Consumer1"改为"Consumer2"而已。这里不再列出具体代码。
  4. 先运行消费者1和2,然后运行生产者,观察控制台log打印情况:
    生产者:
    product send a msg:  hello rabbitmq, this is publish/subscribe mode消费者1:**** Consumer1 keep alive ,waiting for messages, and then deal them
    *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode]消费者2:**** Consumer2 keep alive ,waiting for messages, and then deal them
    *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]

    可以看到,当生产者发出消息后,两个消费者最终都收到了消息。

  5. 我们去查看RabbitMQ管理页面:

    在Exchanges 标签页里面多了一个名为“exchange”的路由器,他的类型是fanout。点exchange 的link进入详细页面:

    发现在binding项目中有了两条绑定关系,队列的名字也可以看到。将页面切换到Queues标签页:

    出现了两个新的队列,队列名字和绑定关系中的一样,并且队列都是自动删除的、通道独占的。

  6. 然后将消费者1和消费者2都停掉,重新查看管理页面,我们发现exchange还在,binding关系不存在了,临时队列也自动删除了

转载于:https://www.cnblogs.com/sam-uncle/p/9208008.html

RabbitMQ入门:发布/订阅(Publish/Subscribe)相关推荐

  1. RabbitMQ教程 3.发布/订阅(Publish/Subscribe)

    搜索:Java课代表,关注公众号,及时获取更多Java干货. 3 发布/订阅(Publish/Subscribe) 在上一节中,我们创建了一个工作队列.其目的是将每个任务只分发给一个worker.本节 ...

  2. RabbitMq 发布订阅 Publish/Subscribe fanout/direct

    目录 概述 交换机 临时队列 代码 概述 在上篇中了解到rabbitmq 生产者生产消息到队列,多个消费者可以接受.这篇文章主要记录广播类型为fanout.生产者不在将产生的消息发送到队列,而是将消息 ...

  3. Redis发布与订阅——PUBLISH SUBSCRIBE

    2019独角兽企业重金招聘Python工程师标准>>> Redis发布与订阅--PUBLISH  & SUBSCRIBE 一般来说,发布与订阅(又称pub/sub)的特点是 ...

  4. RabbitMq之发布订阅模式

    这里写了一个简单的springboot的demo来处理RabbitMq的发布订阅 添加pom依赖 <dependency><groupId>com.rabbitmq</g ...

  5. 译: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 发布和订阅

    在第一篇教程中,我们展示了如何使用start.spring.io来利用Spring Initializr创建一个具有RabbitMQ starter dependency的项目来创建spring-am ...

  6. RabbitMQ 之发布订阅模式

    publish/subscribe 发布订阅模式中,生产者不再直接与队列绑定,而是将数据发送至交换机Exchange 交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用. ...

  7. RabbitMQ——消息发布订阅

    消息订阅发布 一个生产者,多个消费者 每个消费者都有自己的队列 生产者,没有直接把消息发送到队列,而是先发送到交换机 每个队列都要绑定到交换机 生产者发送的消息 经过交换机,到达队列,就能实现一个消息 ...

  8. RabbitMQ系列之三:publish subscribe

    server端代码: 1 package com.example.publishsubscribe; 2 3 import java.io.IOException; 4 5 import com.ra ...

  9. python消息订阅_python rabbitmq消息发布订阅

    发送端:import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='loc ...

  10. Redis的发布订阅模式

    本文源码参看:https://github.com/duktig666/learn-example/tree/5586febea31c2fb368e19fbdba11ed08afd463e0/Redi ...

最新文章

  1. audio type多种类型_http content-type常见文件格式类型
  2. SRIO学习(六)——Direct I/O 操作(一)
  3. linux xargs与管道的区别
  4. java怎么统计字符串中各个字母的个数,人生转折!
  5. mpvue开发微信小程序之picker
  6. 一个base.css
  7. 转: 七牛云的开源播放器的使用指南
  8. android 队列上传图片,话说android端七牛图片上传
  9. 【BZOJ4108】[Wf2015]Catering 有上下界费用流
  10. mysql+case_mysql内置函数case用法介绍
  11. Angularjs进阶笔记(2)—自定义指令中的数据绑定
  12. 2-1 CPU多级缓存-缓存一致性.mkv
  13. 使用electron-builder来打包
  14. yum install php-pecl-mongo,pecl安装php mongodb扩展
  15. 基于onvif协议的嵌入式设备(摄像头)开发(客户端)
  16. 安徽财贸职业学院计算机信息管理怎么样,让大家看看安徽财贸职业学院计算机系的汪永涛辅导员如何对待我的...
  17. 刘强东第二次“二次创业”
  18. 马悦凌:从初级护士到“民间奇医”[2]
  19. python类的魔法方法和装饰器
  20. C语言中printf打印形式(%02X, %2X, %-2X, %.nf, %m.nf, %e, %m.ne, %2d, %-2d, %02d, %.2d)

热门文章

  1. 爱数助力国资委实现混合IT环境下的业务保护
  2. 《FPGA全程进阶---实战演练》第十一章 VGA五彩缤纷
  3. Squid Analyzer 5.1 发布,Squid日志统计
  4. squid日志中关键字的含义
  5. CSLA.Net3.6中使用CodeSmith
  6. Angular Js 判断对象不为空对象的三种方法
  7. L1-005. 考试座位号-PAT团体程序设计天梯赛GPLT
  8. LeetCode 119. Pascal’s Triangle II
  9. in最多可以放多少?_新手开店,放多少商品才能获取最多流量?
  10. python json文件遍历所有key、value 及替换key对于的value