之前写了WORKQUEUES 跟   Publish/Subscribe 俩种模式 ,RabbitMQ 工作模式一

Routing 工作模式

特点

每个消费者监听自己的队列,并且设置routingkey

生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列

说白了,就是在publish/subscribe 工作模式的基础上加一层筛选,判断

 if(队列.routingkey == 生产者.routingkey)    发送消息给duilie

下面是我写的生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer03 {//队列名称private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1";private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2";public final static String  EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest");  //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建一个通道/*** 定义交换机*  param1 :  交换机名称*  param2 :   交换机类型**/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);/***    定义消息队列**    String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_INFORM_TEST1,true ,false ,false ,null);channel.queueDeclare(QUEUE_INFORM_TEST2,true ,false ,false ,null);//绑定交换机跟队列/*String queue, String exchange, String routingKey*/channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1);channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2);String message = "";//给队列发送消息for (int i = 0; i < 9; i++) {//
//            String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "人间有百媚千红,唯你是我情之所钟。第"+i+"条消息";//给test1 发送9条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST1 ,null ,message.getBytes() );}for (int i = 0; i < 5; i++) {//
//            String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "你别回头看我了,走吧,山高水长,可别再碰到我这么喜欢你的人了。第"+i+"条消息";//给test2 发送5条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST2 ,null ,message.getBytes() );}channel.close();connection.close();}}

设置了交换机名称,及路由的交换机类型

channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

--------------------------------------------------------------------------------------------------------------------------------------

声明了俩个队列...分别为 test1, test2

*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments** param1: 队列名称* param2: 是否持久化* param3 : 是否独占此队列* param4 : 队列不用是否自动删除*  param5 : 参数*/
channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );

------------------------------------------------------------------------------------------------------------

队列和交换机绑定

//交换机和队列绑定
/*** String queue, String exchange, String routingKey* param1 :  队列名称* exchange :  交换机* routingKey : 给队列添加一个 路由key,交换机发送消息时根据填写的路由key 来判断,如果填写的key 跟 队列的路由key 相同,那么就会发送消息给此队列 */
  channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1);channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2);

-------------------------------------------------------------------------

发送消息

/*** String exchange, String routingKey, BasicProperties props, byte[] body**   param1  交换机名称*   param2  根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列*   param3  参数*   param4 传递的字符串***/
 channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());

``````````````````````````````````````````````````````````````````````````````````````````````````````````````

以上是生产者的代码 ,下面提供消费者的代码

import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest1 {//队列名称private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest");  //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();//            创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {//                重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};//            监听队列/*** String queue, boolean autoAck, Consumer callback** param1 :  队列名称* param2 :   是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_TEST1,true,consumer);}catch (Exception e){}}
}

消费者二

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest2 {private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest");  //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();//            创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {//                重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};//            监听队列/*** String queue, boolean autoAck, Consumer callback** param1 :  队列名称* param2 :   是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);}catch (Exception e){}}
}

消费者的代码其实很简单,就是监听而已.只需要指定一下监听的队列就行并提供一个 执行的方法 ,就是下面这句

channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);

测试

先启动producer,否则会报错,因为consumer 中没有声明队列,并且没有中rabbitmq中发现队列,就会抛出异常

生产者 会给 test1 发送  9条消息   给test 发送 5条消息

然后我们依次启动俩个消费者

然后看处理结果

Topics 工作模式

通配符工作模式

每个消费者监听自己的队列,并且设置带统配符的routingkey

生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

统配符规则:

符号#可以匹配多个词,

符号*可以匹配一个词语。

因为通配符感觉比较难讲,所以我在网上找了一个充值的案例

场景

用户充值完成, email 的用户 接收email的提示, sms的用户接收sms的提示.. 设置两种通知类型都接收的则两种通知都有效。

大致思路

设置 路由匹配规则

邮件的匹配规则   "inform.#.email.#"          sms的匹配规则   "inform.#.sms.#"

给只接收email的用户 发送消息  路由设置     inform.email

给只接收 sm的用户  发送消息 路由设置   inform.sms

给 都接收的用户 发送消息                 inform.sms.email         不懂的话 看一下匹配规则,,有点取巧来着

