Java使用RabbitMQ之订阅分发(Topic)
使用RabbitMQ进行消息发布和订阅,生产者将消息发送给转发器(exchange),转发器根据路由键匹配已绑定的消息队列并转发消息,主题模式支持路由键的通配。
- 生产者代码:
1 package org.study.exchange3.topic3;2 3 import com.rabbitmq.client.Channel;4 import com.rabbitmq.client.Connection;5 import org.junit.Test;6 import org.study.utils.ConnectionUtils;7 8 import java.io.IOException;9 import java.util.concurrent.TimeoutException;
10
11 /**
12 * topic-主题模式(分发订阅)
13 * exchange只转发消息,但是没有存储能力,只有队列才有存储能力
14 * 主题模式支持路由键的通配符
15 * “#”表示0个或若干个关键字,“*”表示一个关键字。
16 */
17 public class Sender {
18 public static final String QUEUE_NAME = "test_topic_queue";
19 public static final String EXCHANGE_NAME = "topic_exchange";
20
21 @Test
22 public void send() throws IOException, TimeoutException, InterruptedException {
23 // 获取连接
24 Connection conn = ConnectionUtils.getConnection();
25 // 获取通道
26 Channel channel = conn.createChannel();
27 // //创建队列
28 // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
29 //声明转发器
30 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
31 //每个消费者发送确认消息前,只发送一条消息
32 channel.basicQos(1);
33
34 String msg = "hello rabbitmq topic !";
35 //发送消息至转发器,指定路由键
36 channel.basicPublish(EXCHANGE_NAME, "key.key", null, msg.getBytes());
37 System.out.println("[send] msg " + msg);
38
39 channel.close();
40 conn.close();
41 }
42 }
- 消费者代码:
1 package org.study.exchange3.topic3;2 3 import com.rabbitmq.client.*;4 import org.junit.Test;5 import org.study.utils.ConnectionUtils;6 7 import java.io.IOException;8 import java.util.concurrent.TimeoutException;9
10 /**
11 * 主题模式-接收消息
12 */
13 public class Recv {
14 public static final String QUEUE_NAME = "test_topic_queue";
15 public static final String EXCHANGE_NAME = "topic_exchange";
16
17 @Test
18 public void recv() throws IOException, TimeoutException, InterruptedException {
19 Connection conn = ConnectionUtils.getConnection();
20 Channel channel = conn.createChannel();
21 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22 channel.basicQos(1);
23 /*
24 * 队列绑定转发器,路由键通配符#和*
25 * #:表示0个或多个字符
26 * *:表示一个字符
27 * */
28 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");
29
30 //定义消费者
31 DefaultConsumer consumer = new DefaultConsumer(channel) {
32 //重写获取到达消息
33 @Override
34 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
35 String msg = new String(body, "utf-8");
36 System.out.println("[1] recv: " + msg);
37
38 try {
39 Thread.sleep(100);
40 } catch (InterruptedException e) {
41 e.printStackTrace();
42 } finally {
43 System.out.println("[1] done!");
44 // 回执
45 channel.basicAck(envelope.getDeliveryTag(), false);
46 }
47 }
48 };
49
50 while (true) {
51 //监听队列
52 channel.basicConsume(QUEUE_NAME, false, consumer);
53 Thread.sleep(1000);
54 }
55
56 }
57 }
Java使用RabbitMQ之订阅分发(Topic)相关推荐
- java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列
发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定. ...
- RabbitMQ教程_4 Java 使用rabbitmq
https://gitee.com/fakerlove/rabbitmq 文章目录 4. Java 使用rabbitmq 4.1 直连模型--Helloword 引入依赖 创建开发生产者 发布成功 建 ...
- RabbitMQ教程(四) Java 使用rabbitmq
https://gitee.com/fakerlove/rabbitmq 文章目录 4. Java 使用rabbitmq 4.1 直连模型--Helloword 引入依赖 创建开发生产者 发布成功 建 ...
- java操作RabbitMQ
1.创建虚拟主机.交换机.队列 RabbitMQ提供了自己的管理界面,可以通过管理界面来完成VirtualHost.Exchange.queue的创建. 1.1创建VirtualHost 1.2创建交 ...
- RabbitMQ:订阅模型-消息订阅模式
订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息. RabbitMQ 单 ...
- java启动RabbitMQ消息报异常解决办法
java启动RabbitMQ消息报异常解决办法 参考文章: (1)java启动RabbitMQ消息报异常解决办法 (2)https://www.cnblogs.com/meilibao/p/11357 ...
- java实现rabbitmq任务模型(work queues), 生产者 消费者 消息队列 能者多劳
work queues也成为task queues,任务模型.当消息处理比较耗时的时候,可能生产消息的速度远远大于消费速度,长此以往,消息就会堆积,无法及时处理.此时,就恶意使用work模型,让多个消 ...
- RabbitMQ:发布订阅模式
✨ RabbitMQ:发布订阅模式 1.订阅模式基本介绍 2.交换机 3.发布订阅模式 3.1基本介绍 3.2生产者 3.3消费者 3.4测试
- RabbitMQ (五) 订阅者模式之分发模式 ( fanout )
前面讲到了简单队列和工作队列. 这两种队列有个非常明显的缺点 : 生产者发送的消息,只能进入到一个队列. 消息只能进入到一个队列就意味着消息只能被一个消费者消费. 尽管工作队列模式中,一个队列中的消息 ...
最新文章
- 2022-2028年中国场景金融行业深度调研及投资前景预测报告
- 朋友在小厂待到三十多岁了 现在跳槽能找到什么样的工作
- idea test包_6.Flinkx如何在idea中运行?
- LeetCode 1878. 矩阵中最大的三个菱形和(模拟)
- 搜索互联网缓存页面 How to View the Cached Page of any URL or Website
- 如何避免循环中“突兀”的break和continue
- SQLSERVER 2012之AlwaysOn -- 一次硬件升级引发的问题
- lstm数学推导_LSTM简介以及数学推导(FULL BPTT)
- splice方法_JavaScript数组_数组方法【一】(二十六)
- Ajax运用json数组传输数据
- 如何使用NAS才能确保数据100%安全(数据存储解决方案)
- GIMP制作电子签名
- Mac新手必备技巧之如何批量修改图片大小
- 二手车数据挖掘- 数据分析
- java之家_java之家
- 最好用的六款虚拟机软件,赶紧收藏
- 一文读懂程序化易法易化资频易计利
- 爱情八十八课,对抗消磨
- 2021-12-28 关于直流空开(MCB)的思考
- Ubuntu1804 安装 MySQL5.7