中间件系列七 RabbitMQ之header exchange(头交换机)用法
1. 概述
header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。
主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值
header Exchange类型用的比较少,但还是知道一点好
2. 本文实现功能说明:
用到的队列说明:
队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,
队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,
队列C:绑定交换机参数是:format=zip,type=report,x-match=all,
测试场景:
消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A
消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B
消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃
消息header数据里有一个特殊值”x-match”,它有两个值:
all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
3. 生产者代码
主要业务逻辑如下:
1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个headers交换机
5. 设置要发送消息的headers值(此值由外部传入)
6. 发送消息
第4,5,6步代码如下:
// 声明一个headers交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
String message = "headers-" + System.currentTimeMillis();// 生成发送消息的属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();// 发送消息,并配置消息
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes("UTF-8"));
完整代码
发送者代码: HeaderSend.java
4. 消费者代码
主要业务逻辑如下:
1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个headers交换机
5. 声明一个临时队列
6. 将队列绑定到指定交换机上,并设置header的参数(此值由外部传入)
7. 接收消息并处理
第4,5,6,7步代码如下:
// 声明一个headers交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到指定交换机上
channel.queueBind(queueName, EXCHANGE_NAME, "", myHeaders);System.out.println(" [HeaderRecv ["+ myHeaders +"]] Waiting for messages.");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [HeaderRecv ["+ myHeaders +"] ] Received '" + properties.getHeaders() + "':'" + message + "'");}
};
// 接收消息
channel.basicConsume(queueName, true, consumer);
完整代码
消费者代码:HeaderRecv.java
5. 测试:
BasicTest
启动上文例子中的3个消费者,再发送3个测试消息。结果符合上文的预期
@Test
public void header() throws InterruptedException {// 消费者1:绑定 format=pdf,type=reportexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");headers.put("type","report");headers.put("x-match","all");HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});// 消费者2:绑定 format=pdf,type=logexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");headers.put("type","log");headers.put("x-match","any");HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});// 消费者3:绑定 format=zip,type=reportexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","zip");headers.put("type","report");headers.put("x-match","all");// headers.put("x-match","any");HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});Thread.sleep(2* 1000);System.out.println("=============消息1===================");// 生产者1 : format=pdf,type=reprot,x-match=allexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");headers.put("type","report");// headers.put("x-match","all");HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});Thread.sleep(5* 100);System.out.println("=============消息2===================");// 生产者2 : format=pdf,x-match=anyexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");// headers.put("x-match","any");HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});Thread.sleep(5* 100);System.out.println("=============消息3===================");// 生产者3 : format=zip,type=log,x-match=allexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","zip");headers.put("type","log");// headers.put("x-match","all");HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});// sleep 10sThread.sleep(10 * 1000);
}
输出结果
[HeaderRecv [{x-match=any, format=pdf, type=log}]] Waiting for messages.[HeaderRecv [{x-match=all, format=pdf, type=report}]] Waiting for messages.[HeaderRecv [{x-match=all, format=zip, type=report}]] Waiting for messages.
=============消息1,被2个消费者接受===================[HeaderSend] Sent '{format=pdf, type=report}':'headers-1516257699815'[HeaderRecv [{x-match=all, format=pdf, type=report}] ] Received '{format=pdf, type=report}':'headers-1516257699815'[HeaderRecv [{x-match=any, format=pdf, type=log}] ] Received '{format=pdf, type=report}':'headers-1516257699815'
=============消息2,被1个消费者接受===================[HeaderSend] Sent '{format=pdf}':'headers-1516257700323'[HeaderRecv [{x-match=any, format=pdf, type=log}] ] Received '{format=pdf}':'headers-1516257700323'
=============消息3,被1个消费者接受===================[HeaderSend] Sent '{format=zip, type=log}':'headers-1516257700817'[HeaderRecv [{x-match=any, format=pdf, type=log}] ] Received '{format=zip, type=log}':'headers-1516257700817'
6. 代码
上文的详细代码主要如下:
发送者代码: HeaderSend.java
消费者代码:HeaderRecv.java
测试代码:BasicTest.java的方法 header()
所有的详细代码见github代码,请尽量使用tag v0.11,不要使用master,因为master一直在变,不能保证文章中代码和github上的代码一直相同
中间件系列七 RabbitMQ之header exchange(头交换机)用法相关推荐
- 消息中间件系列(七):如何从0到1设计一个消息队列中间件
消息队列作为系统解耦,流量控制的利器,成为分布式系统核心组件之一. 如果你对消息队列背后的实现原理关注不多,其实了解消息队列背后的实现非常重要. 不仅知其然还要知其所以然,这才是一个优秀的工程师需要具 ...
- Exchange Server2013 系列七:客户端访问服务器高可用性部署实战
Exchange Server2013 系列七:客户端访问服务器高可用性部署实战 杜飞 在前面的文章中我们介绍了客户端访问服务器的高可用性技术,从这篇文章开始,我们就来看一个详细的高可用性部署方案. ...
- OKHTTP系列(九)---http请求头(header)作用
前言 在项目开发中,网络请求是必不可少的 ,在http方面的知识学习也是不能拉下的,这里就做一波http请求头的记录. Header:请求头个别参数和描述 Header 解释 示例 Accept 指定 ...
- 拦截器获取请求参数post_「SpringBoot WEB 系列」RestTemplate 之自定义请求头
[WEB 系列]RestTemplate 之自定义请求头 上一篇介绍了 RestTemplate 的基本使用姿势,在文末提出了一些扩展的高级使用姿势,本篇将主要集中在如何携带自定义的请求头,如设置 U ...
- RabbitMQ 四种Exchange
AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchang ...
- 来自极客标签10款最新设计素材-系列七
为什么80%的码农都做不了架构师?>>> 日期:2013-5-27 来源:GBin1.com 本周我们推荐来自极客标签社区带来的10款免费设计素材,大家可以在这里免费下载你需 ...
- RabbitMQ - 4种Exchange类型
在rabbitmq中,exchange有4个类型:direct,topic,fanout,header. direct exchange 此类型的exchange路由规则很简单: exchange在和 ...
- Exchange server 2010系列教程之九 配置exchange server 2010 OWA(3)
Exchange server 2010系列教程之九 配置exchange server 2010 OWA(3) 前面俩节说了owa的登录和webmail的简化,以及SSO的实现.下面大家看看这个网站 ...
- IRIS框架ctx.header响应头设置不成功问题
IRIS框架ctx.header响应头设置 Go的iris框架在使用中感觉是功能非常强大的框架,功能很完善,且可以通过框架的api灵活的处理客户端发送的请求以及返回信息. 在一个项目中需要对返回客户端 ...
- Android音视频学习系列(七) — 从0~1开发一款Android端播放器(支持多协议网络拉流本地文件)
系列文章 Android音视频学习系列(一) - JNI从入门到精通 Android音视频学习系列(二) - 交叉编译动态库.静态库的入门 Android音视频学习系列(三) - Shell脚本入门 ...
最新文章
- 1.5s~0.02s,期间我们可以做些什么?
- 943c语言,考研备战:华南理工大学943计算方法(含C语言)复试大纲_跨考网
- python列表按照批次分配数据(亲测)
- 医学图像处理期末复习(三)
- Java多线程学习二十一:ConcurrentHashMap 在 Java7 和 8 有何不同
- 持续集成部署Jenkins工作笔记0006---运行Jenkins主体程序并初始化
- 机器学习之数据转换(七)——降维
- Linux入门-第四周
- CentOS7.9下实战安装MySQL5.7
- 绕过360实现lsass转储
- 认识QA, 游戏测试工程师究竟是做什么的?
- 【ElementUI样式优化】el-input带自定义查询删除图标 ==> 图标点击可实现对应功能 ==> 一个input实现查询重置功能
- 一起学Vue自定义组件之拼图小游戏
- 我做产品的三大思维:发散思维、纵横思维和表里思维(上篇)
- sql登录名和用户名_通过分配角色和权限来移动或复制SQL登录名
- 利用python构建马科维茨_Python_画马科维茨有效前沿
- IFIX数据写入html,iFIX常见问题问答.doc
- golang 将kafka的offset置为最新
- tomcat-添加操作日志
- 【赠书】熊德意老师的一部不止于技术的神经机器翻译“百科全书”