下面是我写的生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer03 {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";public final static String  EXCHANGE_ROUTING_INFORM = "exchange_topic_inform";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest");  //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建一个通道/*** 定义交换机*  param1 :  交换机名称*  param2 :   交换机类型**/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);/***    定义消息队列**    String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_INFORM_EMAIL,true ,false ,false ,null);channel.queueDeclare(QUEUE_INFORM_SMS,true ,false ,false ,null);//绑定交换机跟队列/*String queue, String exchange, String routingKey*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");String message = "";//给接收邮件的人发邮件for (int i = 0; i < 9; i++) {//
//            String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "记忆最让人崩溃的地方,也许就在于它的猝不及防在某个祥和的午后,你正吃着火锅唱着歌,那些尖利的记忆碎片就像潮水突然涌进你到脑海里,让你闪躲不及。";//给test1 发送9条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );}//给接收短信的人发短信for (int i = 0; i < 5; i++) {//
//            String exchange, String routingKey, BasicProperties props, byte[] bodymessage = "发送了一百条短信,九十九条都是你";//给test2 发送5条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );}//        给都接收的人发送 for (int i = 0; i < 3; i++) {message = "我回你是秒回,你回我是轮回";//给test2 发送5条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes() );}channel.close();connection.close();}}

-----------------------

定义交换机,并设置交换机的类型为通配符模式

/*** 定义交换机*  param1 :  交换机名称*  param2 :   交换机类型**/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);

---------------------------------------------------

绑定交换机跟队列,并配置队列的路由通配符规则

  channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");

--------------------------------------------------------------------------------------------

发送消息给broker 并发送路由key

/*** 参数明细

* 1、交换机名称,不指令使用默认交换机名称 Default Exchang

* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列

* 3、消息属性

* 4、消息内容*/

 channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes());

消息监听者1

import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest1 {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest");  //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();//            创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {//                重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};//            监听队列/*** String queue, boolean autoAck, Consumer callback** param1 :  队列名称* param2 :   是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);}catch (Exception e){}}
}

消息监听二

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerTest2 {private static final String QUEUE_INFORM_SMS= "queue_inform_sms";public static void main(String[] args){Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("localhost");//设置端口factory.setPort(5672);//设置账号密码factory.setUsername("guest");  //默认账号密码都是guestfactory.setPassword("guest");//设置虚拟空间factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话channel = connection.createChannel();//            创建默认消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {//                重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("receive message.."+new String(body,"utf-8"));}};//            监听队列/*** String queue, boolean autoAck, Consumer callback** param1 :  队列名称* param2 :   是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在* param3 : 消费对象,,包含消费方法**/channel.basicConsume(QUEUE_INFORM_SMS,true , consumer);}catch (Exception e){}}
}

其实监听者代码都不要变来着,都是通用的

测试

先启动生产者,后启动监听者.....理由是  如果先启动监听者,队列没创建,就会报错

测试结果

sms的打印台打印结果

email 打印台打印结果

其实搞懂了路由模式后这个就很容易理解这个....

Header模式

header 模式其实跟路由模式很像,他们不同的是header模式取消routingkey,使用header中的 key/value(键值对)匹配队列

了解了很多不同的模式,其实你会发现,代码很多都是相同的....这里我就不粘贴全部代码,就只给不同的代码了

绑定交换机跟队列的代码

  HashMap<String, Object> header_email = new HashMap<>();header_email.put("inform_type", "cms");HashMap<String, Object> header_sms = new HashMap<>();header_sms.put("inform_type", "sms");channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"",header_email);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"",header_sms);

发送消息的代码

message = "记忆最让人崩溃的地方,也许就在于它的猝不及防在某个祥和的午后,你正吃着火锅唱着歌,那些尖利的记忆碎片就像潮水突然涌进你到脑海里,让你闪躲不及。";HashMap<String, Object> header = new HashMap<>();header.put("inform_type", "sms");  //匹配sms通知消费者绑定的headerAMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();properties.headers(header)//给test1 发送9条消息channel.basicPublish(EXCHANGE_ROUTING_INFORM,"" ,properties.build() ,message.getBytes() );

消费者都是一样的,改一下队列名称就行

