目录

  • 订阅发布模式

    • 1、交换器(Exchange)

      • 1.1、创建交换器
      • 1.2 、推送消息到交换器
    • 2、临时队列
    • 3、绑定(bingdings)
    • 5、代码例子
      • 5.1、生产者代码示例
      • 5.2、消费者代码示例

订阅发布模式

1、交换器(Exchange)

Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者。本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)

RabbitMQ的消息发送模型核心思想是生产者不直接把消息发送到消息队列中。事实上,生产者不知道自己的消息将会被缓存到哪个队列中。

其实生产者者可以把消息发送到exchange(消息交换机)上。exchange是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange必须知道在它接收到一条消息应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用exchange类型来定义

1.1、创建交换器

有一些可用的exchange类型:direct, topic, headersfanout。这里我们主要看最后一个:fanout,这里我们创建一个名字为logs、类型为fanoutexchange:

channel.exchangeDeclare("logs", "fanout");

fanout类型的exchange是很简单的。就是它把它能接收到的所有消息广播到它知道的所有队列中。

  • 没有名字的exchange
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

如上面的代码我们没有指定exchagne的名字,采用的是“”,空字符串的符号指的是默认的或没有命名的exchange:消息会根据routingKey被路由到指定的消息队列中

// 申明交换器,第一个参数:交换器的名字;第二个参数:交换器的类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

1.2 、推送消息到交换器

现在我们来把消息推送到已命名的exchange上,原来的做法是推送到默认的交换器上面的;

  • 原来的做法
// 第一个参数:交换器的名称
// 第二个参数:队列名称
// 第三个参数:消息的属性
// 第四个参数:消息体channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
  • 推送到交换器

// 第一个参数:交换器名称;
// 第二个参数:队列名称;
// 第三个参数:消息属性;
// 第四个参数:消息体
channel.basicPublish(EXCHANGE_NAME,"",null,"i am message".getBytes());

2、临时队列

之前的例子中,应该会发现我们都是使用了一个指定名字的消息队列。对应的生产者和消费者之间都要使用相同的消息队列名称

但是在我们的log系统中却不是这样,我们希望能够接收到所有的log消息,不只是其中的一部分。我们只要处理当前的log消息,不用管过去的历史log。为了实现,我们需要做以下两步:

  • 无论什么时候我们和RabbitMQ建立连接时,我们都要刷新、清空Queue。为了达到这一的目的,我们可以用一个随机的名字(随机性可由自己来定义)来创建Queue,也可以让服务器来自动建立一个随见的Queue
  • 当消费者断开连接时,Queue能自动被删除。

使用java客户端时,我们使用无参数的queueDeclare方法,就可以创建一个已经生成名字的排他性会自动删除Queue

String queueName = channel.queueDeclare().getQueue();

这里面我们就可以拿到一个随机名字的queue,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

3、绑定(bingdings)

