发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定。交换机会把消息发送到绑定过的所有队列,实现一对多,一条消息被多个消费者消费。


可以看到,这种模型需要用到交换机模块,我们在后台管理界面可以看到许多交换机可供使用,当然,也可以自己声明需要的交换机。
每个虚拟主机默认生成了多个类型的交换机,这里我们选择一个fanout类型的,名为amqp.fanout的交换机测试。

  1. 生产者
public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();// 定义通道对应的交换机 参数一:交换机名称 参数二:教会及类型 fanout 广播模型channel.exchangeDeclare("amqp.fanout","fanout");// 发送消息channel.basicPublish("amqp.fanout","",null,("fanout message " + System.currentTimeMillis()).getBytes());}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}
}

通过连接获取通道后,使用channel.exchangeDeclare(“amqp.fanout”,“fanout”);声明交换机,如果交换机不存在,rabbitmq会自动创建,发送消息时指定exchange为amqp.fanout,第二个参数为空,使用临时队列。

  1. 消费者
public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare("amqp.fanout","fanout");//创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定交换机和队列channel.queueBind(queueName,"amqp.fanout","");// 每次只能消费一个消息channel.basicQos(1);// 消费消息channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//参数一:确认队列中的那个消息  参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}
}

此处为了减少篇幅,读者可将Consumer01的代码复制生成Consumer02或多个消费者类

  1. 测试
    启动所有消费者,通过临时队列监听交换机。此时,运行生产者发布消息,多个消费者可同时接收到交换机发送的消息。
    消费者一:
    消费者二:
    多个消费者同时接收到了相同的消息,再运行一次生产者也一样,消息已广播的形式发送至绑定到交换机的多个消费者,其中,消费者与交换机通过临时队列连接。

java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列相关推荐

  1. java实现rabbitmq路由模型(routing/topic queues), 生产者 消费者 交换机 消息队列

    在fanout模型中,一条消息会被所有订阅的队列消费,即绑定了对应交换机的消费者,都能收到消息.但在某些场景下,我们希望不同的消息发送到不同的队列,被不同的消费者消费,此时就要用到Direct类型的交 ...

  2. java实现rabbitmq动态路由/话题模型(topic queues), 生产者 消费者 交换机 消息队列

    在routing路由模型中,我们实现了可以根据routingKey来选择性地将消息发送到对应的消息队列中,但是,这种模型不够灵活,比如最开始只有warn.info.error.三种类型的日志,但后面如 ...

  3. RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...

  4. RabbitMQ消息队列:发布/订阅(Publish/Subscribe)

    2019独角兽企业重金招聘Python工程师标准>>> 前面我们把每个Message都是deliver到某个单一的Consumer.今天我们将了解如何把同一个Message deli ...

  5. RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)

    在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...

  6. RabbitMQ发布/订阅模式(Publish/Subscribe)

    工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有 ...

  7. kafka 发布订阅_在Kafka中发布订阅模型

    kafka 发布订阅 这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 ...

  8. 在Kafka中发布订阅模型

    这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 : Kafka是一种分 ...

  9. ros 单向通讯 talker,listener 发布订阅模型

    原文链接: ros 单向通讯 talker,listener 发布订阅模型 上一篇: VirtualBox 端口转发(端口映射) 主机和虚拟机相互访问 下一篇: python 串口编程 发布订阅模型 ...

最新文章

  1. 南京大学「自然指数」超越清华北大,位列全国高校第一、世界第七,突显学术实力...
  2. uni-app读取html缓存,uni-app同步缓存值 设置 读取 删除(示例代码)
  3. springmvc教程--整合mybatis开发(spring+springMVC+mybatis整合开发)
  4. SAP CRM 中间件Request download里,遇到/SAPPSPRO/S_MAT_ENHANC_COMM 错误的解决办法
  5. angular封装富文本编辑器指令
  6. js预览本地word文档_怎么免费下载百度文库付费文档?
  7. 仿网易/QQ空间视频列表滚动连播炫酷效果
  8. Leetcode 242. 有效的字母异位
  9. 用户管理系统_河北会计管理系统个人用户 使用手册
  10. IS-IS详解(十六)——IS-IS 分片扩展
  11. PO、VO、BO、POJO、DAO、DTO都是什么对象
  12. Mac如何删除python Python cannot be opened because of a problem
  13. qpython 3h怎么使用_不思议迷宫M14怎么玩 不思议迷宫M14攻略
  14. 飘云阁内存补丁工具使用
  15. 原生JS(JavaScript)
  16. vue2中监听watch的写法汇总
  17. 英语四级口语测试软件,2021年大学英语四级口语测试题
  18. 工业设计公司如何选择
  19. 朋友圈内容防折叠不折叠小技巧
  20. k8s-client-go源码剖析(一)

热门文章

  1. 在Python中将列表转换为元组
  2. xp升级windows7_微软大升级!Windows系统电脑将告别杀毒软件
  3. MTK:文件操作接口详解
  4. opencv之求轮廓的凸包
  5. Open3d之网格变形
  6. ubuntu16.04之升级python3.5到3.6
  7. CentOS设置ssh密钥登录
  8. 从Discuz迁移帐号密码到NodeBB
  9. django搭建一个小型的服务器运维网站-查看和修改服务器配置与数据库的路由...
  10. python中decode()函数函数的用法