1、什么是路由模式(direct)

  路由模式是在使用交换机的同时,生产者指定路由发送数据,消费者绑定路由接受数据。与发布/订阅模式不同的是,发布/订阅模式只要是绑定了交换机的队列都会收到生产者向交换机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会声明发送给交换机下的那个路由,并且只有当消费者的队列绑定了交换机并且声明了路由,才会收到数据。下图取自于官方网站(RabbitMQ)的路由模式的图例

P:消息的生产者

X:交换机

红色:队列

C1,C2:消息消费者

error,info,warning:路由

  举个日志处理例子:系统需要针对日志做分析,首先所有的日志级别的日志都需要保存,其次error日志级别的日志需要单独做处理。这时就可以使用路由模式来处理了,声明交换机使用路由模式,每个日志级别的日志对应一个路由(error,info,warning)。声明一个保存日志队列用于接受所有日志,绑定交换机并绑定所有路由。声明第二个队列用于处理error级别日志,绑定交换机且只绑定error路由。以下是代码讲解。(先运行两个消费者,在运行生产者。如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的

2、生产者(Send)代码

public class Send
{//交换机名称private final static String EXCHANGE_NAME       = "test_exchange_direct";//路由名称warningprivate final static String ROUTING_KEY_WARNING = "warning";//路由名称infoprivate final static String ROUTING_KEY_INFO    = "info";//路由名称errorprivate final static String ROUTING_KEY_ERROR   = "error";public static void main(String[] args){try{//获取连接Connection connection = ConnectionUtil.getConnection();//从连接中获取一个通道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct");String message = "this is warning log";//发送消息(warning级别日志)channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_WARNING, null, message.getBytes("utf-8"));System.out.println("[send]:" + message);//发送消息(info级别日志)message = "this is info log";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_INFO, null, message.getBytes("utf-8"));System.out.println("[send]:" + message);//发送消息(error级别日志)message = "this is error log";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, null, message.getBytes("utf-8"));System.out.println("[send]:" + message);channel.close();connection.close();}catch (IOException | TimeoutException e){e.printStackTrace();}}
}运行结果:

[send]:this is warning log
[send]:this is info log
[send]:this is error log

3、消费者1(ReceiveAllLog)

public class ReceiveAllLog
{//交换机名称private final static String EXCHANGE_NAME       = "test_exchange_direct";//路由名称warningprivate final static String ROUTING_KEY_WARNING = "warning";//路由名称infoprivate final static String ROUTING_KEY_INFO    = "info";//路由名称errorprivate final static String ROUTING_KEY_ERROR   = "error";//队列名称private static final String QUEUE_NAME          = "test_queue_save_all_log";public static void main(String[] args){try{//获取连接Connection connection = ConnectionUtil.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct");//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机(指定路由info,error,warning)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_INFO);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_WARNING);//保证一次只分发一个  int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel){//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{String message = new String(body, "utf-8");System.out.println("[test_queue_save_all_log] Receive message:" + message);try{//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e){e.printStackTrace();}finally{//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);}catch (IOException e){e.printStackTrace();}}
}运行结果:

[test_queue_save_all_log] Receive message:this is warning log
[test_queue_save_all_log] Receive message:this is info log
[test_queue_save_all_log] Receive message:this is error log

4、消费者2(ReceiveErrorLog)

public class ReceiveErrorLog
{//交换机名称private final static String EXCHANGE_NAME     = "test_exchange_direct";//路由名称errorprivate final static String ROUTING_KEY_ERROR = "error";//队列名称private static final String QUEUE_NAME        = "test_queue_handel_error";public static void main(String[] args){try{//获取连接Connection connection = ConnectionUtil.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机(指定路由error)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);//保证一次只分发一个  int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel){//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{String message = new String(body, "utf-8");System.out.println("[test_queue_handel_error] Receive message:" + message);try{//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e){e.printStackTrace();}finally{//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);}catch (IOException e){e.printStackTrace();}}
}运行结果:
[test_queue_handel_error] Receive message:this is error log

总结:

  1.两个队列消费者设置的路由不一样,接收到的消息就不一样。路由模式下,决定消息向队列推送的主要取决于路由,而不是交换机了。

  2.该模式必须设置交换机,且声明路由模式:channel.exchangeDeclare(EXCHANGE_NAME, "direct");

RabbitMQ路由模式(direct)相关推荐

  1. 路由模式 - direct

    2019独角兽企业重金招聘Python工程师标准>>> package com.shi.rout;import java.io.IOException; import java.ut ...

  2. RabbitMQ的Routing 路由模式(Direct)

    RabbitMQ的Routing 路由模式 模式说明: 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key) 消息的发送方在向 Exchange 发送消息时,也必须 ...

  3. RabbitMQ路由模式

    路由模式 一个生产者,发送消息 每个消费者,都有一个独立的队列 消息发送到交换机,交换机发送到每个队列 根据key,是否相等,来接收消息 Send 生产者 package cn.itcast.rabb ...

  4. 【夏目鬼鬼分享】RabbitMQ路由模式

    路由模式 路由模式是可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者.两个消费者.两个队列和一个交换机.两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发 ...

  5. RabbitMQ——路由模式

    下面我们将要实现这个模型,所有的代码将都是以这个模型为基础: direct: 首先,我们设置的routingKey是 error,那么按照路由规则,我们最终将向这两个队列发送消息: 生产者: publ ...

  6. (需求实战_进阶_03)SSM集成RabbitMQ 路由模式关键代码讲解、开发、测试

    接上一篇:(企业内部需求实战_进阶_02)SSM集成RabbitMQ 关键代码讲解.开发.测试 https://gblfy.blog.csdn.net/article/details/10421403 ...

  7. 【转】RabbitMQ六种队列模式-4.路由模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 [本文] RabbitMQ六种队列 ...

  8. RabbitMQ(六) Routing路由模式

    概述 所谓RabbitMq中路由模式(Routing)为我们在将发送消息队列以及接收消息队列(queue)绑定到交换机(exchange)时指定了一个RoutingKey.然后我们在通过连接信道向交换 ...

  9. 【RabbitMQ】基础四:路由模式(Routing)

    [RabbitMQ]基础四:路由模式(Routing) 1. 路由模式说明 2. 代码示例 2.1 生产者 2.2 消费者1 2.3 消费者2 2.4 测试 3. 总结 1. 路由模式说明 路由模式特 ...