现在已经创建好了一个fanout类型的exchange和一个队列。那么接下来我们就需要让exchange向我们的queue里发送消息,Exchangequeue之间的关系就是绑定(bindings

    channel.queueBind(queueName,exchangeName,"");

5、代码例子

现在的代码和之前的区别不是很大;

主要的区别就是:

  • 我们把消息推送到一个命名的exchange上,而不是之前未命名的默认exchange
  • 在我们发送消息时需要提供一个routingKey,但对于fanout类型的exchange可以忽略

5.1、生产者代码示例


/*** @author zhaodi* @description* @date 2018/9/28 16:50*/
public class Producer {private static final String EXCHANGE_NAME = "my-exchange-1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();// 申明交换器,// 第一个参数:交换器的名字;// 第二个参数:交换器的类型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 第一个参数:交换器名称;// 第二个参数:队列名称;// 第三个参数:消息属性;// 第四个参数:消息体channel.basicPublish(EXCHANGE_NAME,"",null,"i am message".getBytes());channel.close();connection.close();
}

正如你所见,在建立连接后我们声明了exchange。这一步是必须的,因为禁止向一个不存在的exchange推送消息。

如果没有对exchange负责的queue,那么消息将会被丢失,这是没有问题的;如果没有消费者监听的话,我们会安全的丢掉这些消息。

5.2、消费者代码示例


/*** @author zhaodi* @desc 发布订阅模式*/
public class Consumer {private static final String EXCHANGE_NAME="my-exchange-1";public static void main(String[] args) throws IOException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();// 申明消息路由的名称和类型channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 申明一个随机的消息队列名称String queueName = channel.queueDeclare().getQueue();// 绑定消息路由和消息队列channel.queueBind(queueName,EXCHANGE_NAME,"");// 创建消费者com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("c1--->:"+new String(body));// 手动应答// 第一个参数:消息标志channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听,关闭自动应答boolean autoAck = false;channel.basicConsume(queueName,autoAck,consumer);}
}

转载于:https://www.cnblogs.com/zhaod/p/11391258.html

4 交换机-fanout(订阅发布模式)相关推荐

  1. RabbitMQ下的生产消费者模式与订阅发布模式

    所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入    假 ...

  2. Java设计模式-观察者模式(订阅发布模式)

    Java设计模式-观察者模式(订阅发布模式) 一起来看 会了就当复习丫,不会来一起来看看吧. 很喜欢一句话:"八小时内谋生活,八小时外谋发展". 如果你也喜欢,让我们一起坚持吧!! ...

  3. Publisher/Subscriber 订阅-发布模式原理解析

    Publisher/Subscriber 订阅-发布模式原理解析 参考资料 What Is Pub/Sub? Publish/Subscribe Messaging Explained 什么是serv ...

  4. 理解并实现 你自己的 订阅-发布模式

    订阅发布模式: 这是一种广泛应用于异步编程的模式,是回调函数的事件化,常常用来解耦业务逻辑.事件的发布者无需关注订阅的侦听器如何实现业务逻辑,甚至不用关注有多少个侦听器存在.数据通过消息的方式可以灵活 ...

  5. Redis实现消息队列和订阅发布模式

    转载:https://www.cnblogs.com/qlqwjy/p/9763754.html 在项目中用到了redis作为缓存,再学习了ActiveMq之后想着用redis实现简单的消息队列,下面 ...

  6. 嵌入式消息订阅发布模式软件框架

    文章目录 一.总体框架 二.基于RT-Thread的SoftBus 2.1 SoftBus的由来 2.2 消息订阅者模式 2.3 静态订阅关系与动态订阅关系 2.4 C/S模式 2.5 消息订阅者模式 ...

  7. java订阅发布模式_Spring Boot ActiveMQ发布/订阅消息模式原理解析

    本文在<Spring Boot基于Active MQ实现整合JMS>的基础上,介绍如何使用ActiveMQ的发布/订阅消息模式.发布/订阅消息模式是消息发送者发送消息到主题(topic), ...

  8. js设计模式之观察者模式和订阅发布模式

    观察者模式 这个模式在我看来原理就是发布者身上放着一个电话本(list)存着订阅者的l联系方式(回调函数),在触发条件(发布)后就会依次联系(遍历调用list中的回调函数)订阅者 上代码: funct ...

  9. 报纸的配送方式:订阅发布模式

    这种设计模式比较简单,属于一对多.类似顾客与报社的关系,顾客订阅报纸,每当有新闻发布时快递员一一给订阅报纸的顾客配送 下面是代码实现 两个接口 /*** 被观察者** Created by Vola ...

最新文章

  1. 浅谈分布式计算的开发与实现(一)
  2. 您试图从目录中执行CGI、ISAPI 或其他可执行程序,但该目录不允许执行程序
  3. css实现多行文字溢出隐藏——前端小问题不定时更新
  4. C++内存管理与分配方式
  5. 1.7 开发集和测试集的大小-深度学习第三课《结构化机器学习项目》-Stanford吴恩达教授
  6. Matplotlib(三) rcParams 自定义样式控制
  7. 常用JQuery插件整理
  8. 2021年3月15日_读书|总结笔记目录
  9. jQuery LigerUI 插件介绍及使用之ligerDrag和ligerResizable
  10. 程序执行的过程分析--【sky原创】
  11. 2.并发编程--线程基础
  12. 【优化算法】阿基米德优化算法(AOA)【含Matlab源码 1447期】
  13. 计算机科学导论简答题答案题库,计算机科学导论习题答案
  14. 【discuzx2】如何通过工具修改ucenter创始人的管理员密码以保证通信成功?
  15. 工作说明书(SOW)
  16. 全网最简单Win10桌面美化教程,只需4步!!
  17. 未转变者服务器bug,未转变者攻略 unturned无敌BUG说明
  18. 基于python3的tkinter和scapy可视化报文构造工具(六)
  19. MII接口(Media Independent Interface)
  20. web前端开发和岗位职责

热门文章

  1. adb logcat介绍
  2. linux下的nginx+php+mysql
  3. 【ES6基础】Object的新方法
  4. Spring框架初写
  5. 乙肝的传播途径是否只有四种?
  6. 到底什么是rest客户端
  7. linux下面使用飞书个人版预览doc
  8. 螺丝孔槽中的螺丝拧花了的物理原理分析
  9. RuntimeError: readexactly() called while another coroutine is already waiting for incoming data
  10. 菜鸟教程-HTML 教程学习笔记