前两章我们讲了RabbitMQ的direct模式和fanout模式,本章介绍topic主题模式的应用。如果对direct模式下通过routingkey来匹配消息的模式已经有一定了解那fanout也很好理解。简单的可以理解成direct是通过routingkey精准匹配的,而topic是通过routingkey来模糊匹配。
在topic模式下支持两个特殊字符的匹配。

* (星号) 代表任意 一个单词
# (井号) 0个或者多个单词

注意:上面说的是单词不是字符。

如下图所示,RabbitMQ direct模式通过RoutingKey来精准匹配,RoutingKey为red的投递到Queue1,RoutingKey为black和white的投递到Queue2。

我们可以假设一个场景,我们要做一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。
现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化到硬盘。如下示例:

  • RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。

  • RoutingKey为“red.critical.high”的日志会只投递到queue2。

  • RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。

新建topic.php用来发布三种routingkey的消息。


<?php/** topic 模式* create by superrd*/$exchangeName = 'extopic';
$routeKey1 = "black.critical.high";
$routeKey2 = "red.critical.high";
$routeKey3 = "white.critical.high";$message1 = 'black-critical-high!';
$message2 = 'red-critical-high!';
$message3 = 'white-critical-high!';$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");
try {$channel = new AMQPChannel($connection);$exchange = new AMQPExchange($channel);$exchange->setName($exchangeName);$exchange->setType(AMQP_EX_TYPE_TOPIC);$exchange->setFlags(AMQP_DURABLE);$exchange->declareExchange();$exchange->publish($message1,$routeKey1);var_dump("[x] Sent ".$message1);$exchange->publish($message2,$routeKey2);var_dump("[x] Sent ".$message2);$exchange->publish($message3,$routeKey3);var_dump("[x] Sent ".$message3);} catch (AMQPConnectionException $e) {var_dump($e);exit();
}$connection->disconnect();

q1.php用来监听queue1队列:


