使用RabbitMQ进行消息发布和订阅,生产者将消息发送给转发器(exchange),转发器根据路由键匹配已绑定的消息队列并转发消息,主题模式支持路由键的通配。

  • 生产者代码:
 1 package org.study.exchange3.topic3;2 3 import com.rabbitmq.client.Channel;4 import com.rabbitmq.client.Connection;5 import org.junit.Test;6 import org.study.utils.ConnectionUtils;7 8 import java.io.IOException;9 import java.util.concurrent.TimeoutException;
10
11 /**
12  * topic-主题模式(分发订阅)
13  * exchange只转发消息,但是没有存储能力,只有队列才有存储能力
14  * 主题模式支持路由键的通配符
15  * “#”表示0个或若干个关键字,“*”表示一个关键字。
16  */
17 public class Sender {
18     public static final String QUEUE_NAME = "test_topic_queue";
19     public static final String EXCHANGE_NAME = "topic_exchange";
20
21     @Test
22     public void send() throws IOException, TimeoutException, InterruptedException {
23         // 获取连接
24         Connection conn = ConnectionUtils.getConnection();
25         // 获取通道
26         Channel channel = conn.createChannel();
27 //        //创建队列
28 //        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
29         //声明转发器
30         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
31         //每个消费者发送确认消息前,只发送一条消息
32         channel.basicQos(1);
33
34         String msg = "hello rabbitmq topic !";
35         //发送消息至转发器,指定路由键
36         channel.basicPublish(EXCHANGE_NAME, "key.key", null, msg.getBytes());
37         System.out.println("[send] msg " + msg);
38
39         channel.close();
40         conn.close();
41     }
42 }
  • 消费者代码:
 1 package org.study.exchange3.topic3;2 3 import com.rabbitmq.client.*;4 import org.junit.Test;5 import org.study.utils.ConnectionUtils;6 7 import java.io.IOException;8 import java.util.concurrent.TimeoutException;9
10 /**
11  * 主题模式-接收消息
12  */
13 public class Recv {
14     public static final String QUEUE_NAME = "test_topic_queue";
15     public static final String EXCHANGE_NAME = "topic_exchange";
16
17     @Test
18     public void recv() throws IOException, TimeoutException, InterruptedException {
19         Connection conn = ConnectionUtils.getConnection();
20         Channel channel = conn.createChannel();
21         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22         channel.basicQos(1);
23         /*
24          * 队列绑定转发器,路由键通配符#和*
25          * #:表示0个或多个字符
26          * *:表示一个字符
27          * */
28         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");
29
30         //定义消费者
31         DefaultConsumer consumer = new DefaultConsumer(channel) {
32             //重写获取到达消息
33             @Override
34             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
35                 String msg = new String(body, "utf-8");
36                 System.out.println("[1] recv: " + msg);
37
38                 try {
39                     Thread.sleep(100);
40                 } catch (InterruptedException e) {
41                     e.printStackTrace();
42                 } finally {
43                     System.out.println("[1] done!");
44                     // 回执
45                     channel.basicAck(envelope.getDeliveryTag(), false);
46                 }
47             }
48         };
49
50         while (true) {
51             //监听队列
52             channel.basicConsume(QUEUE_NAME, false, consumer);
53             Thread.sleep(1000);
54         }
55
56     }
57 }

Java使用RabbitMQ之订阅分发(Topic)相关推荐

  1. java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列

    发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定. ...

  2. RabbitMQ教程_4 Java 使用rabbitmq

    https://gitee.com/fakerlove/rabbitmq 文章目录 4. Java 使用rabbitmq 4.1 直连模型--Helloword 引入依赖 创建开发生产者 发布成功 建 ...

  3. RabbitMQ教程(四) Java 使用rabbitmq

    https://gitee.com/fakerlove/rabbitmq 文章目录 4. Java 使用rabbitmq 4.1 直连模型--Helloword 引入依赖 创建开发生产者 发布成功 建 ...

  4. java操作RabbitMQ

    1.创建虚拟主机.交换机.队列 RabbitMQ提供了自己的管理界面,可以通过管理界面来完成VirtualHost.Exchange.queue的创建. 1.1创建VirtualHost 1.2创建交 ...

  5. RabbitMQ:订阅模型-消息订阅模式

    订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息. RabbitMQ 单 ...

  6. java启动RabbitMQ消息报异常解决办法

    java启动RabbitMQ消息报异常解决办法 参考文章: (1)java启动RabbitMQ消息报异常解决办法 (2)https://www.cnblogs.com/meilibao/p/11357 ...

  7. java实现rabbitmq任务模型(work queues), 生产者 消费者 消息队列 能者多劳

    work queues也成为task queues,任务模型.当消息处理比较耗时的时候,可能生产消息的速度远远大于消费速度,长此以往,消息就会堆积,无法及时处理.此时,就恶意使用work模型,让多个消 ...

  8. RabbitMQ:发布订阅模式

    ✨ RabbitMQ:发布订阅模式 1.订阅模式基本介绍 2.交换机 3.发布订阅模式 3.1基本介绍 3.2生产者 3.3消费者 3.4测试

  9. RabbitMQ (五) 订阅者模式之分发模式 ( fanout )

    前面讲到了简单队列和工作队列. 这两种队列有个非常明显的缺点 : 生产者发送的消息,只能进入到一个队列. 消息只能进入到一个队列就意味着消息只能被一个消费者消费. 尽管工作队列模式中,一个队列中的消息 ...

最新文章

  1. 2022-2028年中国场景金融行业深度调研及投资前景预测报告
  2. 朋友在小厂待到三十多岁了 现在跳槽能找到什么样的工作
  3. idea test包_6.Flinkx如何在idea中运行?
  4. LeetCode 1878. 矩阵中最大的三个菱形和(模拟)
  5. 搜索互联网缓存页面 How to View the Cached Page of any URL or Website
  6. 如何避免循环中“突兀”的break和continue
  7. SQLSERVER 2012之AlwaysOn -- 一次硬件升级引发的问题
  8. lstm数学推导_LSTM简介以及数学推导(FULL BPTT)
  9. splice方法_JavaScript数组_数组方法【一】(二十六)
  10. Ajax运用json数组传输数据
  11. 如何使用NAS才能确保数据100%安全(数据存储解决方案)
  12. GIMP制作电子签名
  13. Mac新手必备技巧之如何批量修改图片大小
  14. 二手车数据挖掘- 数据分析
  15. java之家_java之家
  16. 最好用的六款虚拟机软件,赶紧收藏
  17. 一文读懂程序化易法易化资频易计利
  18. 爱情八十八课,对抗消磨
  19. 2021-12-28 关于直流空开(MCB)的思考
  20. Ubuntu1804 安装 MySQL5.7

热门文章

  1. Leetcode每日一题:148.sort-list(链表排序)
  2. 面试题简答题——操作系统相关汇总
  3. 自动化测试-selenium启动浏览器
  4. 5、Fiddler如何捕获HTTPS会话
  5. 服务器电源维修哪里便宜,服务器电源维修
  6. python查看系统进程_在Python中获取操作系统的进程信息
  7. linux 版本号 加号,如何去除Linux Kernel版本号后面的加号?
  8. c语言找两个数中的最大值,不用任何比较判断找出两个数中的最大值
  9. python如何判断QQ是否在线?
  10. Vue使用Vditor编辑器