RabbitMQ系列之三:publish subscribe
server端代码:
1 package com.example.publishsubscribe; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 public class Send { 10 11 // 定义一个logs的邮局,对消息进行分发处理 12 private static final String EXCHANGE_NAME = "logs"; 13 14 public static void main(String[] args) throws IOException { 15 16 ConnectionFactory factory = new ConnectionFactory(); 17 18 //远程服务器ip,如果在本地测试可以改成localhost 19 factory.setHost("121.40.151.120"); 20 21 //不是在本地测试,用户名和密码必填 22 factory.setUsername("rabbitmqtest"); 23 factory.setPassword("rabbitmqtest"); 24 25 Connection conn = factory.newConnection(); 26 Channel channel = conn.createChannel(); 27 28 /** 29 * 参数说明: 30 * exchange:邮局名称 31 * type: 32 * direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息。 33 * 它会处理routingKey,需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 34 * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发, 35 * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 36 * topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。 37 * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。 38 * 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。 39 * fanout: 所有bind到此exchange的queue都可以接收消息。 40 * 不会处理routingKey。你只需要简单的将队列绑定到交换机上。 41 * 一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。 42 * 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 43 * 44 */ 45 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 46 47 String message = getMessage(new String[] { "test" }); 48 49 /** 50 * 参数说明: 51 * exchange:默认的exchange就是"",是direct类型的, 52 * 任何发往到默认exchange的消息都会被路由到routingKey的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。 53 * routingKey:因为这里的exchange类型是fanout的,所以会匹配所有的routingKey。 54 * props:其它属性,比如消息路由头信息,持久化信息 55 * body:消息内容 56 */ 57 channel.basicPublish(EXCHANGE_NAME, "anyRoutingKey", null, message.getBytes()); 58 59 System.out.println("send[" + message + "]"); 60 61 // 最后,我们关闭channel和连接,释放资源。 62 channel.close(); 63 conn.close(); 64 } 65 66 private static String getMessage(String[] strings) { 67 if (strings.length < 1) { 68 return "Hello World!"; 69 } 70 return joinStrings(strings, " "); 71 } 72 73 private static String joinStrings(String[] strings, String delimiter) { 74 int length = strings.length; 75 if (length == 0) { 76 return ""; 77 } 78 StringBuilder words = new StringBuilder(strings[0]); 79 for (int i = 1; i < length; i++) { 80 words.append(delimiter).append(strings[i]); 81 } 82 return words.toString(); 83 } 84 85 }
client端代码:
1 package com.example.workqueue; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer; 10 import com.rabbitmq.client.ShutdownSignalException; 11 12 public class Recv { 13 14 private static final String EXCHANGE_NAME = "logs"; 15 16 public static void main(String[] args) throws IOException, ShutdownSignalException, 17 ConsumerCancelledException, InterruptedException { 18 19 ConnectionFactory factory = new ConnectionFactory(); 20 21 factory.setHost("121.40.151.120"); 22 factory.setUsername("rabbitmqtest"); 23 factory.setPassword("rabbitmqtest"); 24 25 Connection connection = factory.newConnection(); 26 Channel channel = connection.createChannel(); 27 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 28 29 // 临时队列(temporary queue) 30 // 首先,无论什么时候连接Rabbit时,我们需要一个新的、空的队列 31 // First, whenever we connect to Rabbit we need a fresh, empty queue. 32 // 为了做到这一点,我们可以创建一个随机命名的队列,或者更好的,就让服务端给我们选择一个随机的队列名字。 33 // 其次,一旦我们关闭消费者的连接,这个临时队列应该自动销毁。 34 String queueName = channel.queueDeclare().getQueue(); 35 // routingKey这里是没有任何作用的,也就是名字可以随便取 36 channel.queueBind(queueName, EXCHANGE_NAME, "anyRoutingKey2"); 37 38 System.out.println("CRTL+C"); 39 40 // QueueingConsumer:用来缓存服务端推送给我们的消息。 41 QueueingConsumer consumer = new QueueingConsumer(channel); 42 channel.basicConsume(queueName, true, consumer); 43 44 while (true) { 45 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 46 String message = new String(delivery.getBody()); 47 System.out.println("[" + message + "]"); 48 } 49 } 50 51 }
三种Exchange的图解:
direct exchange:
fanout exchange:
topic exchange:
转载于:https://www.cnblogs.com/weishaohua/p/4042997.html
RabbitMQ系列之三:publish subscribe相关推荐
- RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码
RabbitMQ有以下几种工作模式 : 1.Work queues 工作队列 2.Publish/Subscribe 发布订阅 3.Routing 路由 4.Topics 通 ...
- RabbitMQ学习之Publish/Subscribe(3)
上一个教程中,我们创建了一个work queue. 其中的每个task都会被精确的传送到一个worker. 这节,我们将会讲把一个message传送到多个consumers. 这种模式叫做publis ...
- RabbitMq 发布订阅 Publish/Subscribe fanout/direct
目录 概述 交换机 临时队列 代码 概述 在上篇中了解到rabbitmq 生产者生产消息到队列,多个消费者可以接受.这篇文章主要记录广播类型为fanout.生产者不在将产生的消息发送到队列,而是将消息 ...
- RabbitMQ Tutorials 3 - Publish/Subscribe 发布/订阅
发布/订阅 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日 ...
- RabbitMQ入门:发布/订阅(Publish/Subscribe)
在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...
- RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)
在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...
- RabbitMQ 入门系列(11)— RabbitMQ 常用的工作模式(simple模式、work模式、publish/subscribe模式、routing模式、topic模式)
1. simple 模式 simple 模式是最简单最常用的模式 2. work 模式 work 模式有多个消费者 消息产生者将消息放入队列.生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到 ...
- 译: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 发布和订阅
在第一篇教程中,我们展示了如何使用start.spring.io来利用Spring Initializr创建一个具有RabbitMQ starter dependency的项目来创建spring-am ...
- RabbitMQ发布/订阅模式(Publish/Subscribe)
工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有 ...
最新文章
- Dubbo基础专题——第二章(Dubbo工程简单实践)
- 「SAP技术」SAP MM 事务代码ME17的用法
- 企业Shell实战-MySQL分库分表备份脚本
- python列表做参数传值_python不定参数传值怎么做-问答-阿里云开发者社区-阿里云...
- 【NLP】Pytorch中文语言模型bert预训练代码
- 【今日CV 计算机视觉论文速览 第119期】Wed, 22 May 2019
- sring-list-del-string-int:解析左右编码器的,和#号
- IDEA 2020 配置 Maven 创建 Spring Boot 项目
- 5分钟商学院-个人篇-高效能人士的思维习惯
- 按揭月供计算器(等额本息)
- 飞鱼服务器 微信,飞鱼微信客服系统软件
- python三阶魔方_三阶魔方还原公式
- 如何打造VUCA时代的敏捷型组织?
- POI对word文档中的指定内容添加批注
- android逆向知乎,Android逆向之路---为什么从后台切换回app又显示广告了
- jedis异常:Could not get a resource from the pool
- Vue-axios使用QS(QueryString)插件,Vue-axios无法发送参数给后端(包含但不限于php)。
- STM32三条总线(AHB、APB1、APB2)的外设映射情况
- 关闭windows系统ssh连接linux终端发出提示音
- 地球坐标系与投影方式
热门文章
- 网络营销中SEO是最常用的“圈粉”引流方式之一
- 网站内容页面如何优化才利于排名提升?
- python constructor_python – 无法成功启动boa-constructor
- win系统加入方舟服务器秒退,win10玩方舟闪退 | 手游网游页游攻略大全
- 反转 鼠标_梦幻西游:剧情反转?挖图挖出环装,在晶清加持下3000W到手
- mac图标包_Mac 生产力配置手册,从 Homebrew 说开去
- 混凝土地坪机器人_创新引领 快速建造丨临时设施大项目部使用机器人等五项技术刷新建设新效率...
- python读取excel数据并饼图_python生成excel表格以及饼图 示例源码
- 信令风暴问题根因分析
- 木马——本质就是cs socket远程控制,反弹木马是作为c端向外发起网络请求