1. 概述

header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。
主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值
header Exchange类型用的比较少,但还是知道一点好

2. 本文实现功能说明:

用到的队列说明:

队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,
队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,
队列C:绑定交换机参数是:format=zip,type=report,x-match=all,

测试场景:

消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A
消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B
消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃

消息header数据里有一个特殊值”x-match”,它有两个值:

    all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机

3. 生产者代码

主要业务逻辑如下:
1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个headers交换机
5. 设置要发送消息的headers值(此值由外部传入)
6. 发送消息

第4,5,6步代码如下:

 // 声明一个headers交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
String message = "headers-" + System.currentTimeMillis();// 生成发送消息的属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();// 发送消息,并配置消息
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes("UTF-8"));

完整代码
发送者代码: HeaderSend.java

4. 消费者代码

主要业务逻辑如下:

1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个headers交换机
5. 声明一个临时队列
6. 将队列绑定到指定交换机上,并设置header的参数(此值由外部传入)
7. 接收消息并处理

第4,5,6,7步代码如下:

// 声明一个headers交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到指定交换机上
channel.queueBind(queueName, EXCHANGE_NAME, "", myHeaders);System.out.println(" [HeaderRecv ["+ myHeaders +"]] Waiting for messages.");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [HeaderRecv ["+ myHeaders +"] ] Received '" + properties.getHeaders() + "':'" + message + "'");}
};
// 接收消息
channel.basicConsume(queueName, true, consumer);

完整代码
消费者代码:HeaderRecv.java

5. 测试:

BasicTest
启动上文例子中的3个消费者,再发送3个测试消息。结果符合上文的预期

@Test
public void header() throws InterruptedException {// 消费者1:绑定 format=pdf,type=reportexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");headers.put("type","report");headers.put("x-match","all");HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});// 消费者2:绑定  format=pdf,type=logexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");headers.put("type","log");headers.put("x-match","any");HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});// 消费者3:绑定  format=zip,type=reportexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","zip");headers.put("type","report");headers.put("x-match","all");//   headers.put("x-match","any");HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});Thread.sleep(2* 1000);System.out.println("=============消息1===================");// 生产者1 : format=pdf,type=reprot,x-match=allexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");headers.put("type","report");//     headers.put("x-match","all");HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});Thread.sleep(5* 100);System.out.println("=============消息2===================");// 生产者2 : format=pdf,x-match=anyexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","pdf");//     headers.put("x-match","any");HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});Thread.sleep(5* 100);System.out.println("=============消息3===================");// 生产者3 : format=zip,type=log,x-match=allexecutorService.submit(() -> {Map<String,Object> headers = new HashMap();headers.put("format","zip");headers.put("type","log");//      headers.put("x-match","all");HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);});// sleep 10sThread.sleep(10 * 1000);
}

输出结果

 [HeaderRecv [{x-match=any, format=pdf, type=log}]] Waiting for messages.[HeaderRecv [{x-match=all, format=pdf, type=report}]] Waiting for messages.[HeaderRecv [{x-match=all, format=zip, type=report}]] Waiting for messages.
=============消息1,被2个消费者接受===================[HeaderSend] Sent '{format=pdf, type=report}':'headers-1516257699815'[HeaderRecv [{x-match=all, format=pdf, type=report}] ] Received '{format=pdf, type=report}':'headers-1516257699815'[HeaderRecv [{x-match=any, format=pdf, type=log}] ] Received '{format=pdf, type=report}':'headers-1516257699815'
=============消息2,被1个消费者接受===================[HeaderSend] Sent '{format=pdf}':'headers-1516257700323'[HeaderRecv [{x-match=any, format=pdf, type=log}] ] Received '{format=pdf}':'headers-1516257700323'
=============消息3,被1个消费者接受===================[HeaderSend] Sent '{format=zip, type=log}':'headers-1516257700817'[HeaderRecv [{x-match=any, format=pdf, type=log}] ] Received '{format=zip, type=log}':'headers-1516257700817'

6. 代码

上文的详细代码主要如下:
发送者代码: HeaderSend.java
消费者代码:HeaderRecv.java
测试代码:BasicTest.java的方法 header()
所有的详细代码见github代码,请尽量使用tag v0.11,不要使用master,因为master一直在变,不能保证文章中代码和github上的代码一直相同

