【译】RabbitMQ教程一

  • 主要通过Hello Word对RabbitMQ有初步认识

【译】RabbitMQ教程二

  • 工作队列,即一个生产者对多个消费者
  • 循环分发、消息确认、消息持久、公平分发

【译】RabbitMQ教程三

  • 如何同一个消息同时发给多个消费者
  • 开始引入RabbitMQ消息模型中的重要概念路由器Exchange以及绑定等
  • 使用了fanout类型的路由器

【译】RabbitMQ教程四

  • 如何选择性地接收消息
  • 使用了direct路由器

【译】RabbitMQ教程五

  • 如何通过多重标准接收消息
  • 使用了topic路由器,可通过灵活的路由键和绑定键的设置,
    进一步增强消息选择的灵活性

【译】RabbitMQ教程六

  • 如何使用RabbitMQ实现一个简单的RPC系统
  • 回调队列callback queue和关联标识correlation id

各教程代码

  • 官方:GitHub rabbitmq-tutorials
  • 我整理的:rabbitmq-tutorial-java

RabbitMQ 一般工作流程

生产者和RabbitMQ服务器建立连接和通道,声明路由器,同时为消息设置路由键,这样,所有的消息就会以特定的路由键发给路由器,具体路由器会发送到哪个或哪几个队列,生产者在大部分场景中都不知道。(1个路由器,但不同的消息可以有不同的路由键)。
消费者和RabbitMQ服务器建立连接和通道,然后声明队列,声明路由器,然后通过设置绑定键(或叫路由键)为队列和路由器指定绑定关系,这样,消费者就可以根据绑定键的设置来接收消息。(1个路由器,1个队列,但不同的消费者可以设置不同的绑定关系)。