<?php/** topic 模式* create by superrd*/$queueName = 'queue1';
$exchangeName = 'extopic';
$routeKey = "black.critical.high";$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();$queue->bind($exchangeName, $routeKey);//阻塞模式接收消息echo "Message:\n";
while(True){$queue->consume('processMessage');
//自动ACK应答//$queue->consume('processMessage', AMQP_AUTOACK);
}$conn->disconnect();
/*
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $q) {$msg = $envelope->getBody();echo $msg."\n"; //处理消息$q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

q2.php用来监听queue2队列:

<?php/** topic 模式* create by superrd*/$queueName = 'queue2';
$exchangeName = 'extopic';
$routeKey1 = "#.high";
$routeKey2 = "white.critical.*";$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();$queue->bind($exchangeName, $routeKey1);
$queue->bind($exchangeName, $routeKey2);//阻塞模式接收消息echo "Message:\n";
while(True){$queue->consume('processMessage');
//自动ACK应答//$queue->consume('processMessage', AMQP_AUTOACK);
}$conn->disconnect();
/*
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $q) {$msg = $envelope->getBody();echo $msg."\n"; //处理消息$q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

先运行q1.php和q2.php脚本保持订阅状态。然后执行topic.php脚本发布消息。q1和q2收到的消息如下:

如上截图,验证了我们之前的结论。

另外还有一些特殊情况例如:

  1. 如果binding_key 是 “#” - 它会接收所有的Message,不管routing_key是什么,就像是fanout
    exchange。
  2. 如果 “*” and “#” 没有被使用,那么topic exchange就变成了direct exchange。

RabbitMQ技术交流QQ群:327034977(添加时请备注RabbitMQ)

(八)RabbitMQ消息队列-通过Topic主题模式分发消息相关推荐

  1. RabbitMQ学习系列(五):routing路由模式和Topic主题模式

    (一)routing路由模式 在前面一篇博客中讲到了exchange的类型,其中direct类型的exchange就是用于routing路由模式.direct类型的交换机是指:交换机和队列绑定时会设置 ...

  2. 【重难点】【RabbitMQ 01】消息队列的作用、主流的消息队列、RabbitMQ 基于什么传输消息、RabbitMQ 模型架构、死信队列和延迟队列

    [重难点][RabbitMQ 01]消息队列的作用.主流的消息队列.RabbitMQ 基于什么传输消息.RabbitMQ 模型架构.死信队列和延迟队列 文章目录 [重难点][RabbitMQ 01]消 ...

  3. 【消息队列笔记】chp2-如何选择消息队列

    一.选择消息队列的基本标准 不同的消息队列产品在功能和特性方面是各有优劣的,但是我们在选择的时候应尽量保证一个通用的最低标准. 1.必须是开源的产品 开源很重要,如果在使用该产品时遇到了影响业务的bu ...

  4. RTX5 | 消息队列02 - 放入与取出消息

    文章目录 一.前言 二.实验目的 三.API 3.1.osMessageQueuePut 3.2.osMessageQueueGet 四.代码 4.1.main.h 4.2.main.c 五.DEBU ...

  5. RabbitMQ消息分发模式----Topic主题模式

    前面虽然有Direct类型和Fanout的转换器.但它们仍然有一定的局限性--不能根据多重条件进行路由选择. Topic exchange(主题转发器) 发送给主题转发器的消息不能是任意设置的选择键, ...

  6. RabbitMQ之订阅模式与主题模式,消息确认机制

    1.订阅模式 作用类似与微信公众号,你订阅了就可以接收到消息 解读: 1.一个生产者,多个消费者. 2.每一个消费者 都有自己的队列 3.生产者没有直搂把洧息发送到队列而是发到了交换机 转发器exch ...

  7. RabbitMQ(六):主题模式

    一.主题模式 官方内容参考:http://www.rabbitmq.com/tutorials/tutorial-five-java.html 跟路由模式类似,只不过路由模式是指定固定的路由键,而主题 ...

  8. RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)

    <===  RabbitMQ消息队列(三):任务分发机制 上篇文章中,我们把每个Message都是deliver到某个Consumer.在这篇文章中,我们将会将同一个Message delive ...

  9. 译: 5. RabbitMQ Spring AMQP 之 Topic 主题

    在上一个教程中,我们提高了消息传递的灵活 我们使用direct交换而不是使用仅能够进行虚拟广播的fanout交换, 并且获得了基于路由key 有选择地接收消息的可能性. 虽然使用direct 交换改进 ...

最新文章

  1. Caffe 作者贾扬清:我们应该跳出框架的桎梏,往更广泛的领域寻找价值
  2. CTF Geek Challenge——第十一届极客大挑战Web Write Up
  3. 将矩阵转为一行_LeetCode 力扣官方题解 | 861. 翻转矩阵后的得分
  4. LeetCode 1039. 多边形三角剖分的最低得分(区间DP)
  5. 常用的几款工具让 Kubernetes 集群上的工作更容易
  6. 【ABAP系列】SAP ABAP 宏的简单使用
  7. JAVA线程池的创建
  8. windows、Linux下nginx搭建集群
  9. 【大数据部落】基于随机森林、svm、CNN机器学习的风控欺诈识别模型
  10. 拓端tecdat|R语言ARMA-GARCH-COPULA模型和金融时间序列案例
  11. 手机浏览器服务器修复,手机IE浏览器怎么修复
  12. 微信小程序新手向——界面布局
  13. 在语雀中输入汉语拼音
  14. 802.11协议——初探
  15. ubuntu桌面图标不显示问题
  16. nacos配置中心提示com.alibaba.nacos.shaded.io.grpc.StatusRuntimeException: UNAVAILABLE: io exceptio
  17. 华芯微特SWM320TFT屏人机交互方案手册
  18. 画论27 宋徽宗敕纂《宣和画谱》
  19. Scratch少儿编程系列目录
  20. linux查找以c开头的的文件夹,【Linux】 find指令(文件查找)

热门文章

  1. 后台返回视频流,前台播放和下载
  2. 数学建模——整数规划
  3. 【银行家算法-安全性算法】
  4. chrome浏览器常用插件
  5. SELECT 语句基本用法介绍
  6. 朗朗与机器人合奏_咱厝“小郎朗”上《开学第一课》 与机器人PK弹奏
  7. gamma分布的推导与理解
  8. crontab定时任务_net
  9. pycharm 一键折叠(或展开)所有代码片段
  10. sso java_GitHub - shaojintian/sso-java: 分布式单点登录