在fanout模型中,一条消息会被所有订阅的队列消费,即绑定了对应交换机的消费者,都能收到消息。但在某些场景下,我们希望不同的消息发送到不同的队列,被不同的消费者消费,此时就要用到Direct类型的交换机。比如日志分为warn、info、error等多个类型,在错误日志中,只需要看到error类型的日志,在所有日志中,多个类型的日志都需要被记录

在官方文档可以看到,我们给发送的消息一个绑定键,即秘钥,队列在接收消息时需要匹配对应的绑定建,符合自己规则的消息才会接收。

  1. 生产者
    首先,我们要选择direct类型的交换机,此时,我的/vh虚拟主机默认提供了一个名为amq.direct的direct类型的交换机,我们不妨重新声明一个amqp.direct的direct类型的交换机,与之前例子的交换机相呼应。
public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();// 定义通道对应的交换机 参数一:交换机名称 参数二:类型 directchannel.exchangeDeclare("amqp.direct","direct");String routingKey = "info";// 发送消息channel.basicPublish("amqp.direct",routingKey,null,("direct message,routingKey为:" + routingKey + "," + System.currentTimeMillis()).getBytes());}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}
}

在执行生产者后,因为原交换机列表中不存在代码中的交换机,rabbitmq会自动创建一个对应的交换机。

  1. 消费者

消费者一:

public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare("amqp.direct","direct");//创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定交换机和队列channel.queueBind(queueName,"amqp.direct","info");channel.queueBind(queueName,"amqp.direct","warn");channel.queueBind(queueName,"amqp.direct","error");// 每次只能消费一个消息channel.basicQos(1);// 消费消息channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));//参数一:确认队列中的那个消息  参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}
}

消费者二:

public class Consumer02 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare("amqp.direct","direct");//创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定交换机和队列channel.queueBind(queueName,"amqp.direct","error");// 每次只能消费一个消息channel.basicQos(1);// 消费消息channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));//参数一:确认队列中的那个消息  参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer02 consumer = new Consumer02();consumer.consume();}}

在消费者一中绑定了info、warn、error三种routingKey,在消费者二中绑定了error一种routingKey

  1. 测试
    启动消费者一和消费者二,再启动生产者,发送一条info类型的消息。

此时:
消费者一

消费者二
可见,消费者一接收到了消息,而消费者二并没有接收到
我们再发送一条error类型的消息。

此时:
消费者一
消费者二
消费者一和二都接受到了这条routingKey为error的消息,以此,可以根据routingKey来控制哪些消息发送给哪些队列。

java实现rabbitmq路由模型(routing/topic queues), 生产者 消费者 交换机 消息队列相关推荐

  1. java实现rabbitmq动态路由/话题模型(topic queues), 生产者 消费者 交换机 消息队列

    在routing路由模型中,我们实现了可以根据routingKey来选择性地将消息发送到对应的消息队列中,但是,这种模型不够灵活,比如最开始只有warn.info.error.三种类型的日志,但后面如 ...

  2. java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列

    发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定. ...

  3. RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)

    在上一章中,我们构建了一个简单的日志系统,我们可以把消息广播给很多的消费者.在本章中我们将增加一个特性:我们可以订阅这些信息中的一些信息.例如,我们希望只将error级别的错误存储到硬盘中,同时可以将 ...

  4. Java多线程(十):BlockingQueue实现生产者消费者模型

    BlockingQueue BlockingQueue.解决了多线程中,如何高效安全"传输"数据的问题.程序员无需关心什么时候阻塞线程,什么时候唤醒线程,该唤醒哪个线程. 方法介绍 ...

  5. java生产消费模型代码实现_生产者-消费者模型的Java实现

    本文转自:http://tanlan.iteye.com/blog/1158154 生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个 ...

  6. RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现

    生产者消费者代码示例 上一章节中对消息通信概念做了详细的说明,本章节我们对 RabbitMQ 生产者和消费者代码分别做一示例说明. 1. 生产者代码 #!/usr/bin/env python # c ...

  7. Java 线程实例二(终止线程、生产者/消费者问题、获取线程状态、获取所有线程、查看线程优先级、中断线程)

    终止线程 Java中原来在Thread中提供了stop()方法来终止线程,但这个方法是不安全的,所以一般不建议使用. 本文向大家介绍使用interrupt方法中断线程. 使用interrupt方法来终 ...

  8. Java多线程---线程通信(wait,notifyAll,生产者消费者经典范式,owner wait set,自定义显式锁BooleanLock)

    转自:https://blog.csdn.net/qq_35995514/article/details/91128585 1 学习内容 notifyAll 生产者.消费者经典范式 线程休息室 wai ...

  9. Java并发编程(二十三)------并发设计模式之生产者消费者模式

    参考文章:Java实现生产者消费者问题与读者写者问题 目录 1. 生产者消费者问题 1.1 wait() / notify()方法 1.2 await() / signal()方法 1.2.1 对sy ...

最新文章

  1. displaysettings.java_Android设置系统开机自动永不休眠
  2. Javascript 方法大全
  3. DCMTK:类DcmUnsigned64bitVeryLong的测试程序
  4. getter和setter
  5. 保存 laravel model 而不更新 timestamps 的方法
  6. PL/SQL Step By Step(三)
  7. darknet框架_【杂谈】面向新手的深度学习开源框架指导手册与GitHub项目,欢迎加入我们的开源团队...
  8. 分享16款Java小游戏源码Java applet小游戏源码
  9. 第18章 人口普查
  10. Redis和MySQL保持数据一致性
  11. Elasticsearch(006):es中filtered和filter的区别
  12. 生死看淡,不服就GAN(五)----用DCGAN生成MNIST手写体
  13. Java游戏项目开发 王者荣耀 学会你就是最强王者
  14. wgs84转百度坐标系
  15. 【HTML】音视频标签(audio、video、embed)
  16. python爬虫详解(三)——爬取世界常用密码并保存到字典内
  17. 上传图片就能建模?!一个人人可用的在线三维大场景重建云平台
  18. Binwalk的安装和使用
  19. 一触即达!佰达慧兴携新风向标开展新零售电商产业升级
  20. MAGIX Sequoia 15 v15.5 Windows 高级母带广播音频制作软件

热门文章

  1. .NetCore实践篇:成功解决分布式监控ZipKin聚合依赖问题(三)
  2. 模拟存储器管理C语言,操作系统-存储器管理实验C语言.doc
  3. Linux安装,虚拟机VMware-workstation安装CentOS操作系统的安装手册
  4. ftp主动和被动模式_ftp协议,深入理解ftp协议只需3步
  5. qt通过http连接mysql_Qt如何利用MySQL连接远程数据库?
  6. 洛谷——P1424 小鱼的航程(改进版)
  7. Unity3D-VR_Gevr VR射线+tag的转换+物体展示
  8. debian之快速截图
  9. 机器视觉——计算视野的小工具
  10. 看上90亿的当当,海航的眼光是极好的