2019独角兽企业重金招聘Python工程师标准>>>

前面我们把每个Message都是deliver到某个单一的Consumer。今天我们将了解如何把同一个Message deliver到多个Consumer中。这个模式也被称作 "publish / subscribe"。
    首先我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer)。 我们将构建两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

1. (转发器)Exchanges

关于exchange的概念在在这里做一下简单的介绍。

RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。

RabbitMQ消息模型的核心理念是生产者永远不会直接发送给任何的消息队列,一般情况下Producer是不知道消息应该发送到那个队列的。Producer发送的Message实际上是发到了Exchange中。它的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到某个queue中,还是放到多个queue中?这个rule是通过Exchange 的类型定义的。

我们知道有三种类型的Exchange:direct, topic ,Headers和fanout。fanout就是广播模式,会将所有的Message都放到它所知道的queue中。创建一个名字为logs,类型为fanout的Exchange:

channel.exchange_declare(exchange='logs',type='fanout');

fanout类型转发器特别简单,吧所有他接受到的消息,广播多有的他知道的队列。

前面说到的生产者只能发送详细给转发器(Exchange),但是我们之前的例子中并没有使用到转发器啊,我们仍然可以发送和接收消息,这是为什么呢?是匿名转发器(nameless exchange)搞的鬼。因为我们使用了一个默认转发器。他的标识为" ".

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一个参数为转发器名,第二个为消息队列名,如果不为空由其决定发送到那个队列中。

现在我们可以指定消息发送到转发器中。

channel.basicPublish( "logs","", null, message.getBytes());

Listing exchanges

通过rabbitmqctl可以列出当前所有的Exchange:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。

现在我们可以通过exchange,而不是routing_key来publish Message了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

2. 临时队列(Temporary queues)

截至现在,我们用的queue都是有名字的,能够为队列命名对我们来说很关键。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。

但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
    1) 每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。

2)当Consumer关闭连接时,这个queue要被deleted。

String queueName = channel.queueDeclare().getQueue();

通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj1wLg。

3. 绑定Bindings

现在我们已经创建了fanout类型的exchange和没有名字的queue(实际上是RabbitMQ帮我们取了名字)。那exchange怎么样知道它的Message发送到哪个queue呢?答案就是通过bindings:绑定。

channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称

现在logs的exchange就将它的Message附加到我们创建的queue了。Listing bindings

使用命令rabbitmqctl list_bindings。

4. 最终版本

我们最终实现的数据流图如下:

package com.zhy.rabbit._03_bindings_exchanges;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog
{
 private final static String EXCHANGE_NAME = "ex_log";

public static void main(String[] args) throws IOException
 {
  // 创建连接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 声明转发器和类型
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
  
  String message = new Date().toLocaleString()+" : log something";
  // 往转发器上发送消息
  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

channel.close();
  connection.close();

}

}

接收端:

package com.zhy.rabbit._03_bindings_exchanges;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToSave
{
 private final static String EXCHANGE_NAME = "ex_log";

public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 创建连接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  // 创建一个非持久的、唯一的且自动删除的队列
  String queueName = channel.queueDeclare().getQueue();
  // 为转发器指定队列,设置binding
  channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定接收者,第二个参数为自动应答,无需手动应答
  channel.basicConsume(queueName, true, consumer);

while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());

print2File(message);
  }

}

private static void print2File(String msg)
 {
  try
  {
   String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
   String logFileName = new SimpleDateFormat("yyyy-MM-dd")
     .format(new Date());
   File file = new File(dir, logFileName+".txt");
   FileOutputStream fos = new FileOutputStream(file, true);
   fos.write((msg + "\r\n").getBytes());
   fos.flush();
   fos.close();
  } catch (FileNotFoundException e)
  {
   e.printStackTrace();
  } catch (IOException e)
  {
   e.printStackTrace();
  }
 }
}

接收端:

package com.zhy.rabbit._03_bindings_exchanges;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToConsole
{
 private final static String EXCHANGE_NAME = "ex_log";

public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 创建连接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  // 创建一个非持久的、唯一的且自动删除的队列
  String queueName = channel.queueDeclare().getQueue();
  // 为转发器指定队列,设置binding
  channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定接收者,第二个参数为自动应答,无需手动应答
  channel.basicConsume(queueName, true, consumer);

while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());
   System.out.println(" [x] Received '" + message + "'");

}

}

}

转载于:https://my.oschina.net/u/267665/blog/547369