RabbitMQ 工作模式二相关推荐

  1. RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing

    第四章-RabbitMQ工作模式-Routing 1.模式介绍 1.1 模式 路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与 ...

  2. 3.RabbitMQ工作模式介绍

    3.RabbitMQ工作模式介绍.md 文章目录 3.RabbitMQ工作模式介绍.md 1.简单模式 1.1总结 2.Work Queues 工作队列模式 2.1 模式说明 2.2 代码编写 2.3 ...

  3. 学成在线--13.RabbitMQ工作模式

    文章目录 一.Work queues 二.Publish/subscribe 1.工作模式 2.代码 1)生产者 2)消费者 3.测试 4.思考 三.Routing 1.工作模式 2.代码 1)生产者 ...

  4. RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码

    RabbitMQ有以下几种工作模式 : 1.Work queues  工作队列 2.Publish/Subscribe 发布订阅 3.Routing      路由 4.Topics        通 ...

  5. RabbitMQ工作模式

    RabbitMQ总共有六种工作模式: 1.工作队列模式 work queues 2.发布/订阅模式 Publish/Subscribe 3.路由模式 Routing 4.通配符模式 Topics 5. ...

  6. rabbitMq工作模式特性及整合springboot

    因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq mq(message queue)消息队列 官网:www.rabbitmq.com 使用消息队列的优点:1. ...

  7. 消息队列(四)——rabbitMQ四种工作模式

    RabbitMQ工作模式 模式总结: 1.简单模式helloworld 一个生产者,一个消费者,不需要交换机(使用默认交换机) 2.工作队列模式Work Queue 一个生产者,多个消费者(竞争关系) ...

  8. RabbitMQ的交换机类型和工作模式

    RabbitMQ的交换机类型有四种 1.direct 直流交换机: 根据消息的路由键routingkey,将消息以完全匹配的方式路由到指定的队列中. 这里的匹配指的是消息本身携带的路由键和队列与交换机 ...

  9. RabbitMQ的工作模式

    RabbitMQ工作模式 Work queues Publish/Subscribe Routing Topics Header RPC Work queues work queues工作模式是一个服 ...

  10. RabbitMQ的工作模式和使用场景

    RabbitMQ工作模式 1.基本模型 2.RabbitMQ应用场景 MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Me ...

最新文章

  1. mysql 缓存区_Mysql缓存的配置和使用
  2. 揭秘|多伦多大学反人脸识别,身份欺骗成功率达99.5%
  3. setInterval 与 clearInterval详解
  4. 文件怎么上传远程服务器,怎么上传文件到远程服务器
  5. 怎么可以用计算机弄出表白数字,怎么用数字表白 表白爱情数字大全
  6. 你应该知道的Windows复制技术
  7. 计算机基础知识ppt图文,计算机基础知识讲解ppt课件.ppt
  8. 关于『区位码』、『国标码』、『机内码』的转换问题
  9. 计算机软件故障实验报告,湖大选修实验报告计算机软硬件一般故障的排除.doc...
  10. 游戏圈子--创业股份没拿到,你该怨谁?
  11. Yolo 一小时学会基本操作
  12. 编辑为什么建议转投_编辑回信解读 —“建议转投子刊”实例
  13. Win10下C:\Users\***修改用户名(完全修改)
  14. Web前端——JSP
  15. 在几何画板中如何制作圆柱的侧面展开动画_如何用几何画板做三棱柱的侧面展开动画...
  16. Exception in Thread: ValueError: signal number 32 out of range
  17. APP+springboot订餐APP 毕业设计源码190711
  18. 欧拉回路(简单判断是否有欧拉回路存在)
  19. 电感式DC/DC升压原理
  20. 希望传说手游如何在电脑上玩 希望传说手游模拟器教程

热门文章

  1. el-dialog 圆角 白边问题
  2. 南京毕业生租房补贴发票开具地点一览表
  3. 服务器vmware私有云,方案建议-使用VMware架构搭建自己的私有云.pptx
  4. Mac上Java环境变量配置_飘云羽逸_新浪博客
  5. rrcf算法的初步理解
  6. 一年太久,只争朝夕——2018年终总结
  7. 基础篇:6.9)形位公差-检测方法Measurement
  8. 六款国产杀毒软件资源占用测试,八款杀毒软件横向评测:系统资源占用篇
  9. MATLAB仿真任意带宽的窄带信号、宽带信号以及全频带信号
  10. 发布Flask项目到服务器