消息的确认,是指生产者投递消息后,如果 Broker(消息实体) 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!

单个确认

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
被确认发布,后续的消息才能继续发布,waitForConfirms()这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。

批量确认

单个确认的方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样会阻塞消息的发布。

异步确认

Channel对象提供的ConfirmListener()回调方法只包含deliverTag(当前Channel发出的消息序列号),需要自己为每一个Channel维护一个cunconfirm的消息序列号集合,每个publish数据,集合中元素+1,回调一次handleAck方法,unconfirm集合删除相应的一条(multiple=false)或多条(multiple=true)记录。从程序效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构

public class Test {private static final long MESSAGE_COUNT=1000;public static void main(String[] args) throws Exception{//单个确认
//        Test.danGeQueRen(); //发布1000个单独确认消息,耗时991ms//批量确认Test.piLiangQueRen(); //发布1000个批量确认消息,耗时139ms//异步确认
//        Test.yiBuQueRen(); //发布1000个异步确认消息,耗时56ms}//    单个确认static void danGeQueRen() throws Exception {Channel channel = Rabbitmqutil.getChannel();//队列的声明 使用uuid生成随机队列名String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量发消息for (int i = 1; i < MESSAGE_COUNT ; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//单个发布确认boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin)+"ms");}//批量确认static void piLiangQueRen() throws Exception{Channel channel = Rabbitmqutil.getChannel();//队列的声明 使用uuid生成随机队列名String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量发消息//批量确认消息大小int batchSize = 100;for (int i = 1; i < MESSAGE_COUNT ; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//判断达到100条消息的时候 批量确认一次if(i%batchSize==0){//发布确认channel.waitForConfirms();}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end-begin)+"ms");}//异步确认static void yiBuQueRen() throws Exception{Channel channel = Rabbitmqutil.getChannel();//队列的声明 使用uuid生成随机队列名String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();/** 线程安全有序的一个哈希表 适用于高并发的情况* 1 轻松的将序号与消息进行关联* 2 轻松批量删除条目 只要给到序号* 3 支持高并发(多线程)* */ConcurrentSkipListMap<Long,String> outstandingConfirms=new ConcurrentSkipListMap<>();//消息确认成功 回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{if(multiple){//2 删除掉已经确认的消息 剩下的就是未确认的消息ConcurrentNavigableMap<Long,String> confirmed=outstandingConfirms.headMap(deliveryTag);}else{outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息"+deliveryTag);};/** 1 消息的标记* 2 是否为批量确认* *///消息确认失败 回调函数ConfirmCallback nackCallback = (deliveryTag,multiple)->{//3 打印下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是"+message+"-->未确认消息的tag:"+deliveryTag);};//准备消息的监听器 监听哪些消息成功了 哪些消息失败了/** 1 监听成功的消息* 2 监听失败的消息* */channel.addConfirmListener(ackCallback,nackCallback);//开始时间long begin = System.currentTimeMillis();for (int i = 1; i < MESSAGE_COUNT ; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());// 1 记录下所有要发送的消息 消息的总和outstandingConfirms.put(channel.getNextPublishSeqNo(), message);}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时"+(end-begin)+"ms");}

在main函数中执行单个确认方法

执行批量确认

执行异步确认

Rabbitmq消息发布确认机制相关推荐

  1. rabbitmq消息ACK确认机制及发送失败处理

    rabbitmq为确保消息发送和接收成功,采用ack机制. (1)生产者producter发送消息到mq时,mq会发送ack给producter告知消息是否投递成功: (2)消费者consumer接收 ...

  2. RabbitMQ消息confirm确认机制

  3. springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失. 消息正常发送的 ...

  4. RabbitMq 消息接收确认(可靠消费)

    RabbitMq 消息接收确认(可靠消费) 一.消息接收确认是什么: 是RabbitMq确认消息是否成功被消费的一种机制. 有三种消息确认方式: 1.none代表不确认:该模式下,只要队列获取到了消息 ...

  5. RabbitMq 消息发送确认(可靠生产和推送确认)

    RabbitMq 消息发送确认(可靠生产和推送确认) 此文档只是本人在项目中碰到的一些问题而产生的个人相关总结,实际上的消息确认机制可以做得更多(比如分布式事务等,但此处不做阐述). 一.消息发送确认 ...

  6. 三、RabbitMQ消息发布时的权衡

    目录标题 RabbitMQ消息发布时的权衡 失败通知 消息发布时的权衡: 加入事务 发送方确认 备用交换器 总结 RabbitMQ消息发布时的权衡 失败通知 mandatory 消息发布时的权衡: 加 ...

  7. SpringACK对RabbitMQ消息的确认(消费)

    SpringAMQP对RabbitMQ消息的确认(消费) 之前已经简单介绍了基本是从发送方去确认的,我们需要在配置文件当中开启发送方确认模式,共育两种,一种是相对于交换机一个是相对于队列. 本次的介绍 ...

  8. rabbitmq消息队列 ack机制(消息确认机制)和消息补偿机制

    参考:https://blog.csdn.net/pan_junbiao/article/details/112956537 ack 机制就是消息在 生产者在发布消息以后,消息存在内存中,如果消息被确 ...

  9. RabbitMQ消息应答------ack机制

    Message acknowledgment(消息应答) 执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了.一旦RabbitMQ将消息分发给了消费者,就会从内存中删除. ...

  10. RabbitMQ异步发布确认

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是 ...

最新文章

  1. Vue之Element-ui和Vue-cli的使用
  2. 表贴光电池 FU-NJL6402R-2 的特性
  3. dhcp MySQL 超时_mysql导入sql文件过大或连接超时的解决的方法
  4. vue 使用fs_node.js中常用的fs文件系统
  5. JQuery中serialize()、serializeArray()和param()的使用方法
  6. Abp v0.18.0 新版本: MVC Module 启动模板
  7. PWM实现语音播放原理
  8. html css模仿实例,HTML+CSS模仿大学网站主页
  9. C#重载操作符==和!=时注意问题
  10. lua中 只有 nil 和 false 为假, 其他都为真包括0
  11. 华为服务器安装操作系统
  12. Biopython---part 1
  13. 主板风扇转不开机是什么问题,电脑开机没反应_电脑开机风扇转但无法启动主板...
  14. CentOS 安装字体
  15. 揭秘传智播客班级毕业薪资超7k的内幕系列 之三 ----国企慕名而来,将未毕业学员“抢走”,传智播客又一次定义“被就业”...
  16. 新疆计算机一级考试excel公式,2020年XX专业技术人员继续教育公需课《Excel快速统计》试题及答案...
  17. linux控制NVme硬盘点灯,硬盘点灯模式的设置方法及装置与流程
  18. 为什么要创业,有人盆满钵满、有人别无选择,区别在创业思维
  19. uvm 糖果爱好者 subscriber调用parent方法解读
  20. 【学习】大数据关键技术

热门文章

  1. 逻辑覆盖:语句覆盖、判定覆盖、条件覆盖、判定/条件覆盖、组合覆盖和路径覆盖
  2. 时间管理——帕累托法则(二八定律)
  3. 工资管理系统源码下载
  4. 信息安全实训——神奇的木马
  5. Arduino时钟LCD显示
  6. 【base】串行口RS232的接口定义
  7. 基于遗传算法的柔性流水车间调度设计与实现(论文+源码+答辩ppt)
  8. 刘士颉老师——德鲁克“卓有成效”管理理论的践行者,曾任宜信公司培训负责人
  9. 【JavaScript设计模式】(一)
  10. 第八章 软件维护(1)