内容翻译自:RabbitMQ Tutorials Java版


RabbitMQ(一):Hello World程序

RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

RabbitMQ(三):Exchange交换器--fanout

RabbitMQ(四):Exchange交换器--direct

RabbitMQ(五):Exchange交换器--topic

RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

RabbitMQ(七):常用方法说明 与 学习小结


Publish/Subscribe:

在上一篇博客中,我们创建了一个工作队列:一个消息只能发送到一个工作者(消费者)中。而在这个教程中我们将会做完全不同的事情:我们发送同一个消息到多个消费者中。这种模式一般被称为“发布/订阅”模式。

为了演示这种模式,我们将会创建一个简单的日志系统。它由两个程序组成:第一个将会输出日志消息,第二个将会接受并打印出日志消息。

在这个日志系统中,每一个接收程序(消费者)都会收到所有的消息,其中一个消费者将消息直接保存到磁盘中,而另一个消费者则将日志输出到控制台。

从本质上讲,发布的日志消息将会广播给所有的接收者(消费者)。


交换器Exchanges:

在之前的教程里,我们都是直接往队列里发送消息,然后又直接从队列里取出消息。现在是时候介绍RabbitMQ的整个消息模型了。

先让我们快速地回顾一下之前教程中的几个概念:

(1)生产者:发送消息的用户程序

(2)队列:存储消息的缓冲区

(3)消费者:接收消息的用户程序

RabbitMQ的消息模型中的一个核心思想是,生产者绝不会将消息直接发送到队列中,实际上,在大部分场景中生产者根本不知道消息会发送到哪些队列中。

相反,生产者只会将消息发送给一个Exchange(路由器/交换器)。Exchange其实很简单,它所做的就是,接收生产者发来的消息,并将这些消息推送到队列中。Exchange必须清楚地知道怎么处理接收到的消息:是将消息放到一个特定的队列中,还是放到多个队列中,还是直接将消息丢弃。下图示意了Exchange在消息模型中的位置:

Exchange一共有四种类型:directtopicheadersfanout。今天的教程将会使用fanout类型的Exchange,让我们创建一个名为logsfanout类型的Exchange

channel.exchangeDeclare("logs", "fanout");

fanout类型的Exchange非常简单,从它的名字你可能就已经猜出来了(fanout翻译过来是扇形的意思),它将会将接收到的消息广播给所有它知道的队列。这正是我们的日志系统所需要的类型。

可以通过下面的命令列出Rabbit服务器上的所有Exchange

sudo rabbitmqctl list_exchanges

没有命名的Exchange:

在前面的教程中,我们对Exchange一无所知,但是我们仍然可以将消息发送到队列中,这可能是因为我们使用了默认的Exchange,我们是通过空字符串""来定义这个Exchange的。回想一下我们之前是怎么发布消息的:

channel.basicPublish("", "hello", null, message.getBytes());
//该方法的定义为:
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

上面代码的方法中,第一个参数就是Exchange的名字,空字符串表示默认或无名Exchange:消息通过由routingKey定义的队列被路由的。

现在,我们通过下面的方式来发布消息:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列:

你可能记得之前我们使用了特定名字的队列(还记得hellotask_queue吗)。可以指明一个队列这一点对我们而言至关重要,因为我们也要让工作者指向同一个队列。当你在生产者和消费者之间共用一个队列时,给这个队列取个名字就非常重要。

但这不适应于我们的日志系统。我们想让每个消费者都接收到所有的日志消息,而不是其中的一部分日志消息。我们关心的是当前广播的消息。为了解决这些问题,我们需要做两件事情。

首先,无论何时我们连接到RabbitMQ服务的时候,我们都需要一个新鲜的空的队列。为了达到这个效果,我们可以为队列取一个随机的名字,或者更好的是,让RabbitMQ服务器为我们的队列随机起个名字。

其次,当我们关闭了消费者的时候,队列应该自动删除。

当我们调用无参的queueDeclare()的时候,意味着创建了一个非持久、独享的、自动删除的队列,并返回一个自动生成的名字:

String queueName = channel.queueDeclare().getQueue();

这样就可以获取随机的队列名字了,这个名字看起来形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg


绑定:

我们已经创建了一个fanout类型的Exchange和一个队列。现在我们需要告诉Exchange发送消息到我们的队列中。Exchange和队列之间的关系称为绑定。

channel.queueBind(queueName, "logs", "");

这样,我们创建的队列就和我们创建的logs路由器建立了关系,路由器就会将消息发送到这个队列中。

可以通过下面的命令查看所有已经存在的绑定关系:

rabbitmqctl list_bindings

整合到一起:

对生产者程序,它输出日志消息,与之前的教程并没与很大不同。最重要的改变就是,我们将消息发布给logs路由器,而不是无名的路由的。当发消息的时候,我们需要提供一个路由键routingKey,但是它的值会被fanout类型的路由器忽略,以下是生产者EmitLog.java的完整代码:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {//建立连接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明路由以及路由的类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "msg...";//发布消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");//关闭连接和通道channel.close();connection.close();}
}

可以看到,在建立了连接之后,我们声明了路由器Exchange。这一步是必须的,因为不允许将消息发给一个不存在的路由器。

