RabbitMQ(六) Routing路由模式
概述
所谓RabbitMq中路由模式(Routing)为我们在将发送消息队列以及接收消息队列(queue)绑定到交换机(exchange)时指定了一个RoutingKey。然后我们在通过连接信道向交换机发送消息时指定一个RoutingKey,交换机会将该消息发送到routingKey对应的接收队列上。在Routing模式中我们使用的交换机类型为direct。
代码示例:
生产者代码,主要步骤:
- 获取连接(工具类部分代码查阅RabbitMQ(三) HelloWorld )
- 创建信道
- 声明交换机指定名称以及类型
- 声明消息队列
- 将不同消息队列绑定到交换机上并指定相应的路由key
- 发送消息,并指定交换机以及路由key
package com.xiaohui.rabbitmq.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {//交换机名称public static final String DIRECT_EXCHANGE = "direct_exchange";//队列1public static final String DIRECT_QUEUE_1 = "delete_direct_queue_1";//队列2public static final String DIRECT_QUEUE_2 = "update_direct_queue_2";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机/*** 参数一:交换机名称* 参数二:交换机类型:fanout \topic \direct*/channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);/*** 声明队列* durabe:持久化的消息,mq重启后消息仍在。* exclusive :独占的,一个消息队列独占一个连接*/channel.queueDeclare(Producer.DIRECT_QUEUE_1,true,false,false,null);channel.queueDeclare(Producer.DIRECT_QUEUE_2,true,false,false,null);//队列绑定交换机channel.queueBind(Producer.DIRECT_QUEUE_1, Producer.DIRECT_EXCHANGE,"delete");channel.queueBind(Producer.DIRECT_QUEUE_2, Producer.DIRECT_EXCHANGE,"update");for (int i = 1; i < 6; i++) {String msg = "路由模式小兔子来啦........"+i+"---"+(i%2 == 0 ? "delete":"update");channel.basicPublish(Producer.DIRECT_EXCHANGE,(i%2 == 0 ? "delete":"update"),null,msg.getBytes());System.out.println("已发送消息:"+msg);}channel.close();connection.close();}
}
消费端代码如下,主要步骤:
- 创建连接
- 创建信道
- 声明交换机 指定名称(必须和生产者声明的同一交换机)以及类型
- 声明消费队列
- 绑定队列到交换机,并指明路由key
- 创建一个消费者,并实现监听回调处理。
- 启动指定名称的消费者,并使用6步骤中生成的消费对象启动。
package com.xiaohui.rabbitmq.routing;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer1 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);//声明消费队列channel.queueDeclare(Producer.DIRECT_QUEUE_1, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.DIRECT_QUEUE_1,Producer.DIRECT_EXCHANGE,"delete");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================路由消费者delete开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================路由消费者delete结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.DIRECT_QUEUE_1,true,consumer);}
}
消费者2代码:
package com.xiaohui.rabbitmq.routing;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer2 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);//声明消费队列channel.queueDeclare(Producer.DIRECT_QUEUE_2, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.DIRECT_QUEUE_2,Producer.DIRECT_EXCHANGE,"update");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================路由消费者update开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================路由消费者update结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.DIRECT_QUEUE_2,true,consumer);}
}
运行效果:
消费者1:
==================路由消费者delete开始===================
路由的key为:delete
交换机为:direct_exchange
消息ID为:1
收到的消息为:路由模式小兔子来啦........2---delete
===================路由消费者delete结束==================
==================路由消费者delete开始===================
路由的key为:delete
交换机为:direct_exchange
消息ID为:2
收到的消息为:路由模式小兔子来啦........4---delete
===================路由消费者delete结束==================
消费者2打印:
==================路由消费者update开始===================
路由的key为:update
交换机为:direct_exchange
消息ID为:1
收到的消息为:路由模式小兔子来啦........1---update
===================路由消费者update结束==================
==================路由消费者update开始===================
路由的key为:update
交换机为:direct_exchange
消息ID为:2
收到的消息为:路由模式小兔子来啦........3---update
===================路由消费者update结束==================
==================路由消费者update开始===================
路由的key为:update
交换机为:direct_exchange
消息ID为:3
收到的消息为:路由模式小兔子来啦........5---update
===================路由消费者update结束==================
RabbitMQ(六) Routing路由模式相关推荐
- RabbitMQ的Routing 路由模式(Direct)
RabbitMQ的Routing 路由模式 模式说明: 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key) 消息的发送方在向 Exchange 发送消息时,也必须 ...
- RabbitMQ学习系列(五):routing路由模式和Topic主题模式
(一)routing路由模式 在前面一篇博客中讲到了exchange的类型,其中direct类型的exchange就是用于routing路由模式.direct类型的交换机是指:交换机和队列绑定时会设置 ...
- Java笔记-使用RabbitMQ的Java接口实现Routing(路由模式)
目录 基本概念 代码与实例 基本概念 过程图如下: 主要是把交换机设置为直连的方式direct直连的方式然后把 数据 发送给交换机. 交换机再通过路由的Key值转发到队列上. 每一个客户端,都有一个队 ...
- RabbitMq | springboot (路由模式RoutingKey)整合Direct交换器
生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息) 引入依赖: <dependency><gr ...
- RabbitMQ路由模式(direct)
1.什么是路由模式(direct) 路由模式是在使用交换机的同时,生产者指定路由发送数据,消费者绑定路由接受数据.与发布/订阅模式不同的是,发布/订阅模式只要是绑定了交换机的队列都会收到生产者向交换机 ...
- rabbitmq几种工作模式_RabbitMQ的六种工作模式总结
精品推荐 国内稀缺优秀Java全栈课程-Vue+SpringBoot通讯录系统全新发布! 作者:侧身左睡 https://www.cnblogs.com/xyfer1018/p/11581511.ht ...
- Rabbitmq专题:springboot如何整合Rabbitmq?Rabbitmq有哪些工作模式?
文章目录 1. Rabbitmq的安装 2. Rabbitmq的基本概念 3. RabbitMQ的工作模式 3.1 "Hello World!" 简单模式 3.2 Work que ...
- RabbitMQ的Topics 通配符模式(Topic)
RabbitMQ的Topics 通配符模式(Topic) 模式说明 Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列.只不过 Topic 类型Exch ...
- RabbitMQ七种工作模式实现测试代码
所有工作模式依赖都相同 <dependencies><!--RabbitMQ的客户端依赖--><dependency><groupId>com.rabb ...
最新文章
- Windows 8实用窍门系列:10.Windows 8的基本变换和矩阵变换以及AppBar应用程序栏
- 字符串(0-9和小数点)转为数字--atof,数字转化为字符串--sprintf
- ML之XGBoost:XGBoost案例应用实战(原生接口实现+Scikit-learn接口实现)
- CentOS中配置Mysql表名忽略大小写以及提示:Caused by: org.quartz.impl.jdbcjobstore.LockException: Failure obtaining d
- 密码必须至少为6个字符_【每日一题】| 常见的编码方式之栅栏密码
- 梁迪:我为MVP骄傲,《微软最有价值专家奖励计划介绍》附专题视频
- 微信小程序定时器setInterval()的使用注意事项
- Django 组件- 中间件
- unity 2020 怎么写shader使其接受光照?_用Unity实现半条命Alyx中的液体物理效果
- 使用Prometheus+grafana打造高逼格监控平台
- java接口自动化书籍_java接口自动化优化(一)
- python selenium定位元素方法_[原创] python selenium 元素定位方法封装
- [Step By Step]SAP HANA PAL多元线性回归预测分析Linear Regression实例FORECASTWITHLR(预测)...
- Spring基于注解及SpringMVC
- python大数据和java大数据的区别-学习大数据先学Python还是JAVA?
- 如何使用Arduino开发板和ADXL345加速度计跟踪方向
- 设计模式:(生成器模式)
- python 随机生成6位数字+字母的密码
- 【vue】基于element UI周控件实现的单选周和多选周
- Linux下ln命令建立软硬链接