最新文章

  1. GPU 编程入门到精通(一)之 CUDA 环境安装
  2. parallels desktop虚拟机与Mac共享网络设置方法
  3. Echarts在手机端y轴数据过大,显示不全
  4. 鸿蒙系统怎么换windows,求助~鸿蒙系统windows环境搭建(hpm-cli安装失败)!
  5. 关于js的一些常用小知识点(持续更新)
  6. 使用IntelliJ IDEA开发SpringMVC网站(五)博客文章管理
  7. android刷机方法,介绍一种android的裸刷机方法(fastboot刷机实质)
  8. matlab虚拟现实之在V-Realm Builder2中建立父子关系
  9. 第05课 Linux命令初探(一)
  10. 【Hibernate步步为营】--核心对象+持久对象全析(二)
  11. Linux简介,虚拟机,远程操作工具安装及基本使用
  12. 爬取天天基金排行榜上的基金信息
  13. 苹果邮箱登录入口_电子邮箱的申请及使用说明
  14. 人生经验:热闹还是要看?
  15. 数据可视化之matplotlib实战:plt.pie() 绘制内嵌环形饼图
  16. 四年开发,待业半年本想放弃Java,抱着试试的心态面试某C轮金融科技公司居然过了!
  17. “学习金字塔”的真与假
  18. [UNR#5]诡异操作
  19. Java实现的小根堆
  20. 卷积神经网络——卷积层、池化层意义

热门文章

  1. ZooKeeper(四)ZooKeeper的简单使用
  2. Python2读取Excel文件时候文件名称是中文的时候处理
  3. c语言 一个矩阵的乘积,c语言矩阵相乘
  4. 品质主管每日工作需要做哪些_游戏配音需要做哪些工作?
  5. python如何运行代码_python上怎么跑(运行)代码
  6. linux 使用rpm卸载软件的使用方法
  7. 微软官网真的是一个神奇的地方,高清壁纸,直接下载
  8. java对象转excel_Java对象和Excel转换工具XXL-EXCEL
  9. CXF 生成Web Service Client(将WSDl 转化成 Java代码)
  10. 在浏览器中实现复制内容到剪切板中