如果路由器还没有绑定队列,这些发送给路由器的消息将会丢失。但这对我们无所谓,如果还没有消费者监听,我们可以安全地丢弃这些消息。

消费者ReceiveLogs.java的完整代码如下:

import com.rabbitmq.client.*;
import java.io.IOException;public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {//建立连接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明路由器及类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//声明一个随机名字的队列String queueName = channel.queueDeclare().getQueue();//绑定队列到路由器上channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//开始监听消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(queueName, true, consumer);}
}

现在,可以运行程序并查看结果了。首先运行两个消费者实例ReceiveLogs.java,然后运行生产者EmitLog.java。看看两个消费者实例是不是都接收到了所有的消息。

//生产者[x] Sent 'msg...'
//消费者1[*] Waiting for messages. To exit press CTRL+C[x] Received 'msg...'
//消费者2[*] Waiting for messages. To exit press CTRL+C[x] Received 'msg...'

可以看到,当生产者发出消息后,两个消费者最终都收到了消息。(本例子中只是都将日志消息都输出到控制台,如果想让其中一个消费者把日志消息输出到文件中,请参考原文)。

为了验证我们的代码真正地将队列和路由器绑定到了一起,可以使用rabbitmqctl list_bindings命令查看绑定关系,假定我们运行了两个消费者,那么你应该可以看到如下的类似信息:

Listing bindings ...
...
logs    exchange  amq.gen-S2B2k3mSnoLNNyUppS98Vw    queue  []
logs    exchange  amq.gen-ljp5AngXol4iPW649OY7Pw    queue  []

从上面的结果可以看到,数据从logs路由器传输到两个随机名字的队列中,这正是我们想要的。

想要了解如何监听一部分消息,请看下一篇博客。


说明:

①与原文略有出入,如有疑问,请参阅原文

②原文均是编译后通过javacp命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。

RabbitMQ(三):Exchange交换器--fanout相关推荐

  1. RabbitMQ(五):Exchange交换器--topic

    内容翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...

  2. RabbitMQ(四):Exchange交换器--direct

    内容翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...

  3. RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较(转)

    RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...

  4. RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

    文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...

  5. rabbitmq在exchange下的两种使用模式

    上一篇,我们介绍了rabbimtmq的简单工作队列的使用方式,即生产者和消费者之间直接通过绑定相同的workqueue进行消息的发送和接收,如果业务逻辑比较简单,这样的方式也是可以用的,但在实际工作中 ...

  6. RabbitMQ的三大交换器详解

    pom文件都是相同的 <?xml version="1.0" encoding="UTF-8"?> <project xmlns=" ...

  7. RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较

    一.Direct Exchange 任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue. 1.一般情况可以使用rabbitMQ自带的Exchange:&quo ...

  8. Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合

    一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...

  9. celery 、rabbitmq的exchange三种方式的实现

    exchange的使用BROKER_URL只能是rabbitmq,redis使用exchange和未使用效果一样,只能为direct 1.fanout 广播式,它不需要指定路由就会把所有发送到该Exc ...

最新文章

  1. 学术 | 据说以后在探头下面用帽子挡脸没用了:用于遮挡物检测的对称卷积神经网络——SymmNet...
  2. 博士如何高效率阅读文献?有哪些技巧可以借鉴?
  3. java设计模式0--设计模式简介
  4. GIS实用小技巧(一)-如何将RTK测量数据导入CAD中?
  5. C#进行Visio开发的事件处理
  6. 机器学习Machine learning in action实战相关资料
  7. 解决Linux连不上外国软件源或者软件源失效
  8. xp系统遭遇STOP 0X0000007B蓝屏,附解决方案
  9. Android Exif 解析
  10. 单工,半双工和和全双工通讯的概念
  11. 大数据Hive搭建部署常见报错信息原因
  12. 乘S10热销的东风,三星四机齐出再攻中国手机市场
  13. [电脑问题1]Microsoft Visual Basic运行时错误‘-2147221164’:没有注册类
  14. C++:指针:void*指针(跳跃力未定的指针)
  15. 视频无损编辑、截取工具
  16. servlet修改用户头像_Java上传文件实现更换头像
  17. PAT福尔摩斯的约会
  18. java作业斗地主实现
  19. 美国打车应用Lyft公布IPO招股书 预计3月底挂牌交易
  20. 如何用计算机装手机系统,如何用手机给电脑重装系统

热门文章

  1. 九、玩转JavaScript的数组(五)
  2. Parameter-Efficient Fine-tuning 相关工作梳理
  3. 聚焦视频文本检索:一文概览视频文本检索任务最新研究进展
  4. 48小时单GPU训练DistilBERT!这个检索模型轻松达到SOTA
  5. 重磅公开课推荐 | 如何搭建聊天机器人:技术架构剖析
  6. 路痴的单身小菡 BFS求最短路径+DFS求路径数
  7. mysql 二进制查询_MySql如何插入和查询二进制数据_MySQL
  8. SpringAOP中通过JoinPoint获取值,并且实现redis注解
  9. 使用swagger编写开发接口文档
  10. Linux——主流发行版本