上一个教程中,我们创建了一个work queue. 其中的每个task都会被精确的传送到一个worker. 这节,我们将会讲把一个message传送到多个consumers. 这种模式叫做publish/subscribe(发布/订阅).

为了说明这种模式,我们将创建一个简单的日志系统(logging system). 它由两个程序组成,一个是发送日志message并且另一个接收。

最重要的,发布的日志message将会被广播到所有的receivers

Exchangs

前面我们讲的包含下面的:producer,queue,consumer

它的主要思想是producer绝不直接发送任何message到queue. 很多情况下,producer甚至不知道一个message是否会被发送到任何queue.

如图,它会直接发送messages到一个exchange. 而对于exchange,一方面它接收来自producer的message,另一方面它把这些message推送到queues. 至于,messages是否会被发送一个特定的queue或者发送到很多queue或者丢弃,这些规则都由exchange type定义。

Exchange type: direct , topic , headers , fanout.

我们这节主要讲fanout,它会控制广播。

channel.ExchangeDeclare("logs", "fanout");

对于fanout exchange ,它会广播它收到的所有的messages 到它知道的所有的queue.

Listing exchanges

对于列出服务器上的exchanges , 你可以使用rabbitmqctl

sudo rabbitmqctl list_exchanges

The default exchange

在前面的教程中,我们不知道exchanges,但是我们仍然可以发送messages 到queues. 因为我们使用到了一个默认的exchange(a default exchange).这个默认的exchange是被空字符串(“”)定义。

回想下,我们之前怎样发送message

 var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "",  //默认的exchangeroutingKey: "hello",basicProperties: null,body: body);

此时,messages会根据指定的routingKey被路由到queue.

现在,我们可以发布到指定的exchange.

var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",routingKey: "",basicProperties: null,body: body);

Temporary queues

之前我们使用过很多指定名称的queues(例如hello和task_queue). 可以命名一个queue是很重要的,我们可以指定workers到同一个queue。 而且使你可以在多个producers和consumers之前共享这个queue.

We’re also interested only in currently flowing messages not in the old ones. 我们想要最新的message而不是仅仅之前的。

这需要解决两个事情。

  1. 首先,无论什么时候我们连接Rabbit,我们需要一个新的,空的queue。为了达到这个目的,我们可以创建一个带随机名称的queue。更好的办法,我们可以让服务器给我们选择一个随机的queue名称。
  2. 第二,一旦我们断开与consumer的连接,这个queue应该被自动删除。

在.NET客户端中,我们使用下面的语句创建一个带随机名称的queue (when we supply no parameters to QueueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name)

var queueName = channel.QueueDeclare().QueueName;

Bindings

我们已经创建好了exchange和queue,它们之间的关系我们叫做binding. 用来告诉exchange发送messages到queue.

channel.QueueBind(queue: queueName,  //绑定exchange: "logs",routingKey: "");

现在,在logs exchange上会把messages发到我们的queue。

Listing bindings
rabbitmqctl list_bindings

代码

这种fanout exchanges ,在发送时,会忽视routingKey的值。

EmitLog.cs(发送)

using System;using RabbitMQ.Client;using System.Text;
class EmitLog
{public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: "fanout");  //声明exchangevar message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "logs",  //发送到logs exchangeroutingKey: "",basicProperties: null,body: body);Console.WriteLine(" [x] Sent {0}", message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}private static string GetMessage(string[] args){return ((args.Length > 0)? string.Join(" ", args): "info: Hello World!");}
}

不允许发送到一个不存在的exchange.

如果没有queue绑定到exchange,messages将会丢失。如果没有consumer正在监听,我们可以安全的丢弃这些message.

ReceiveLogs.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
class ReceiveLogs
{public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: "fanout"); //声明exchangevar queueName = channel.QueueDeclare().QueueName;  //获得随机queue namechannel.QueueBind(queue: queueName,  //定义queue和exchange的关系exchange: "logs",routingKey: "");Console.WriteLine(" [*] Waiting for logs.");var consumer = new EventingBasicConsumer(channel);  //回调consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] {0}", message);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
}

参考网址:

https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

转载于:https://www.cnblogs.com/Vincent-yuan/p/10940726.html

