RabbitMQ 工作模式二
之前写了WORKQUEUES 跟 Publish/Subscribe 俩种模式 ,RabbitMQ 工作模式一
Routing 工作模式
特点
每个消费者监听自己的队列,并且设置routingkey
生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列
说白了,就是在publish/subscribe 工作模式的基础上加一层筛选,判断
if(队列.routingkey == 生产者.routingkey) 发送消息给duilie
下面是我写的生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer03 {//队列名称private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1";private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2";public final static String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest"); //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建一个通道/*** 定义交换机* param1 : 交换机名称* param2 : 交换机类型**/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);/*** 定义消息队列** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_INFORM_TEST1,true ,false ,false ,null);channel.queueDeclare(QUEUE_INFORM_TEST2,true ,false ,false ,null);//绑定交换机跟队列/*String queue, String exchange, String routingKey*/channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1);channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2);String message = "";//给队列发送消息for (int i = 0; i < 9; i++) {//
// String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "人间有百媚千红,唯你是我情之所钟。第"+i+"条消息";//给test1 发送9条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST1 ,null ,message.getBytes() );}for (int i = 0; i < 5; i++) {//
// String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "你别回头看我了,走吧,山高水长,可别再碰到我这么喜欢你的人了。第"+i+"条消息";//给test2 发送5条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST2 ,null ,message.getBytes() );}channel.close();connection.close();}}
设置了交换机名称,及路由的交换机类型
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
--------------------------------------------------------------------------------------------------------------------------------------
声明了俩个队列...分别为 test1, test2
*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments** param1: 队列名称* param2: 是否持久化* param3 : 是否独占此队列* param4 : 队列不用是否自动删除* param5 : 参数*/
channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );
------------------------------------------------------------------------------------------------------------
队列和交换机绑定
//交换机和队列绑定 /*** String queue, String exchange, String routingKey* param1 : 队列名称* exchange : 交换机* routingKey : 给队列添加一个 路由key,交换机发送消息时根据填写的路由key 来判断,如果填写的key 跟 队列的路由key 相同,那么就会发送消息给此队列 */
channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1);channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2);
-------------------------------------------------------------------------
发送消息
/*** String exchange, String routingKey, BasicProperties props, byte[] body** param1 交换机名称* param2 根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列* param3 参数* param4 传递的字符串***/
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());
``````````````````````````````````````````````````````````````````````````````````````````````````````````````
以上是生产者的代码 ,下面提供消费者的代码
import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest1 {//队列名称private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest"); //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();// 创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {// 重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};// 监听队列/*** String queue, boolean autoAck, Consumer callback** param1 : 队列名称* param2 : 是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_TEST1,true,consumer);}catch (Exception e){}}
}
消费者二
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest2 {private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest"); //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();// 创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {// 重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};// 监听队列/*** String queue, boolean autoAck, Consumer callback** param1 : 队列名称* param2 : 是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);}catch (Exception e){}}
}
消费者的代码其实很简单,就是监听而已.只需要指定一下监听的队列就行并提供一个 执行的方法 ,就是下面这句
channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);
测试
先启动producer,否则会报错,因为consumer 中没有声明队列,并且没有中rabbitmq中发现队列,就会抛出异常
生产者 会给 test1 发送 9条消息 给test 发送 5条消息
然后我们依次启动俩个消费者
然后看处理结果
Topics 工作模式
通配符工作模式
每个消费者监听自己的队列,并且设置带统配符的routingkey
生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
统配符规则:
符号#可以匹配多个词,
符号*可以匹配一个词语。
因为通配符感觉比较难讲,所以我在网上找了一个充值的案例
场景
用户充值完成, email 的用户 接收email的提示, sms的用户接收sms的提示.. 设置两种通知类型都接收的则两种通知都有效。
大致思路
设置 路由匹配规则
邮件的匹配规则 "inform.#.email.#" sms的匹配规则 "inform.#.sms.#"
给只接收email的用户 发送消息 路由设置 inform.email
给只接收 sm的用户 发送消息 路由设置 inform.sms
给 都接收的用户 发送消息 inform.sms.email 不懂的话 看一下匹配规则,,有点取巧来着
下面是我写的生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer03 {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";public final static String EXCHANGE_ROUTING_INFORM = "exchange_topic_inform";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest"); //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建一个通道/*** 定义交换机* param1 : 交换机名称* param2 : 交换机类型**/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);/*** 定义消息队列** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_INFORM_EMAIL,true ,false ,false ,null);channel.queueDeclare(QUEUE_INFORM_SMS,true ,false ,false ,null);//绑定交换机跟队列/*String queue, String exchange, String routingKey*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");String message = "";//给接收邮件的人发邮件for (int i = 0; i < 9; i++) {//
// String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "记忆最让人崩溃的地方,也许就在于它的猝不及防在某个祥和的午后,你正吃着火锅唱着歌,那些尖利的记忆碎片就像潮水突然涌进你到脑海里,让你闪躲不及。";//给test1 发送9条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );}//给接收短信的人发短信for (int i = 0; i < 5; i++) {//
// String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "发送了一百条短信,九十九条都是你";//给test2 发送5条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );}// 给都接收的人发送 for (int i = 0; i < 3; i++) {message = "我回你是秒回,你回我是轮回";//给test2 发送5条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes() );}channel.close();connection.close();}}
-----------------------
定义交换机,并设置交换机的类型为通配符模式
/*** 定义交换机* param1 : 交换机名称* param2 : 交换机类型**/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);
---------------------------------------------------
绑定交换机跟队列,并配置队列的路由通配符规则
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");
--------------------------------------------------------------------------------------------
发送消息给broker 并发送路由key
/*** 参数明细
* 1、交换机名称,不指令使用默认交换机名称 Default Exchang
* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列
* 3、消息属性
* 4、消息内容*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes());
消息监听者1
import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest1 {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest"); //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();// 创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {// 重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};// 监听队列/*** String queue, boolean autoAck, Consumer callback** param1 : 队列名称* param2 : 是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);}catch (Exception e){}}
}
消息监听二
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest2 {private static final String QUEUE_INFORM_SMS= "queue_inform_sms";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest"); //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();// 创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {// 重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};// 监听队列/*** String queue, boolean autoAck, Consumer callback** param1 : 队列名称* param2 : 是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_SMS,true , consumer);}catch (Exception e){}}
}
其实监听者代码都不要变来着,都是通用的
测试
先启动生产者,后启动监听者.....理由是 如果先启动监听者,队列没创建,就会报错
测试结果
sms的打印台打印结果
email 打印台打印结果
其实搞懂了路由模式后这个就很容易理解这个....
Header模式
header 模式其实跟路由模式很像,他们不同的是header模式取消routingkey,使用header中的 key/value(键值对)匹配队列
了解了很多不同的模式,其实你会发现,代码很多都是相同的....这里我就不粘贴全部代码,就只给不同的代码了
绑定交换机跟队列的代码
HashMap<String, Object> header_email = new HashMap<>();header_email.put("inform_type", "cms");HashMap<String, Object> header_sms = new HashMap<>();header_sms.put("inform_type", "sms");channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"",header_email);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"",header_sms);
发送消息的代码
message = "记忆最让人崩溃的地方,也许就在于它的猝不及防在某个祥和的午后,你正吃着火锅唱着歌,那些尖利的记忆碎片就像潮水突然涌进你到脑海里,让你闪躲不及。";HashMap<String, Object> header = new HashMap<>();header.put("inform_type", "sms"); //匹配sms通知消费者绑定的headerAMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();properties.headers(header)//给test1 发送9条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"" ,properties.build() ,message.getBytes() );
消费者都是一样的,改一下队列名称就行
RabbitMQ 工作模式二相关推荐
- RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing
第四章-RabbitMQ工作模式-Routing 1.模式介绍 1.1 模式 路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与 ...
- 3.RabbitMQ工作模式介绍
3.RabbitMQ工作模式介绍.md 文章目录 3.RabbitMQ工作模式介绍.md 1.简单模式 1.1总结 2.Work Queues 工作队列模式 2.1 模式说明 2.2 代码编写 2.3 ...
- 学成在线--13.RabbitMQ工作模式
文章目录 一.Work queues 二.Publish/subscribe 1.工作模式 2.代码 1)生产者 2)消费者 3.测试 4.思考 三.Routing 1.工作模式 2.代码 1)生产者 ...
- RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码
RabbitMQ有以下几种工作模式 : 1.Work queues 工作队列 2.Publish/Subscribe 发布订阅 3.Routing 路由 4.Topics 通 ...
- RabbitMQ工作模式
RabbitMQ总共有六种工作模式: 1.工作队列模式 work queues 2.发布/订阅模式 Publish/Subscribe 3.路由模式 Routing 4.通配符模式 Topics 5. ...
- rabbitMq工作模式特性及整合springboot
因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq mq(message queue)消息队列 官网:www.rabbitmq.com 使用消息队列的优点:1. ...
- 消息队列(四)——rabbitMQ四种工作模式
RabbitMQ工作模式 模式总结: 1.简单模式helloworld 一个生产者,一个消费者,不需要交换机(使用默认交换机) 2.工作队列模式Work Queue 一个生产者,多个消费者(竞争关系) ...
- RabbitMQ的交换机类型和工作模式
RabbitMQ的交换机类型有四种 1.direct 直流交换机: 根据消息的路由键routingkey,将消息以完全匹配的方式路由到指定的队列中. 这里的匹配指的是消息本身携带的路由键和队列与交换机 ...
- RabbitMQ的工作模式
RabbitMQ工作模式 Work queues Publish/Subscribe Routing Topics Header RPC Work queues work queues工作模式是一个服 ...
- RabbitMQ的工作模式和使用场景
RabbitMQ工作模式 1.基本模型 2.RabbitMQ应用场景 MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Me ...
最新文章
- mysql 缓存区_Mysql缓存的配置和使用
- 揭秘|多伦多大学反人脸识别,身份欺骗成功率达99.5%
- setInterval 与 clearInterval详解
- 文件怎么上传远程服务器,怎么上传文件到远程服务器
- 怎么可以用计算机弄出表白数字,怎么用数字表白 表白爱情数字大全
- 你应该知道的Windows复制技术
- 计算机基础知识ppt图文,计算机基础知识讲解ppt课件.ppt
- 关于『区位码』、『国标码』、『机内码』的转换问题
- 计算机软件故障实验报告,湖大选修实验报告计算机软硬件一般故障的排除.doc...
- 游戏圈子--创业股份没拿到,你该怨谁?
- Yolo 一小时学会基本操作
- 编辑为什么建议转投_编辑回信解读 —“建议转投子刊”实例
- Win10下C:\Users\***修改用户名(完全修改)
- Web前端——JSP
- 在几何画板中如何制作圆柱的侧面展开动画_如何用几何画板做三棱柱的侧面展开动画...
- Exception in Thread: ValueError: signal number 32 out of range
- APP+springboot订餐APP 毕业设计源码190711
- 欧拉回路(简单判断是否有欧拉回路存在)
- 电感式DC/DC升压原理
- 希望传说手游如何在电脑上玩 希望传说手游模拟器教程