中间件系列七 RabbitMQ之header exchange(头交换机)用法相关推荐

  1. 消息中间件系列(七):如何从0到1设计一个消息队列中间件

    消息队列作为系统解耦,流量控制的利器,成为分布式系统核心组件之一. 如果你对消息队列背后的实现原理关注不多,其实了解消息队列背后的实现非常重要. 不仅知其然还要知其所以然,这才是一个优秀的工程师需要具 ...

  2. Exchange Server2013 系列七:客户端访问服务器高可用性部署实战

    Exchange Server2013 系列七:客户端访问服务器高可用性部署实战 杜飞 在前面的文章中我们介绍了客户端访问服务器的高可用性技术,从这篇文章开始,我们就来看一个详细的高可用性部署方案. ...

  3. OKHTTP系列(九)---http请求头(header)作用

    前言 在项目开发中,网络请求是必不可少的 ,在http方面的知识学习也是不能拉下的,这里就做一波http请求头的记录. Header:请求头个别参数和描述 Header 解释 示例 Accept 指定 ...

  4. 拦截器获取请求参数post_「SpringBoot WEB 系列」RestTemplate 之自定义请求头

    [WEB 系列]RestTemplate 之自定义请求头 上一篇介绍了 RestTemplate 的基本使用姿势,在文末提出了一些扩展的高级使用姿势,本篇将主要集中在如何携带自定义的请求头,如设置 U ...

  5. RabbitMQ 四种Exchange

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchang ...

  6. 来自极客标签10款最新设计素材-系列七

    为什么80%的码农都做不了架构师?>>>    日期:2013-5-27  来源:GBin1.com 本周我们推荐来自极客标签社区带来的10款免费设计素材,大家可以在这里免费下载你需 ...

  7. RabbitMQ - 4种Exchange类型

    在rabbitmq中,exchange有4个类型:direct,topic,fanout,header. direct exchange 此类型的exchange路由规则很简单: exchange在和 ...

  8. Exchange server 2010系列教程之九 配置exchange server 2010 OWA(3)

    Exchange server 2010系列教程之九 配置exchange server 2010 OWA(3) 前面俩节说了owa的登录和webmail的简化,以及SSO的实现.下面大家看看这个网站 ...

  9. IRIS框架ctx.header响应头设置不成功问题

    IRIS框架ctx.header响应头设置 Go的iris框架在使用中感觉是功能非常强大的框架,功能很完善,且可以通过框架的api灵活的处理客户端发送的请求以及返回信息. 在一个项目中需要对返回客户端 ...

  10. Android音视频学习系列(七) — 从0~1开发一款Android端播放器(支持多协议网络拉流本地文件)

    系列文章 Android音视频学习系列(一) - JNI从入门到精通 Android音视频学习系列(二) - 交叉编译动态库.静态库的入门 Android音视频学习系列(三) - Shell脚本入门 ...

最新文章

  1. 1.5s~0.02s,期间我们可以做些什么?
  2. 943c语言,考研备战:华南理工大学943计算方法(含C语言)复试大纲_跨考网
  3. python列表按照批次分配数据(亲测)
  4. 医学图像处理期末复习(三)
  5. Java多线程学习二十一:ConcurrentHashMap 在 Java7 和 8 有何不同
  6. 持续集成部署Jenkins工作笔记0006---运行Jenkins主体程序并初始化
  7. 机器学习之数据转换(七)——降维
  8. Linux入门-第四周
  9. CentOS7.9下实战安装MySQL5.7
  10. 绕过360实现lsass转储
  11. 认识QA, 游戏测试工程师究竟是做什么的?
  12. 【ElementUI样式优化】el-input带自定义查询删除图标 ==> 图标点击可实现对应功能 ==> 一个input实现查询重置功能
  13. 一起学Vue自定义组件之拼图小游戏
  14. 我做产品的三大思维:发散思维、纵横思维和表里思维(上篇)
  15. sql登录名和用户名_通过分配角色和权限来移动或复制SQL登录名
  16. 利用python构建马科维茨_Python_画马科维茨有效前沿
  17. IFIX数据写入html,iFIX常见问题问答.doc
  18. golang 将kafka的offset置为最新
  19. tomcat-添加操作日志
  20. 【赠书】熊德意老师的一部不止于技术的神经机器翻译“百科全书”

热门文章

  1. python使用金山词霸的翻译功能
  2. 清代徽州家政与乡族社会的善治
  3. 详细且通俗讲解轻量级神经网络——MobileNets【V1、V2、V3】
  4. 重装linux式化磁盘,最详细的linux系统重装步骤图解
  5. PDF转换器可以做到PDF转Office,TXT,HTM,PDF文件;PDF合并拆分,压缩,加密解密!
  6. 安卓12使用VNET免ROOT抓包微信小程序
  7. 典型企业设备链路冗余备份方案
  8. 处理海量数据的方法与思路
  9. Word操作系列-给方框打钩
  10. SNAP 4. 使用snap进行地物光谱分析