概述:

在上一文章中我们介绍了路由模式(Routing),routing模式是不同的消息队列绑定了不同的路由key,但是我们看出路由key为固定的字符串标记。而本章中的Topic模式则为在路由模式下,我们在绑定消息队列到交换机时指定的路由key为一个表达式,如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。该routingKey表达式必须使用点隔开的任意多个英文单词。另该交换机类型需要调整为topic类型。

在Topic模式下的routingKey中有两个特殊字符 * 和 #:

  1. *:表示一个英文单词位置。
  2. #:表示任意多个单词文字。

如:“delete.*”  表示有两个单词,以delete开始的匹配字符串:delete.aaa,delete.bbb 等等。

“#.update”表示以update结尾的所有匹配字符串:aaa.update,aaa.bbb.update等等。

代码示例:

生产者代码:

  1. 获取连接(工具类部分代码查阅RabbitMQ(三) HelloWorld )
  2. 创建信道
  3. 声明交换机指定名称以及类型topic
  4. 声明消息队列
  5. 将不同消息队列绑定到交换机上并指定相应的路由key(*#表达式)
  6. 发送消息,并指定交换机以及路由key(点隔开的表达式)
package com.xiaohui.rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {//交换机名称public static final String TOPIC_EXCHANGE = "topic_exchange";//队列1public static final String TOPIC_QUEUE_1 = "delete_queue_1";//队列2public static final String TOPIC_QUEUE_2 = "update_queue_2";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机/*** 参数一:交换机名称* 参数二:交换机类型:fanout \topic \direct*/channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** 声明队列* durabe:持久化的消息,mq重启后消息仍在。* exclusive :独占的,一个消息队列独占一个连接*/channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);channel.queueDeclare(Producer.TOPIC_QUEUE_2,true,false,false,null);//队列绑定交换机channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"delete.*");channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"#.update");String msg = "topic模式小兔子来啦........delete.msg";channel.basicPublish(Producer.TOPIC_EXCHANGE,"delete.msg",null,msg.getBytes());System.out.println("已发送消息:"+msg);msg = "topic模式小兔子来啦........delete.msg.update";channel.basicPublish(Producer.TOPIC_EXCHANGE,"delete.msg.update",null,msg.getBytes());System.out.println("已发送消息:"+msg);msg = "topic模式小兔子来啦........msg.update";channel.basicPublish(Producer.TOPIC_EXCHANGE,"update.msg.update",null,msg.getBytes());System.out.println("已发送消息:"+msg);msg = "topic模式小兔子来啦........delete.update";channel.basicPublish(Producer.TOPIC_EXCHANGE,"delete.update",null,msg.getBytes());System.out.println("已发送消息:"+msg);channel.close();connection.close();}
}

消费者1代码:

package com.xiaohui.rabbitmq.topic;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer1 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);//声明消费队列channel.queueDeclare(Producer.TOPIC_QUEUE_1, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"delete.*");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================topic消费者delete.*开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================topic消费者delete.*结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.TOPIC_QUEUE_1,true,consumer);}
}

消费者2代码:

package com.xiaohui.rabbitmq.topic;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer2 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);//声明消费队列channel.queueDeclare(Producer.TOPIC_QUEUE_2, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.TOPIC_QUEUE_2,Producer.TOPIC_EXCHANGE,"#.update");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================topic消费者#.update开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================topic消费者#.update结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.TOPIC_QUEUE_2,true,consumer);}
}

运行打印,消费者一:

==================topic消费者delete.*开始===================
路由的key为:delete.msg
交换机为:topic_exchange
消息ID为:1
收到的消息为:topic模式小兔子来啦........delete.msg
===================topic消费者delete.*结束==================
==================topic消费者delete.*开始===================
路由的key为:delete.update
交换机为:topic_exchange
消息ID为:2
收到的消息为:topic模式小兔子来啦........delete.update
===================topic消费者delete.*结束==================

消费者二:

==================topic消费者#.update开始===================
路由的key为:delete.msg.update
交换机为:topic_exchange
消息ID为:1
收到的消息为:topic模式小兔子来啦........delete.msg.update
===================topic消费者#.update结束==================
==================topic消费者#.update开始===================
路由的key为:update.msg.update
交换机为:topic_exchange
消息ID为:2
收到的消息为:topic模式小兔子来啦........msg.update
===================topic消费者#.update结束==================
==================topic消费者#.update开始===================
路由的key为:delete.update
交换机为:topic_exchange
消息ID为:3
收到的消息为:topic模式小兔子来啦........delete.update
===================topic消费者#.update结束==================

