RabbitMQ学习之消息可靠性及特性
转载自 https://blog.csdn.net/zhu_tianwei/article/details/53971296
下面主要从队列、消息发送、消息接收方面了解消息传递过的一些可靠性处理。
1、队列
消费者是无法订阅或者获取不存在的MessageQueue中信息。消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
声明一个队列
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
durable:声明队列持久化
exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
其他选项,channel.queueDeclarePassive:例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
2、发送消息
1.发送消息设置
channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body);
basicProperties:通过参数实现消息持久化,MessageProperties.PERSISTENT_TEXT_PLAIN
public BasicProperties(String contentType,//消息类型如:text/plainString contentEncoding,//编码Map<String,Object> headers,Integer deliveryMode,//1:nonpersistent 2:persistentInteger priority,//优先级String correlationId,String replyTo,//反馈队列String expiration,//expiration到期时间String messageId,Date timestamp,String type,String userId,String appId,String clusterId)
mandatory:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
immediate:当immediate标志位设置为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
2.事务机制
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
channel.txSelect()
...
channel.txCommit()
...
channel.txRollback()
3.Confirm机制
事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。可以mandatory配合实现消息的发送可靠性。
// confirm 异步机制 通过注册listener,实现异步ack,提高性能channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {//失败重发}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {//确认ok}});
//confirm 同步机制
if(channel.waitForConfirms(timeout)){ //确认ok}else{//失败从发}
3、消息接收
1.autoAck
为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者。默认是开启的,在消费者端通过下面的方式开启消息确认, 首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认.
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
...
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
//确认消息,已经收到 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
2.公平调度
让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(prefetchCount);可以设置。
3.exclusive
和queue一样,设置了true,只有第一个启动的消费者可用。
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer)
RabbitMQ学习之消息可靠性及特性相关推荐
- (转)RabbitMQ学习之消息可靠性及特性
http://blog.csdn.net/zhu_tianwei/article/details/53971296 下面主要从队列.消息发送.消息接收方面了解消息传递过的一些可靠性处理. 1.队列 ...
- RabbitMQ学习之集群消息可靠性测试
之前介绍过关于消息发送和接收的可靠性:RabbitMQ学习之消息可靠性及特性 下面主要介绍一下集群环境下,rabbitmq实例宕机的情况下,消息的可靠性.验证rabbitmq版本[3.4.1]. ...
- RabbitMQ学习笔记
目录 一.MQ 的相关概念 MQ是什么? MQ三大优势 MQ的劣势 MQ 的产品 RabbitMQ核心 JMS 安装 二.HelloWorld 三.Work Queues(轮训) 消息应答 Rabbi ...
- rabbitmq可靠性投递_解决RabbitMQ消息丢失问题和保证消息可靠性(一)
工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消 ...
- 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战
目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...
- RabbitMQ消息中间件(二) RabbitMQ如何保证消息的可靠性投递
RabbitMQ如何保证消息投递的准确性? 生产端的可靠性投递: 1.保证消息成功发送 2.保证MQ节点成功接收 3.发送端收到MQ节点(Broker)确认应答 4.完善的消息补偿机制 BAT等大厂解 ...
- Rabbitmq消息保存机制应用案例分析消息可靠性保证
Rabbitmq 消息保存机制 mandatory参数和immediate参数作用 mandatory:当参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,Rabbitmq ...
- RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列
上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...
- rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
最新文章
- Tesla Model汽车架构与FSD供应链
- xml转json和实体类的两种方式
- java web--servlet(2)
- 高通平台Tag精确寻找进阶教程
- Maven的作用总结
- Android 中单选框或复选框点击其中一个,其余取消操作
- 水晶报表图形位置_看了我用Excel做的年度报表,老板直夸好
- iOS开发拓展篇—音频处理(音乐播放器6)
- HTML section元素
- 035——VUE中表单控件处理之使用vue控制select操作文字栏目列表
- Nginx笔记(一):安装
- 用计算机弹熊出没,熊出没之熊大快跑2018辅助
- 免费的onlineserver工具livezilla
- 在线分析仪器(一)概述
- 谷歌浏览器崩溃设置崩溃_Google的广告业务开始崩溃了吗?
- 深入理解Java虚拟机——加载和存储指令
- Windows10 64位 + caffe + Matlab -- cpu版本
- python 变量大小,进程和内存信息
- HTTP协议6-HTTP内容类型
- 【Android】SDK是什么?