RabbitMQ(七):常用方法说明 与 学习小结
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发
RabbitMQ(三):Exchange交换器--fanout
RabbitMQ(四):Exchange交换器--direct
RabbitMQ(五):Exchange交换器--topic
RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统
RabbitMQ(七):常用方法说明 与 学习小结
项目代码:官方:GitHub rabbitmq-tutorials 或 我整理的:rabbitmq-tutorial-java
RabbitMQ 一般工作流程:
生产者和RabbitMQ服务器建立连接和通道,声明路由器,同时为消息设置路由键,这样,所有的消息就会以特定的路由键发给路由器,具体路由器会发送到哪个或哪几个队列,生产者在大部分场景中都不知道。(1个路由器,但不同的消息可以有不同的路由键)。
消费者和RabbitMQ服务器建立连接和通道,然后声明队列,声明路由器,然后通过设置绑定键(或叫路由键)为队列和路由器指定绑定关系,这样,消费者就可以根据绑定键的设置来接收消息。(1个路由器,多个队列,不同的消费者可以设置不同的绑定关系)。
主要方法:
- 声明队列(创建队列):可以生产者和消费者都声明,也可以消费者声明生产者不声明,也可以生产者声明而消费者不声明。最好是都声明。(生产者未声明,消费者声明这种情况如果生产者先启动,会出现消息丢失的情况,因为队列未创建)
channel.queueDeclare(String queue, //队列的名字boolean durable, //该队列是否持久化(即是否保存到磁盘中)boolean exclusive,//该队列是否为该通道独占的,即其他通道是否可以消费该队列boolean autoDelete,//该队列不再使用的时候,是否让RabbitMQ服务器自动删除掉Map<String, Object> arguments)//其他参数
- 声明路由器(创建路由器):生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。
channel.exchangeDeclare(String exchange,//路由器的名字String type,//路由器的类型:topic、direct、fanout、headerboolean durable,//是否持久化该路由器boolean autoDelete,//是否自动删除该路由器boolean internal,//是否是内部使用的,true的话客户端不能使用该路由器Map<String, Object> arguments) //其他参数
- 绑定队列和路由器:只用在消费者
channel.queueBind(String queue, //队列String exchange, //路由器String routingKey, //路由键,即绑定键Map<String, Object> arguments) //其他绑定参数
- 发布消息:只用在生产者
channel.basicPublish(String exchange, //路由器的名字,即将消息发到哪个路由器String routingKey, //路由键,即发布消息时,该消息的路由键是什么BasicProperties props, //指定消息的基本属性byte[] body)//消息体,也就是消息的内容,是字节数组
BasicProperties props
:指定消息的基本属性,如deliveryMode
为2时表示消息持久,2以外的值表示不持久化消息
//BasicProperties介绍
String corrId = "";
String replyQueueName = "";
Integer deliveryMode = 2;
String contentType = "application/json";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).deliveryMode(deliveryMode).contentType(contentType).build();
- 接收消息:只用在消费者
channel.basicConsume(String queue, //队列名字,即要从哪个队列中接收消息boolean autoAck, //是否自动确认,默认trueConsumer callback)//消费者,即谁接收消息
- 消费者中一般会有回调方法来消费消息
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, //该消费者的标签Envelope envelope,//字面意思为信封:packaging data for the messageAMQP.BasicProperties properties, //message content header data byte[] body) //message bodythrows IOException {//获取消息示例String message = new String(body, "UTF-8");//接下来就可以根据消息处理一些事情}};
路由器类型:
- fanout:会忽视绑定键,每个消费者都可以接受到所有的消息(前提是每个消费者都要有各自单独的队列,而不是共有同一队列)。
- direct:只有绑定键和路由键完全匹配时,才可以接受到消息。
- topic:可以设置多个关键词作为路由键,在绑定键中可以使用
*
和#
来匹配 - headers:(可以忽视它的存在)
教程一 HelloWorld
看主要代码
//生产者
channel.queueDeclare(QUEUE_NAME, false, false, false, null); ----①
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
//消费者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, consumer);
这里,生产者和消费者都没有声明路由器,而是声明了同名的队列。生产者发布消息时,使用了默认的无名路由器(""
),并以队列的名字作为了路由键。消费者在消费时,由于没有声明路由器,这并不表示没有路由器的存在,消费者此时使用的是默认的路由器,即Default exchange,该路由器和所有的队列都进行绑定,并且使用队列的名字作为了路由键进行绑定。所以,生产者使用默认路由器以队列的名字作为了绑定键进行了消息发布,而消费者也使用了默认的路由器,并以队列的名字作为绑定键进行了绑定。而默认路由器是direct类型,路由键和绑定键完全匹配时,消费者才能接受到消息,所以教程1中的消费者可以接收到消息。(为了认证这一点,可以将代码①去掉,然后先运行消费者,让它等待监听,然后启动生产者,发送消息,消费者同样会收到消息。这里的生产者声明队列,只是让RabbitMQ服务器先创建这个队列,以免发送的消息因为找不到队列而丢失。)
教程二 Work Queues
看主要代码
//生产者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "1.";
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//消费者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);---①...channel.basicAck(envelope.getDeliveryTag(), false);...---③
boolean autoAck = false;---②
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
这里也使用了默认的direct路由器。假如启动多个工作者(消费者),按道理这些工作者应该可以接收到所有的消息啊,但是不要忘了这几个工作者都是从同一个队列中取消息,消息取出一个,队列中就少一个,所以每个工作者都只是收到的消息的一部分。既然这几个工作者都从同一个队列中取消息,那每个工作者应该怎么取呢?
如果没有代码①,并且②设置为true,即自动确认收到消息,RabbitMQ只要发出消息就认为消费者收到了,此时RabbitMQ采取的是循环分发的策略,在这几个工作者中循环轮流分发消息。每个工作者接受到的消息数量都是相同的。
如果有代码①,并且②设置为false,则RabbitMQ会采取公平分发策略,即将消息发给空闲的工作者(空闲,工作者将消息处理完毕,执行了代码③;不空闲,即工作者还在处理消息,还没有给RabbitMQ发回确认信息,即还没有执行代码③)。
代码①中的参数1:(prefetchCount)maximum number of messages that the server will deliver
。
为了防止队列丢失,在声明队列的时候指定了durable
为true
。为了防止消息丢失,设置了消息属性BasicProperties
为MessageProperties.PERSISTENT_TEXT_PLAIN
,让我们看看值是什么:
可以看出里面包含了deliveryMode=2
。从这张图也可以看到BasicProperties
属性的全貌。
如果想让多个消费者共同消费某些消息,只要让他们共用同一队列即可(当然前提是你得保证消息可以都进到这个队列中来,如本例中使用direct
路由器,消息的路由键和队列的绑定键设为一致,当然也可以使用fanout
路由器,路由键和绑定键随意设置,不一致也能收到,因为fanout
路由器会忽略路由键的设置)。
教程三 Publish/Subscribe
看主要代码:
//生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();---①
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, consumer);
教程三才引出路由器的概念。生产者和消费者声明了同样的路由,并指明路由类型为fanout
,该路由器会忽视路由键,将消息发布到所有绑定的队列中(仍需要绑定,只是绑定时绑定键任意就行了)。
假如启动多个消费者,因为代码①中调用无参的声明队列方法channel.queueDeclare()
,就会创建了一个非持久、独占的、自动删除的队列,并返回一个自动生成的名字。所以多个消费者取消息时使用的是各自的队列,不会存在多个消费者从同一个队列取消息的情况。这样多个消费者就可以接收到同一消息。
如果想实现多个消费者都可以接收到所有的消息,只要让他们各自使用单独的队列即可(当然前提是保证路由键和绑定键的设置可以让所有消息都进入到自己的队列,如本例中使用fanout
路由器,无需考虑绑定键和路由键)。
教程4 Routing
看主要代码:
//生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
String[] severities = {"info", "warning", "error"};
for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
channel.basicConsume(queueName, true, consumer);
可以看出,教程4使用了direct
路由器,该路由器的特点是可以设定路由键和绑定键,消费者只能从队列中取出两者匹配的消息。
在生产者发消息时,为消息设置不同的路由键(如例子中severity
可以设为info
、warn
、error
)。
消费者在通过为队列设置多个绑定关系,来选择想要接收的消息。
这里有一个概念叫做多重绑定,即多个队列以相同的绑定键binding key绑定到同一个路由器上,此时direct
路由器就会像fanout路由器一样,将消息广播给所有符合路由规则的队列。
教程5 Topics
看主要代码:
//生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = "";
String message = "msg...";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}
channel.basicConsume(queueName, true, consumer);
这里使用了topic
路由器,它与direct
路由器类似,不同在于,topic
路由器可以为路由键设置多重标准。一个消息有一个路由键,direct
路由器只能为路由键指定一个关键字,但是topic
路由器可以在路由键中通过点号分割多个单词来组成路由键,消费者在绑定的时候,可以设置多重标准来选择接受。
举个例子:假如日志根据严重级别info
、warn
、error
,也可以根据来源分为cron
、kern
、auth
。某个日志消息设置路由键为kern.info
,表示来自kern
的info
级别的日志。想要选择接收消息的时候,direct
路由器就办不到,它要么可以根据严重级别来筛选,要么根据来源来筛选,而topic
路由器则可以轻松应对,只要将绑定键设置为kern.info
就可以精准获取该类型的日志。
教程6 Remote Procedure Call
教程6属于RabbitMQ在RPC领域的应用,与上面几个教程的内容没有多大衔接,可以直接阅读原文。
博客转载自:https://www.jianshu.com/p/a6460b4b155f
RabbitMQ(七):常用方法说明 与 学习小结相关推荐
- JavaSE学习小结二
JavaSE学习小结二 记录学习----------JavaSE模块三&四 其中大部分是一些工具类,会将常用方法及其功能描述整理成表格方便日后查阅 文章目录 JavaSE学习小结二 一.Obj ...
- 图片裁剪功能学习小结
图片裁剪功能学习小结 近期有需要使用图片裁剪的功能,在使用插件和自己写裁剪组件之间犹豫了很久,后来根据需求经过反复的考虑,还是自己封装吧,毕竟自己动手,丰衣足食,对吧?嗯,??????是的!最后生成裁 ...
- Unity API常用方法和类学习笔记2
Unity API常用方法和类学习笔记2 ------Mathf & Input & Vector & Random 类Mathf 一.静态变量 print(Mathf.Deg ...
- 机器学习/深度学习/图机器学习 学习小结
目录 前言 学习小结 一.机器学习部分 二.深度学习部分 三.图神经网络 概念清晰!概念清晰!概念清晰! 磨刀不误砍柴工! 最难的是开始,最简单的也是开始! 前言 历时将近一年吧,因为疫情原因在家,终 ...
- 深度学习原理学习小结 - Self-Attention/Transformer
文章目录 深度学习原理学习小结 - Self-Attention/Transformer Self-Attention基本原理 引入 核心概念 计算方法 Transformer基本原理 知识补充 编码 ...
- 数据科学导论学习小结——其三
数据科学导论学习小结--其三 这是笔者大学二年级必修科目<数据科学基础>个人向笔记整理的第三部分,包含第六.第七两个章节.本笔记内容基于清华大学出版社<数据科学导论-探索数据的奥秘& ...
- 无限风光 : 近来地形算法学习小结【转】
无限风光 : 近来地形算法学习小结 原文链接 目录 -写在前面 -本文话题整体观 -概念(Concepts): 入门须知 -高度图(HeightMap) -分形(Fractal ...
- Python - 输出格式 (学习小结)
Python - 输出格式 (学习小结) Bu.xing 利用现代手段,创建学习家园 关注他 1 人赞同了该文章 Python 输出格式 我们常说的输出格式分两种含义: # 一种是指数据在屏幕上的显 ...
- Page 的生命周期学习小结
(以前我在 csdn 写的翻译文章,现在转到这里来.) Page 的生命周期学习小结 原文链接:Page Events: Order and PostBack 作者:Paul Wilson 翻译:木野 ...
最新文章
- Java内存模型与线程
- 借助acs来实现telnet、ssh的远程认证
- 我如何查看要使用git推送的内容?
- php和python性能-python、node、php、go、java性能对比测试
- plsql动态的sql
- vs与qt版本对应关系
- 用subline添加单引号和逗号,在sql中使用
- openssh arm linux 编译,openssh编译安装到ARM嵌入式系统中
- python四舍五入保留小数点后三位_Python中的“正确”四舍五入到小数点后3位
- linux系统创建lvm卷,Linux逻辑卷LVM实现
- DevExpress XtraGrid RepositoryItemCheckEdit 复选框多选的解决方法(转)
- 在Objective-C中分类对象和方法
- Jquery常用操作select篇
- 中秋福利!开源基础设施峰会9折票!另有限量免费门票!
- jquery实现html表格隔行变色
- C# 使用 windowsmedia循环播放视频
- word加了脚注,分节符(连续)后的内容,跳到下一页
- Linux-京东字节百度提前批,一面二面都被问到了awk——实例篇(4)ip地址相关
- 苹果可穿戴设备项目背后的那些专家
- 塑料回收标志相关知识
热门文章
- 【小白学习C++ 教程】八、在C++指针传递引用和Const关键字
- 三、ResNet50预置算法提高美食分类识别精确度
- Java 100(三)
- 二十八、Pyspider 爬取链家网
- 直播预告 | AAAI 2022论文解读:基于对比学习的预训练语言模型剪枝压缩
- 今日arXiv精选 | 35篇顶会论文:ICCV/ CIKM/ ACM MM
- 仅输入单张图片,就能“看”出物体材质!这篇图形学论文已被SIGGRAPH 2021收录...
- 直播报名 | 官方解读NVIDIA黑科技:StyleGAN的架构与实现
- 今晚8点:基于强化学习的关系抽取和文本分类 | PhD Talk #18
- ResNet学习笔记