RabbitMq(七) Topic模式介绍及代码示例相关推荐

  1. RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.

    RabbitMq的提供了六种模式分别是:简单模式,工作模式,发布\订阅模式,路由模式,通配符模式,RPC远程调用模式 下面将详细介绍常用的前五种模式,附上测试代码. 公共的代码---连接工具类: pu ...

  2. python简单代码画曲线图教程-Python绘制折线图和散点图的详细方法介绍(代码示例)...

    本篇文章给大家带来的内容是关于Python绘制折线图和散点图的详细方法介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 1.绘制折线图和散点图要用到matplotlib ...

  3. python画折线图代码-Python绘制折线图和散点图的详细方法介绍(代码示例)

    本篇文章给大家带来的内容是关于Python绘制折线图和散点图的详细方法介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 1.绘制折线图和散点图要用到matplotlib ...

  4. STC89C52单片机DS1302介绍以及代码示例

    目录 DS1302介绍 引脚定义与应用电路 内部结构框架图 寄存器定义 时序定义 BCD码 时间设置上的一些问题 代码示例: DS1302介绍 DS1302是由美国DALLAS公司推出的具有涓细电流充 ...

  5. php怎么创建事务,php事务的实现方法介绍(代码示例)

    本篇文章给大家带来的内容是关于php事务的实现方法介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助.<?php $db = new mysqli("loc ...

  6. java窗口三栏布局_移动端的flex三栏布局的相关知识介绍(代码示例)

    本篇文章给大家带来的内容是关于移动端的flex三栏布局的相关知识介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 默认情况下先显示移动端,通过 @media 属性适配屏 ...

  7. RabbitMQ入门-Topic模式

    上篇<RabbitMQ入门-Routing直连模式>我们介绍了可以定向发送消息,并可以根据自定义规则派发消息.看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样 ...

  8. BizTalk ESB Toolkit : 核心组件介绍及代码示例 (原创翻译)

    为什么需要一个企业服务总线(Enterprise Service Bus,ESB) 从IT管理的角度看,随着企业内信息化系统的不断建立,企业已经充满了各种各样的业务系统(line-of-busines ...

  9. java原子变量的作用_AtomicInteger原子类的作用介绍(代码示例)

    本篇文章给大家带来的内容是关于AtomicInteger原子类的作用介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. AtomicInteger 原子类的作用 多线程操 ...

最新文章

  1. 织梦缩略图php,dedecms怎么实现列表页缩略图随机调用
  2. pdf拼版插件_pdf文件中的文字批量转曲的办法
  3. 历史客人-报表记录信息
  4. python 管理windows客户端_在远程windows客户端上执行python脚本
  5. DFB [03] 移植遇到的实际问题 基于某著名平台
  6. Your password has expired. To log in you must change it using a client that supports expired pass...
  7. setint 的用法
  8. masm汇编语言堆栈段定义了却提示无堆栈段
  9. HTML中的window对象和document对象详解
  10. 基本的ps快捷键(图文)
  11. python换零钱有多少种方案_python练习题4.15换硬币(修正)
  12. 自由落体matlab代码,matlab仿真自由落体..doc
  13. Linux中 ll 和 ls 区别
  14. 40页PPT详解:京东大数据基础构架与创新应用
  15. 由于找不到C:\InetPub\ftproot\Tipray\Ldterm\ghijt32.DLL,无法继续执行代码。重新安装程序可能会解决此问题。
  16. 转载:Docker入门只需看这一篇就够了
  17. Vue组件自调用/无限递归导航/element-ui导航封装
  18. python一键扣图_Python实例:一键批量抠图
  19. 微信小程序---家庭记账本开发(一)
  20. MessageBox.Show()的使用

热门文章

  1. Java NIO零拷贝
  2. C# 利用反射机制开启控件双缓存
  3. 微信小程序 全局共享数据
  4. 32位linux系统支持多大内存吗,linux32位操作系统支持大内存
  5. php 单例模式 构函数,PHP设计模式——单例模式(Singleton Pattern)
  6. 基于prometheus + grafana + mysql + Telegram 监控告警
  7. [译]1-Key-Value Coding Programming Guide 官方文档第一部分
  8. Python3标准库built-in、itertools、functools中的生成器
  9. 将Myeclipse非maven项目,导入到IDEA
  10. Java反射异常:java.lang.NoSuchFieldException