RabbitMQ Exchange类型详解
在上一篇文章中,我们知道了RabbitMQ的消息流程如下:
但在具体的使用中,我们还需知道exchange的类型,因为不同的类型对应不同的队列和路由规则。
在rabbitmq中,exchange有4个类型:direct,topic,fanout,header。
direct exchange
此类型的exchange路由规则很简单:
exchange在和queue进行binding时会设置routingkey
channel.QueueBind(queue: "create_pdf_queue",exchange: "pdf_events",routingKey: "pdf_create",arguments: null);
然后我们在将消息发送到exchange时会设置对应的routingkey:
channel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_create",basicProperties: properties,body: body);
在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。
具体的流程如下:
通过代码可以会理解好一点:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Direct类型的exchange, 名称 pdf_eventschannel.ExchangeDeclare(exchange: "pdf_events",type: ExchangeType.Direct,durable: true,autoDelete: false,arguments: null);// 创建create_pdf_queue队列channel.QueueDeclare(queue: "create_pdf_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 pdf_log_queue队列channel.QueueDeclare(queue: "pdf_log_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);//绑定 pdf_events --> create_pdf_queue 使用routingkey:pdf_createchannel.QueueBind(queue: "create_pdf_queue",exchange: "pdf_events",routingKey: "pdf_create",arguments: null);//绑定 pdf_events --> pdf_log_queue 使用routingkey:pdf_logchannel.QueueBind(queue: "pdf_log_queue",exchange: "pdf_events",routingKey: "pdf_log",arguments: null);var message = "Demo some pdf creating...";var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :pdf_events ,使用routingkey: pdf_create//通过binding routinekey的比较,次消息会路由到队列 create_pdf_queuechannel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_create",basicProperties: properties,body: body);message = "pdf loging ...";body = Encoding.UTF8.GetBytes(message);properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :pdf_events ,使用routingkey: pdf_log//通过binding routinekey的比较,次消息会路由到队列 pdf_log_queuechannel.BasicPublish(exchange: "pdf_events",routingKey: "pdf_log",basicProperties: properties,body: body);}
topic exchange
此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'.
其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词
如上图第一个binding:
- exchange: agreements
- queue A: berlin_agreements
- binding routingkey: agreements.eu.berlin.#
第二个binding:
- exchange: agreements
- queue B: all_agreements
- binding routingkey: agreements.#
第三个binding:
- exchange: agreements
- queue c: headstore_agreements
- binding routingkey: agreements.eu.*.headstore
所以如果我们消息的routingkey为agreements.eu.berlin那么符合第一和第二个binding,但最后一个不符合,具体的代码如下:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Topic类型的exchange, 名称 agreementschannel.ExchangeDeclare(exchange: "agreements",type: ExchangeType.Topic,durable: true,autoDelete: false,arguments: null);// 创建berlin_agreements队列channel.QueueDeclare(queue: "berlin_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 all_agreements 队列channel.QueueDeclare(queue: "all_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//创建 headstore_agreements 队列channel.QueueDeclare(queue: "headstore_agreements",durable: true,exclusive: false,autoDelete: false,arguments: null);//绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#channel.QueueBind(queue: "berlin_agreements",exchange: "agreements",routingKey: "agreements.eu.berlin.#",arguments: null);//绑定 agreements --> all_agreements 使用routingkey:agreements.#channel.QueueBind(queue: "all_agreements",exchange: "agreements",routingKey: "agreements.#",arguments: null);//绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstorechannel.QueueBind(queue: "headstore_agreements",exchange: "agreements",routingKey: "agreements.eu.*.headstore",arguments: null);var message = "hello world";var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;//发送消息到exchange :agreements ,使用routingkey: agreements.eu.berlin//agreements.eu.berlin 匹配 agreements.eu.berlin.# 和agreements.#//agreements.eu.berlin 不匹配 agreements.eu.*.headstore//最终次消息会路由到队里:berlin_agreements(agreements.eu.berlin.#) 和 all_agreements(agreements.#)channel.BasicPublish(exchange: "agreements",routingKey: "agreements.eu.berlin",basicProperties: properties,body: body);}
fanout exchange
此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。
header exchange
此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数:
Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",exchange: "agreements",routingKey: string.Empty,arguments: aHeader);
其中的x-match为特殊的header,可以为all则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可。
在发布消息的时候就需要传入header值:
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;
具体的规则可以看以下代码:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// Headers类型的exchange, 名称 agreementschannel.ExchangeDeclare(exchange: "agreements",type: ExchangeType.Headers,durable: true,autoDelete: false,arguments: null);// 创建queue.A队列channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);//创建 queue.B 队列channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null);//创建 queue.C 队列channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null);//绑定 agreements --> queue.A 使用arguments (format=pdf, type=report, x-match=all)Dictionary<string, object> aHeader = new Dictionary<string, object>();aHeader.Add("format", "pdf");aHeader.Add("type", "report");aHeader.Add("x-match", "all");channel.QueueBind(queue: "queue.A",exchange: "agreements",routingKey: string.Empty,arguments: aHeader);//绑定 agreements --> queue.B 使用arguments (format=pdf, type=log, x-match=any)Dictionary<string, object> bHeader = new Dictionary<string, object>();bHeader.Add("format", "pdf");bHeader.Add("type", "log");bHeader.Add("x-match", "any");channel.QueueBind(queue: "queue.B",exchange: "agreements",routingKey: string.Empty,arguments: bHeader);//绑定 agreements --> queue.C 使用arguments (format=zip, type=report, x-match=all)Dictionary<string, object> cHeader = new Dictionary<string, object>();cHeader.Add("format", "zip");cHeader.Add("type", "report");cHeader.Add("x-match", "all");channel.QueueBind(queue: "queue.C",exchange: "agreements",routingKey: string.Empty,arguments: cHeader);string message1 = "hello world";var body = Encoding.UTF8.GetBytes(message1);var properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader1 = new Dictionary<string, object>();mHeader1.Add("format", "pdf");mHeader1.Add("type", "report");properties.Headers = mHeader1;//此消息路由到 queue.A 和 queue.B//queue.A 的binding (format=pdf, type=report, x-match=all)//queue.B 的binding (format = pdf, type = log, x - match = any)channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);string message2 = "hello world";body = Encoding.UTF8.GetBytes(message2);properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader2 = new Dictionary<string, object>();mHeader2.Add("type", "log");properties.Headers = mHeader2;//x-match 配置queue.B //queue.B 的binding (format = pdf, type = log, x-match = any)channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);string message3= "hello world";body = Encoding.UTF8.GetBytes(message3);properties = channel.CreateBasicProperties();properties.Persistent = true;Dictionary<string, object> mHeader3 = new Dictionary<string, object>();mHeader3.Add("format", "zip");properties.Headers = mHeader3;//配置失败,不会被路由channel.BasicPublish(exchange: "agreements",routingKey: string.Empty,basicProperties: properties,body: body);}
总计
以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。
header类型用的比较少,但还是知道一点好。
RabbitMQ Exchange类型详解相关推荐
- C++11 并发指南六(atomic 类型详解四 C 风格原子操作介绍)
前面三篇文章<C++11 并发指南六(atomic 类型详解一 atomic_flag 介绍)>.<C++11 并发指南六( <atomic> 类型详解二 std::at ...
- C++11 并发指南六( atomic 类型详解二 std::atomic )
C++11 并发指南六(atomic 类型详解一 atomic_flag 介绍) 一文介绍了 C++11 中最简单的原子类型 std::atomic_flag,但是 std::atomic_flag ...
- RabbitMQ基础知识详解
RabbitMQ基础知识详解 2017年08月28日 20:42:57 dreamchasering 阅读数:41890 标签: RabbitMQ 什么是MQ? MQ全称为Message Queue, ...
- java原生类型没有封装_Java基本数据类型与封装类型详解(int和Integer区别)
Java基本数据类型与封装类型详解(int和Integer区别) 发布于 2020-4-19| 复制链接 摘记: int是java提供的8种原始数据类型之一.Java为每个原始类型提供了封装类,Int ...
- python变量类型-Python 变量类型详解
变量存储在内存中的值.这就意味着在创建变量时会在内存中开辟一个空间. 基于变量的数据类型,解释器会分配指定内存,并决定什么数据可以被存储在内存中. 因此,变量可以指定不同的数据类型,这些变量可以存储整 ...
- mysql数据库的字符串表示什么意思_MySQL数据库的字符串类型详解(01)
Mysql的数据类型主要分为三类:数字类型.字符串(字符)类型.日期和时间类型,由于时间紧迫,根据学习的需要 数字类型暂不做详解,等待有时间了在修改此文档,此文主要介绍mysql 数据类型中的字符串类 ...
- 并发编程-04线程安全性之原子性Atomic包的4种类型详解
文章目录 线程安全性文章索引 脑图 概述 原子更新基本类型 Demo AtomicBoolean 场景举例 原子更新数组 Demo 原子更新引用类型 Demo 原子更新字段类型 使用注意事项: Dem ...
- python内置序列类型_Python序列内置类型之元组类型详解
Python序列内置类型之元组类型详解 1.元祖的概念 Python中的元组与列表类似,都是一个序列,不同的是元组的元素不能修改而已. 2.元组的创建 元组使用小括号,列表使用方括号. tup = ( ...
- java 封装表单数据类型_Java基本数据类型与封装类型详解(int和Integer区别)
int是java提供的8种原始数据类型之一. Java为每个原始类型提供了封装类,Integer是java为int提供的封装类(即Integer是一个java对象,而int只是一个基本数据类型).in ...
最新文章
- Chapter 0: 引论
- HDU6181(K短路问题)
- java 清空一个list数据库_java – JPA EntityManager删除数据库中的所有记录
- Android使用WebView加载网页
- Python的几种主动结束程序方式
- 一个被遗忘的ccflow工作流引擎自定义表单开发模式
- Docker三个基本概念镜像(Image)容器(Container)仓库(Repository)
- Mark task complete in checkbox S2 Resource not found for the segment Tasks
- Bitmap尺度变换
- 在ASP.NET Core 2.0中创建Web API
- C语言-库文件与头文件
- vue中的循环v-for
- 【转】Linux 的启动流程
- 爱尔兰圣三一大学计算机专业硕士,爱尔兰圣三一学院研究生申请要求
- arm嵌入式系统C语言代码,ARM嵌入式系统C语言编程.pdf
- android Studio里查看db文件
- 2023我的秋招之路(1)7.21NJ银行金融科技岗提前批笔试
- 89c51的万年历c语言,用AT89C51与DS1302做的万年历c语言编程
- cesium 鼠标点击pick与drillPick的区别
- 802.11无线局域网的安全机制
热门文章
- Facebook广告设定技巧经验分享
- 跨境商家为什么要建自己的独立站?
- java motherfree video_Java Config 下的Spring Test方式
- 动词ing基本用法_哪些动词后面只能接动名词背诵口诀
- Leetcode每日一题:16.3sum-closest(最接近的三数之和)
- 吴恩达机器学习作业 6.支持向量机
- 一些值得注意的算法题——哈希表
- 达梦数据charindex_更新日志 · dotnetcore/FreeSql Wiki · GitHub
- python实时策略_Python策略模式
- html管道符需要转义么,为什么String.split需要管道分隔符进行转义?