RabbitMq(七) Topic模式介绍及代码示例
概述:
在上一文章中我们介绍了路由模式(Routing),routing模式是不同的消息队列绑定了不同的路由key,但是我们看出路由key为固定的字符串标记。而本章中的Topic模式则为在路由模式下,我们在绑定消息队列到交换机时指定的路由key为一个表达式,如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。该routingKey表达式必须使用点隔开的任意多个英文单词。另该交换机类型需要调整为topic类型。
在Topic模式下的routingKey中有两个特殊字符 * 和 #:
- *:表示一个英文单词位置。
- #:表示任意多个单词文字。
如:“delete.*” 表示有两个单词,以delete开始的匹配字符串:delete.aaa,delete.bbb 等等。
“#.update”表示以update结尾的所有匹配字符串:aaa.update,aaa.bbb.update等等。
代码示例:
生产者代码:
- 获取连接(工具类部分代码查阅RabbitMQ(三) HelloWorld )
- 创建信道
- 声明交换机指定名称以及类型topic
- 声明消息队列
- 将不同消息队列绑定到交换机上并指定相应的路由key(*#表达式)
- 发送消息,并指定交换机以及路由key(点隔开的表达式)
package com.xiaohui.rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {//交换机名称public static final String TOPIC_EXCHANGE = "topic_exchange";//队列1public static final String TOPIC_QUEUE_1 = "delete_queue_1";//队列2public static final String TOPIC_QUEUE_2 = "update_queue_2";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机/*** 参数一:交换机名称* 参数二:交换机类型:fanout \topic \direct*/channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** 声明队列* durabe:持久化的消息,mq重启后消息仍在。* exclusive :独占的,一个消息队列独占一个连接*/channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);channel.queueDeclare(Producer.TOPIC_QUEUE_2,true,false,false,null);//队列绑定交换机channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"delete.*");channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"#.update");String msg = "topic模式小兔子来啦........delete.msg";channel.basicPublish(Producer.TOPIC_EXCHANGE,"delete.msg",null,msg.getBytes());System.out.println("已发送消息:"+msg);msg = "topic模式小兔子来啦........delete.msg.update";channel.basicPublish(Producer.TOPIC_EXCHANGE,"delete.msg.update",null,msg.getBytes());System.out.println("已发送消息:"+msg);msg = "topic模式小兔子来啦........msg.update";channel.basicPublish(Producer.TOPIC_EXCHANGE,"update.msg.update",null,msg.getBytes());System.out.println("已发送消息:"+msg);msg = "topic模式小兔子来啦........delete.update";channel.basicPublish(Producer.TOPIC_EXCHANGE,"delete.update",null,msg.getBytes());System.out.println("已发送消息:"+msg);channel.close();connection.close();}
}
消费者1代码:
package com.xiaohui.rabbitmq.topic;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer1 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);//声明消费队列channel.queueDeclare(Producer.TOPIC_QUEUE_1, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"delete.*");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================topic消费者delete.*开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================topic消费者delete.*结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.TOPIC_QUEUE_1,true,consumer);}
}
消费者2代码:
package com.xiaohui.rabbitmq.topic;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer2 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);//声明消费队列channel.queueDeclare(Producer.TOPIC_QUEUE_2, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.TOPIC_QUEUE_2,Producer.TOPIC_EXCHANGE,"#.update");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================topic消费者#.update开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================topic消费者#.update结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.TOPIC_QUEUE_2,true,consumer);}
}
运行打印,消费者一:
==================topic消费者delete.*开始===================
路由的key为:delete.msg
交换机为:topic_exchange
消息ID为:1
收到的消息为:topic模式小兔子来啦........delete.msg
===================topic消费者delete.*结束==================
==================topic消费者delete.*开始===================
路由的key为:delete.update
交换机为:topic_exchange
消息ID为:2
收到的消息为:topic模式小兔子来啦........delete.update
===================topic消费者delete.*结束==================
消费者二:
==================topic消费者#.update开始===================
路由的key为:delete.msg.update
交换机为:topic_exchange
消息ID为:1
收到的消息为:topic模式小兔子来啦........delete.msg.update
===================topic消费者#.update结束==================
==================topic消费者#.update开始===================
路由的key为:update.msg.update
交换机为:topic_exchange
消息ID为:2
收到的消息为:topic模式小兔子来啦........msg.update
===================topic消费者#.update结束==================
==================topic消费者#.update开始===================
路由的key为:delete.update
交换机为:topic_exchange
消息ID为:3
收到的消息为:topic模式小兔子来啦........delete.update
===================topic消费者#.update结束==================
RabbitMq(七) Topic模式介绍及代码示例相关推荐
- RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.
RabbitMq的提供了六种模式分别是:简单模式,工作模式,发布\订阅模式,路由模式,通配符模式,RPC远程调用模式 下面将详细介绍常用的前五种模式,附上测试代码. 公共的代码---连接工具类: pu ...
- python简单代码画曲线图教程-Python绘制折线图和散点图的详细方法介绍(代码示例)...
本篇文章给大家带来的内容是关于Python绘制折线图和散点图的详细方法介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 1.绘制折线图和散点图要用到matplotlib ...
- python画折线图代码-Python绘制折线图和散点图的详细方法介绍(代码示例)
本篇文章给大家带来的内容是关于Python绘制折线图和散点图的详细方法介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 1.绘制折线图和散点图要用到matplotlib ...
- STC89C52单片机DS1302介绍以及代码示例
目录 DS1302介绍 引脚定义与应用电路 内部结构框架图 寄存器定义 时序定义 BCD码 时间设置上的一些问题 代码示例: DS1302介绍 DS1302是由美国DALLAS公司推出的具有涓细电流充 ...
- php怎么创建事务,php事务的实现方法介绍(代码示例)
本篇文章给大家带来的内容是关于php事务的实现方法介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助.<?php $db = new mysqli("loc ...
- java窗口三栏布局_移动端的flex三栏布局的相关知识介绍(代码示例)
本篇文章给大家带来的内容是关于移动端的flex三栏布局的相关知识介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 默认情况下先显示移动端,通过 @media 属性适配屏 ...
- RabbitMQ入门-Topic模式
上篇<RabbitMQ入门-Routing直连模式>我们介绍了可以定向发送消息,并可以根据自定义规则派发消息.看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样 ...
- BizTalk ESB Toolkit : 核心组件介绍及代码示例 (原创翻译)
为什么需要一个企业服务总线(Enterprise Service Bus,ESB) 从IT管理的角度看,随着企业内信息化系统的不断建立,企业已经充满了各种各样的业务系统(line-of-busines ...
- java原子变量的作用_AtomicInteger原子类的作用介绍(代码示例)
本篇文章给大家带来的内容是关于AtomicInteger原子类的作用介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. AtomicInteger 原子类的作用 多线程操 ...
最新文章
- 织梦缩略图php,dedecms怎么实现列表页缩略图随机调用
- pdf拼版插件_pdf文件中的文字批量转曲的办法
- 历史客人-报表记录信息
- python 管理windows客户端_在远程windows客户端上执行python脚本
- DFB [03] 移植遇到的实际问题 基于某著名平台
- Your password has expired. To log in you must change it using a client that supports expired pass...
- setint 的用法
- masm汇编语言堆栈段定义了却提示无堆栈段
- HTML中的window对象和document对象详解
- 基本的ps快捷键(图文)
- python换零钱有多少种方案_python练习题4.15换硬币(修正)
- 自由落体matlab代码,matlab仿真自由落体..doc
- Linux中 ll 和 ls 区别
- 40页PPT详解:京东大数据基础构架与创新应用
- 由于找不到C:\InetPub\ftproot\Tipray\Ldterm\ghijt32.DLL,无法继续执行代码。重新安装程序可能会解决此问题。
- 转载:Docker入门只需看这一篇就够了
- Vue组件自调用/无限递归导航/element-ui导航封装
- python一键扣图_Python实例:一键批量抠图
- 微信小程序---家庭记账本开发(一)
- MessageBox.Show()的使用
热门文章
- Java NIO零拷贝
- C# 利用反射机制开启控件双缓存
- 微信小程序 全局共享数据
- 32位linux系统支持多大内存吗,linux32位操作系统支持大内存
- php 单例模式 构函数,PHP设计模式——单例模式(Singleton Pattern)
- 基于prometheus + grafana + mysql + Telegram 监控告警
- [译]1-Key-Value Coding Programming Guide 官方文档第一部分
- Python3标准库built-in、itertools、functools中的生成器
- 将Myeclipse非maven项目,导入到IDEA
- Java反射异常:java.lang.NoSuchFieldException