主要方法

  • 声明队列(创建队列):可以生产者和消费者都声明,也可以消费者声明生产者不声明,也可以生产者声明而消费者不声明。最好是都声明。(生产者未声明,消费者声明这种情况如果生产者先启动,会出现消息丢失的情况,因为队列未创建)

    channel.queueDeclare(String queue, //队列的名字boolean durable, //该队列是否持久化(即是否保存到磁盘中)boolean exclusive,//该队列是否为该通道独占的,即其他通道是否可以消费该队列boolean autoDelete,//该队列不再使用的时候,是否让RabbitMQ服务器自动删除掉Map<String, Object> arguments)//其他参数
  • 声明路由器(创建路由器):生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。
    channel.exchangeDeclare(String exchange,//路由器的名字String type,//路由器的类型:topic、direct、fanout、headerboolean durable,//是否持久化该路由器boolean autoDelete,//是否自动删除该路由器boolean internal,//是否是内部使用的,true的话客户端不能使用该路由器Map<String, Object> arguments) //其他参数
  • 绑定队列和路由器:只用在消费者

    channel.queueBind(String queue, //队列String exchange, //路由器String routingKey, //路由键,即绑定键Map<String, Object> arguments) //其他绑定参数
  • 发布消息:只用在生产者

    channel.basicPublish(String exchange, //路由器的名字,即将消息发到哪个路由器String routingKey, //路由键,即发布消息时,该消息的路由键是什么BasicProperties props, //指定消息的基本属性byte[] body)//消息体,也就是消息的内容,是字节数组
    • BasicProperties props:指定消息的基本属性,如deliveryMode为2时表示消息持久,2以外的值表示不持久化消息

      //BasicProperties介绍
      String corrId = "";
      String replyQueueName = "";
      Integer deliveryMode = 2;
      String contentType = "application/json";
      AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).deliveryMode(deliveryMode).contentType(contentType).build();
  • 接收消息:只用在消费者
    channel.basicConsume(String queue, //队列名字,即要从哪个队列中接收消息boolean autoAck, //是否自动确认,默认trueConsumer callback)//消费者,即谁接收消息
    • 消费者中一般会有回调方法来消费消息

      Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, //该消费者的标签Envelope envelope,//字面意思为信封:packaging data for the messageAMQP.BasicProperties properties, //message content header data byte[] body) //message bodythrows IOException {//获取消息示例String message = new String(body, "UTF-8");//接下来就可以根据消息处理一些事情}};

路由器类型

  • fanout:会忽视绑定键,每个消费者都可以接受到所有的消息(前提是每个消费者都要有各自单独的队列,而不是共有同一队列)。
  • direct:只有绑定键和路由键完全匹配时,才可以接受到消息。
  • topic:可以设置多个关键词作为路由键,在绑定键中可以使用*#来匹配
  • headers:(可以忽视它的存在)

教程一 HelloWorld

看主要代码

//生产者
channel.queueDeclare(QUEUE_NAME, false, false, false, null); ----①
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
//消费者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, consumer);

这里,生产者和消费者都没有声明路由器,而是声明了同名的队列。生产者发布消息时,使用了默认的无名路由器(""),并以队列的名字作为了路由键。消费者在消费时,由于没有声明路由器,这并不表示没有路由器的存在,消费者此时使用的是默认的路由器,即Default exchange,该路由器和所有的队列都进行绑定,并且使用队列的名字作为了路由键进行绑定。所以,生产者使用默认路由器以队列的名字作为了绑定键进行了消息发布,而消费者也使用了默认的路由器,并以队列的名字作为绑定键进行了绑定。而默认路由器是direct类型,路由键和绑定键完全匹配时,消费者才能接受到消息,所以教程1中的消费者可以接收到消息。(为了认证这一点,可以将代码①去掉,然后先运行消费者,让它等待监听,然后启动生产者,发送消息,消费者同样会收到消息。这里的生产者声明队列,只是让RabbitMQ服务器先创建这个队列,以免发送的消息因为找不到队列而丢失。)


教程二 Work Queues

看主要代码

//生产者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "1.";
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//消费者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);---①...channel.basicAck(envelope.getDeliveryTag(), false);...---③
boolean autoAck = false;---②
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

这里也使用了默认的direct路由器。假如启动多个工作者(消费者),按道理这些工作者应该可以接收到所有的消息啊,但是不要忘了这几个工作者都是从同一个队列中取消息,消息取出一个,队列中就少一个,所以每个工作者都只是收到的消息的一部分。既然这几个工作者都从同一个队列中取消息,那每个工作者应该怎么取呢?

如果没有代码①,并且②设置为true,即自动确认收到消息,RabbitMQ只要发出消息就认为消费者收到了,此时RabbitMQ采取的是循环分发的策略,在这几个工作者中循环轮流分发消息。每个工作者接受到的消息数量都是相同的。
如果有代码①,并且②设置为false,则RabbitMQ会采取公平分发策略,即将消息发给空闲的工作者(空闲,工作者将消息处理完毕,执行了代码③;不空闲,即工作者还在处理消息,还没有给RabbitMQ发回确认信息,即还没有执行代码③)。
代码①中的参数1:(prefetchCount)maximum number of messages that the server will deliver

为了防止队列丢失,在声明队列的时候指定了durabletrue。为了防止消息丢失,设置了消息属性BasicPropertiesMessageProperties.PERSISTENT_TEXT_PLAIN,让我们看看值是什么:

MessageProperties.PERSISTENT_TEXT_PLAIN

可以看出里面包含了deliveryMode=2。从这张图也可以看到BasicProperties属性的全貌。

如果想让多个消费者共同消费某些消息,只要让他们共用同一队列即可(当然前提是你得保证消息可以都进到这个队列中来,如本例中使用direct路由器,消息的路由键和队列的绑定键设为一致,当然也可以使用fanout路由器,路由键和绑定键随意设置,不一致也能收到,因为fanout路由器会忽略路由键的设置)。


教程三 Publish/Subscribe

看主要代码

 //生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();---①
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, consumer);

教程三才引出路由器的概念。生产者和消费者声明了同样的路由,并指明路由类型为fanout,该路由器会忽视路由键,将消息发布到所有绑定的队列中(仍需要绑定,只是绑定时绑定键任意就行了)。
假如启动多个消费者,因为代码①中调用无参的声明去恶劣方法channel.queueDeclare(),就会创建了一个非持久、独特的、自动删除的队列,并返回一个自动生成的名字。所以多个消费者取消息时使用的是各自的队列,不会存在多个消费者从同一个队列取消息的情况。
这样多个消费者就可以接收到同一消息。

如果想实现多个消费者都可以接收到所有的消息,只要让他们各自使用单独的队列即可(当然前提是保证路由键和绑定键的设置可以让消息都进入到队列,如本例中使用fanout路由器,无需考虑绑定键和路由键)。


教程4 Routing

看主要代码:

//生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
String[] severities = {"info", "warning", "error"};
for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
channel.basicConsume(queueName, true, consumer);

可以看出,教程3使用了direct路由器,该路由器的特点是可以设定路由键和绑定键,消费者只能从队列中取出两者匹配的消息。
在生产者发消息时,为消息设置不同的路由键(如例子中severity可以设为infowarnerror)。
消费者在通过为队列设置多个绑定关系,来选择想要接收的消息。
这里有一个概念叫做多重绑定,即多个队列以相同的绑定键binding key绑定到同一个路由器上,此时direct路由器就会像fanout路由器一样,将消息广播给所有符合路由规则的队列。


教程5 Topics

看主要代码:

//生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = "";
String message = "msg...";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME,   bindingKey);}
channel.basicConsume(queueName, true, consumer);

这里使用了topic路由器,它与direct路由器类似,不同在于,topic路由器可以为路由键设置多重标准。一个消息有一个路由键,direct路由器只能为路由键指定一个关键字,但是topic路由器可以在路由键中通过点号分割多个单词来组成路由键,消费者在绑定的时候,可以设置多重标准来选择接受。
举个例子:假如日志根据严重级别infowarnerror,也可以根据来源分为cronkernauth。某个日志消息设置路由键为kern.info,表示来自kerninfo级别的日志。想要选择接收消息的时候,direct路由器就办不到,它要么可以根据严重级别来筛选,要么根据来源来筛选,而topic路由器则可以轻松应对,只要将绑定键设置为kern.info就可以精准获取该类型的日志。

RabbitMQ教程总结相关推荐

  1. [译]RabbitMQ教程C#版 - 远程过程调用(RPC)

    先决条件 本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难, ...

  2. RabbitMQ教程C#版 - 工作队列

    先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以 ...

  3. Spring RabbitMQ教程

    Spring RabbitMQ教程  Spring RabbitMQ是基于Spring AMQP协议实现的消息代理. 目录[ 隐藏 ] 1 Spring RabbitMQ 1.1 Spring AMQ ...

  4. RabbitMQ教程远程过程调用RPC

    前言:在前面的教程里我们学习了工作队列,实现了将工作任务发给不同的工人,如果任务是需要在另一台计算机上运行,我们如何实现运行远程计算机上的一个函数任务并等待其返回的结果呢,这种模式通常被称为远程过程调 ...

  5. RabbitMQ教程 3.发布/订阅(Publish/Subscribe)

    搜索:Java课代表,关注公众号,及时获取更多Java干货. 3 发布/订阅(Publish/Subscribe) 在上一节中,我们创建了一个工作队列.其目的是将每个任务只分发给一个worker.本节 ...

  6. RabbitMQ教程完整(精华干货)

    RabbitMQ教程(完整!!!) 一.RabbitMQ安装 1.1 为什么使用RabbitMQ 1.降低耦合度 2.RabbitMQ速度快,微秒级别 3.学习成本低 4.支持多种语言 1.2 什么是 ...

  7. RabbitMQ 教程

    RabbitMQ 是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能.健壮 ...

  8. yum安装RabbitMQ教程

    yum安装RabbitMQ教程 1.配置 epel ,rabbitmq的下载源 yum install epel-release -y 2.下载rabbitmq yum install rabbitm ...

  9. 【详细】【转】C#中理解委托和事件 事件的本质其实就是委托 RabbitMQ英汉互翼(一),RabbitMQ, RabbitMQ教程, RabbitMQ入门...

    [详细][转]C#中理解委托和事件 文章是很基础,但很实用,看了这篇文章,让我一下回到了2016年刚刚学委托的时候,故转之! 1.委托 委托类似于C++中的函数指针(一个指向内存位置的指针).委托是C ...

最新文章

  1. 题目1551:切蛋糕
  2. Linux上iptables防火墙的基本应用教程
  3. 【自动驾驶】视觉里程计
  4. x265-确定slice type-3
  5. 西门子rwd68温控器说明书_西门子RWD68说明书
  6. mysql配置日志老化配置_mysql配置-日志大小限制和自动删除
  7. 在gluster中配置distributed 卷
  8. 各种字符串合并处理示例.sql
  9. 进击的PM:作为产品总监,你需要具备什么样的能力?
  10. POJ 2492 A Bug's Life (带权并查集 向量偏移)
  11. maven工程启动时报“Cannot resolve XXX:XXX:xx.xx.xx”错误的问题
  12. 【Ian Goodfellow 强推】GAN 进展跟踪 10 大论文(附下载)
  13. C# Windows服务自动安装与注册
  14. C++--第13课 - 操作符重载 - 下
  15. CAM350 V14.6 检查gerber文件
  16. proteus 安装包以及破解汉化
  17. 基于sa866的电磁搅拌器计算机控制系统,一种新型电磁搅拌器计算机控制系统
  18. java怎么实现事务_java实现简单的事务
  19. 如何用文本文档编写python程序
  20. 每天读点故事产品体验报告

热门文章

  1. 单调栈 leetcode整理(二)
  2. 初中文化能学编程吗_网页编程课程来了,确定不来pick一下!!!|科创辅学进行时...
  3. 474. 一和零 golang动态规划
  4. c语言中 %.2s,C2S是什么意思
  5. mysql 远程load data,PyMySQL将(文件)数据加载到远程MySQL实例时发生错误/异常
  6. Redis常见问题及其一些重点知识总结
  7. 类中的静态成员函数访问非静态成员变量
  8. C语言中的深拷贝和浅拷贝
  9. 关于NOR FLASH地址左右移的问题
  10. Makefile文件试错