RabbitMQ学习之Publish/Subscribe(3)相关推荐

  1. RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码

    RabbitMQ有以下几种工作模式 : 1.Work queues  工作队列 2.Publish/Subscribe 发布订阅 3.Routing      路由 4.Topics        通 ...

  2. RabbitMq 发布订阅 Publish/Subscribe fanout/direct

    目录 概述 交换机 临时队列 代码 概述 在上篇中了解到rabbitmq 生产者生产消息到队列,多个消费者可以接受.这篇文章主要记录广播类型为fanout.生产者不在将产生的消息发送到队列,而是将消息 ...

  3. RabbitMQ Tutorials 3 - Publish/Subscribe 发布/订阅

    发布/订阅 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日 ...

  4. RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...

  5. RabbitMQ 入门系列(11)— RabbitMQ 常用的工作模式(simple模式、work模式、publish/subscribe模式、routing模式、topic模式)

    1. simple 模式 simple 模式是最简单最常用的模式 2. work 模式 work 模式有多个消费者 消息产生者将消息放入队列.生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到 ...

  6. 译: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 发布和订阅

    在第一篇教程中,我们展示了如何使用start.spring.io来利用Spring Initializr创建一个具有RabbitMQ starter dependency的项目来创建spring-am ...

  7. RabbitMQ发布/订阅模式(Publish/Subscribe)

    工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有 ...

  8. RabbitMQ消息队列:发布/订阅(Publish/Subscribe)

    2019独角兽企业重金招聘Python工程师标准>>> 前面我们把每个Message都是deliver到某个单一的Consumer.今天我们将了解如何把同一个Message deli ...

  9. 【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)

    [RabbitMQ]基础三:发布与订阅模式(Publish/Subscribe) 1. 订阅模式 2. 发布与订阅模式说明 3. 代码示例 3.1 生产者 3.2 消费者 3.3 测试 4. 总结 1 ...

最新文章

  1. 《Linux内核设计与实现》读书笔记(七)- 中断处理【转】
  2. idea关联mysql失败_Server returns invalid timezone. Go to ‘Advanced‘ tab and set ‘serverTimezon‘
  3. 《计算机组成与体系结构:性能设计》读后小记 4、cache存储器
  4. 第十八章 12判断string类型字符串是否为空
  5. 16、Python与设计模式--模板模式
  6. 本地读取服务器Xml文件及本地读本地的xml
  7. (转)Bootstrap 之 Metronic 模板的学习之路 - (6)自定义和扩展
  8. python API url 级联生成
  9. ssh 远程登录_C.4 彻底解决-新版本Sentaurus TCAD的SSH远程登录问题!!!
  10. IntelliJ IDEA安卓开发环境搭建
  11. Android能装到电脑上吗,怎么在电脑上装安卓系统
  12. c语言实现三角形面积公式字母,c语言计算三角形面积代码
  13. Java,图片在table中显示并缩放2.0
  14. P1598 垂直柱状图C++
  15. 使用dig/nslookup命令查看dns解析详情
  16. 清理yarn、npm缓存包
  17. dl388 linux系统安装系统,HP DL388G5 安装64位linux虚拟系统出错!
  18. 《阿凡达2》首周末IMAX全球票房4880万美元;康泰生物新冠疫苗纳入第二剂次加强免疫接种 | 美通企业日报...
  19. 攻防世界web新手fileclude
  20. 刚开始学Java,很懵逼,不知道怎么学,能不能给点建议?

热门文章

  1. html直播动画,HTML5 直播疯狂点赞动画实现代码 附源码
  2. apache 网站转nginx_堡塔网站加速宝塔面板网站加速安装设置使用教程
  3. vs2010调用python的方法
  4. 课文电子计算机与多媒体减写,课文电子计算机与多媒体关系介绍
  5. python什么是接口设计_给女朋友讲什么叫接口设计!
  6. php 会员到期提醒_会员管理系统花了50万都没做成,用这五个功能轻松实现
  7. 阿里开源mysql日志_使用过mysql的binlog吗?看看如何用binlog排查阿里开源项目otter的问题...
  8. python main传参args,详解用Python处理Args的3种方法
  9. postgresql分页用法_postgresql分页数据重复问题的深入理解
  10. java适合ubuntu吗_java – 哪个os更适合开发:Debian或Ubuntu?