RabbitMQ消息队列:发布/订阅(Publish/Subscribe)相关推荐

  1. RabbitMQ教程 3.发布/订阅(Publish/Subscribe)

    搜索:Java课代表,关注公众号,及时获取更多Java干货. 3 发布/订阅(Publish/Subscribe) 在上一节中,我们创建了一个工作队列.其目的是将每个任务只分发给一个worker.本节 ...

  2. 消息队列——发布订阅模式

    在 Redis 中提供了专门的类型:Publisher(发布者)和 Subscriber(订阅者)来实现消息队列. 在文章开始之前,先来介绍消息队列中有几个基础概念,以便大家更好的理解本文的内容. 首 ...

  3. Java实现redis消息队列发布/订阅模式

    最近在一个老项目中需要用消息队列,本来想着用卡夫卡,但是试了几个版本之后发现jdk和卡夫卡版本一直对不上,最后选择用redis来实现消息队列的发布/订阅模式.感谢这位大佬的博客给了我很多的帮助,htt ...

  4. springboot集成阿里ons消息队列发布订阅消息功能

    此处的项目是springboot项目.使用队列的产品是阿里云ons 消息队列 阿里云的ons消息队列是基于rockermq 项目环境.jdk1.8 使用阿里ons开发的api接口实现发布定于功能生产和 ...

  5. redis进阶之实现消息队列发布/订阅模式使用(七)

    Redis发布订阅 Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息.微信. 微博.关注系统! Redis 客户端可以订阅任意数量的频道. ...

  6. springboot集成阿里MNS消息队列发布订阅消息功能

    声明: 上一篇文章是springboot集成阿里ons发布订阅消息,此篇文章是mns发布订阅功能先简单记录一下ons与mns有什么区别 这里是在网上找的对比图: 此处为具体区别文章链接:点击打开链接 ...

  7. RabbitMq 发布订阅 Publish/Subscribe fanout/direct

    目录 概述 交换机 临时队列 代码 概述 在上篇中了解到rabbitmq 生产者生产消息到队列,多个消费者可以接受.这篇文章主要记录广播类型为fanout.生产者不在将产生的消息发送到队列,而是将消息 ...

  8. Redis实现消息队列和订阅发布模式

    转载:https://www.cnblogs.com/qlqwjy/p/9763754.html 在项目中用到了redis作为缓存,再学习了ActiveMq之后想着用redis实现简单的消息队列,下面 ...

  9. 使用EasyNetQ组件操作RabbitMQ消息队列服务

    RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,消息队列中间件是分布式系统中重要的组件,主要解决应用耦合, ...

  10. RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)

    <===  RabbitMQ消息队列(三):任务分发机制 上篇文章中,我们把每个Message都是deliver到某个Consumer.在这篇文章中,我们将会将同一个Message delive ...

最新文章

  1. spring中的IOC和AOP
  2. SweetAlert用法
  3. mysql 配置自动截断_MySql超长自动截断实例详解
  4. WM6.5中隐藏和显示任务栏、命令栏及输入面板
  5. 今日恐慌与贪婪指数为38 等级从贪婪转为恐慌
  6. centos6.5安装redmine2.6
  7. 中国民间秘术大揭露【实用】
  8. Matlab2016a如何关联M文件
  9. 点云算法在条码识别任务中的应用
  10. 高效能管理之要事第一 时间管理表格2
  11. [zt]给你的Mp4大换血,精选Touch里3年收集的900多首歌,经典不忍去的最新近流行的,与你共享~~...
  12. 从互联网+角度看云计算的现状与未来(2)
  13. mysql查询员工最高最低工资三种方式
  14. Python实现list列表的有序子集查找
  15. 应用编程与裸机编程、驱动编程的区别(Linux应用编程篇)
  16. exception java .text_java.text.ParseException
  17. NCPC 2012 Problem B Bread Sorting
  18. Java毕设项目纺织代加工车间生产状态监测系统(java+VUE+Mybatis+Maven+Mysql)
  19. [小说]魔王冢(16)寻凶(二)
  20. CentOS 7 快速搭建JavaWeb开发环境并部署Spring boot项目(纯干货、详细)

热门文章

  1. 面试必考题:基本95%的面试都会被问到的?
  2. Qt与MySQL的连接与基本操作
  3. 手机修图软件测试,照片秒变高清修图软件APP
  4. python3.8和3.7共存,Windows10上解决python3.7与python3.8共存的问题
  5. php配合jade使用,详解基于模板引擎Jade的应用
  6. how to add external library in qt under ubuntu
  7. PYTORCH批标准化
  8. 在linux环境获取pcie卡信息,如何Linux下得到CPU、内存及PCI信息
  9. AI学习笔记(十三)CNN之人脸检测
  10. 机器学习算法中的F值(F-Measure)、准确率(Precision)、召回率(Recall)