2019独角兽企业重金招聘Python工程师标准>>>

默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器 ,何谈持久化?
RabbitMQ针对这个问题,提供了两种解决方式:
1、通过事务机制实现。
2、通过发送方确认 publisher confirm 机制实现。

1、事务机制

开启事务后,客户端和RabbitMQ之间的通讯交互流程:

  • 客户端发送给服务器Tx.Select(开启事务模式)
  • 服务器端返回Tx.Select-Ok(开启事务模式ok)
  • 推送消息
  • 客户端发送给事务提交Tx.Commit
  • 服务器端返回Tx.Commit-Ok

以上就完成了事务的交互流程,如果其中任意一个环节出现问题,就会抛出IoException移除,这样用户就可以拦截异常进行事务回滚,或决定要不要重复消息。

声明交换机、队列并绑定

    /*** 1、声明交换机、队列并绑定*/@org.junit.Testpublic void decalreExchange() throws Exception {String exchange = "hello_tx";// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,true,false,false,new HashMap<>());String queueName = "hello_tx_c";// 声明队列channel.queueDeclare(queueName, true, false, false, null);// 绑定队列到交换机String routingKey = "aaa";channel.queueBind(queueName, exchange, routingKey,null);}

发送消息:这里通过 1/0 来产生异常

    /*** 生产者发送消息* @throws Exception*/@org.junit.Testpublic void sendMessage() throws Exception {String exchange = "hello_tx";// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 消息内容try {channel.txSelect();//开启事务String message1 = "Less is more tx ";// 发布消息到Exchange 指定路由键channel.basicPublish(exchange, "aaa", MessageProperties.PERSISTENT_TEXT_PLAIN, message1.getBytes());int i = 1/0;channel.confirmSelect();//提交事务}catch (Exception e) {log.error("error:",e);channel.txRollback();//回滚}channel.close();connection.close();}

可以看到,发送方出现异常,消息并没有发送到rabbitmq的队列里。

那么,既然已经有事务了,为何还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。根据相关资料,事务会降低2~10倍的性能。

2、发送方确认模式

基于事务的性能问题,RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。 

原理:生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),由这个id在生产者和RabbitMQ之间进行消息的确认。

confirm模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息决定下一步的处理。

如何使用

首先声明交换机、队列并绑定

    /*** 1、声明交换机、队列并绑定*/@org.junit.Testpublic void decalreExchange() throws Exception {String exchange = "hello_confirm";// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());String queueName = "hello_confirm_c";// 声明队列channel.queueDeclare(queueName, true, false, false, null);// 绑定队列到交换机String routingKey = "aaa";channel.queueBind(queueName, exchange, routingKey, null);}

通过下面的代码,进行测试

  /*** 确认发送1条消息** @throws Exception*/@org.junit.Testpublic void sendMessage1() throws Exception {String exchange = "hello_confirm";// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.confirmSelect();// 消息内容String message1 = "Less is more confirm ";// 发布消息到Exchange 指定路由键channel.basicPublish(exchange, "aaa", null, message1.getBytes());if (channel.waitForConfirms()){//等待回复log.debug("发送成功");}else{log.debug("发送失败");}channel.close();connection.close();}/*** 批量确认发送消息** @throws Exception*/@org.junit.Testpublic void sendMessage2() throws Exception {String exchange = "hello_confirm";// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.confirmSelect();for (int i= 1;i <= 3; i++){// 发布消息到Exchange 指定路由键// 消息内容String message = "Less is more confirm " + i;channel.basicPublish(exchange, "aaa", null, message.getBytes());}if (channel.waitForConfirms()){//批量确认log.debug("发送成功");}else{log.debug("发送失败");}channel.close();connection.close();}/*** 添加确认监听器** @throws Exception*/@org.junit.Testpublic void sendMessage3() throws Exception {String exchange = "hello_confirm";// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple)throws IOException {log.debug("deliveryTag:{},multiple:{}",deliveryTag,multiple);}public void handleNack(long deliveryTag, boolean multiple)throws IOException {}});// 发布消息到Exchange 指定路由键for (int i= 1;i <= 3; i++){// 发布消息到Exchange 指定路由键// 消息内容String message = "Less is more confirm " + i;channel.basicPublish(exchange, "aaa", null, message.getBytes());}if (channel.waitForConfirms()){//批量确认log.debug("发送成功");}else{log.debug("发送失败");}channel.close();connection.close();}

详细源码地址

https://github.com/suzhe2018/rabbitmq-item

转载于:https://my.oschina.net/suzheworld/blog/3003370

Rabbitmq消息发送事务与确认机制相关推荐

  1. 最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制。 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订

    最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制. 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订阅队列 3 ...

  2. RabbitMq 消息发送确认(可靠生产和推送确认)

    RabbitMq 消息发送确认(可靠生产和推送确认) 此文档只是本人在项目中碰到的一些问题而产生的个人相关总结,实际上的消息确认机制可以做得更多(比如分布式事务等,但此处不做阐述). 一.消息发送确认 ...

  3. 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

    微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...

  4. rabbitmq可靠发送的自动重试机制

    rabbitmq可靠发送的自动重试机制 转载地址:http://www.jianshu.com/p/6579e48d18ae http://www.jianshu.com/p/4112d78a8753 ...

  5. RabbitMQ消息发送和接收

    1.RabbitMQ的消息发送和接受机制 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中, ...

  6. (转)rabbitmq可靠发送的自动重试机制

    转自:https://www.jianshu.com/p/6579e48d18ae rabbitTemplate的发送流程是这样的: 1 发送数据并返回(不确认rabbitmq服务器已成功接收) 2 ...

  7. 消息消费端的确认机制

    RocketMQ提供了ack机制,以保证消息能够被正常消费.发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功.中途断电,抛出异常等都不会认为成功 con ...

  8. rabbitmq消息发送与接收stomp通道测试

    新人学习笔记,有错欢迎交流指出~~ 发现问题: 安装好Erlang.rabbitmq客户端后,启用插件rabbitmq-plugins enable rabbitmq_web_stomp后无法访问ht ...

  9. rabbitmq可靠发送的自动重试机制 --转

    原贴地址   https://www.jianshu.com/p/6579e48d18ae https://www.jianshu.com/p/4112d78a8753 git项目代码地址 https ...

最新文章

  1. Linux中如何用命令打开文件夹
  2. linux swftools java_linux安装openoffice与SWFtools工具
  3. 【R语言】迫松分布估计--判断是否符合迫松分布
  4. AJAX的异步请求小例子
  5. 清远工贸职业技术学校清远大学城网
  6. IIS添加直接下载的文件类型
  7. How to proof Pi
  8. 设计大师Donald Norman和Bill Buxton签书会在南京举行
  9. java的字符串的加密_Java加密解密字符串
  10. CSS Margin 边距详解、CSS 实现水平垂直居中、overflow 兼容 Safari 浏览器
  11. 淘宝用户行为分析(四):行为聚类
  12. selenium tips
  13. 黑灰产眼中的NFT:平台嗷嗷待宰,用户送钱上门
  14. python counter怎么用_Counter的基本用法
  15. 有感:一名大学毕业生的反思:轰动中国万言帖 最露骨大学生活
  16. macbookair有没有touchbar_高配MacBook Air和低配MacBook Pro选哪个?
  17. 华为p50鸿蒙系统5G5G,华为P50最新确认:2K屏+鸿蒙系统+5220mAh,这才是华为的实力...
  18. 拼多多下单助手怎么一键采购、发货的?
  19. Python之简单飞机行李托运计费系统
  20. incaformat蠕虫病毒样本分析及查杀防范措施

热门文章

  1. 中国广电设备行业十四五运营模式与投资机遇研究报告2022版
  2. openssl 代码分析(1)
  3. mysql 中datetime_MySQL中Datetime与Timestamp
  4. 对话尹成杰三农谋定压舱石-农业大健康·万祥军:稳农保供
  5. LuoguP3959 宝藏 题解
  6. DBUtils (30)
  7. 【学习笔记】Sass入门指南
  8. 微信网页授权功能来获取用户信息(昵称或头像)之php实现
  9. Visual Studio Code搭建TypeScript开发环境
  10. android中使用jni对字符串加解密实现分析