异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

/*
* 发布确认模式,
* 1、单个确认
* 2、批量确认
* 3、异步批量确认
* */
public class ComfirmMessage {// 批量发消息的个数public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {//3、异步批量确认// 发布1000个异步确认消息,耗时36msComfirmMessage.publicMessageAsync();}public static void publicMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);// 开启发布确认channel.confirmSelect();// 开始时间long begin = System.currentTimeMillis();// 消息确认成功回调函数ConfirmCallback ackCallback = (deliveryTag,multiply) -> {System.out.println("确认的消息:"+deliveryTag);};// 消息确认失败回调函数/** 参数1:消息的标记* 参数2:是否为批量确认* */ConfirmCallback nackCallback = (deliveryTag,multiply) -> {System.out.println("未确认的消息:"+deliveryTag);};// 准备消息的监听器,监听哪些消息成功,哪些消息失败/** 参数1:监听哪些消息成功* 参数2:监听哪些消息失败* */channel.addConfirmListener(ackCallback,nackCallback);// 批量发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));}// 结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时"+ (end - begin) + "ms");}
}

如何处理异步未确认信息?

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递

public class ComfirmMessage {// 批量发消息的个数public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {//3、异步批量确认// 发布1000个异步确认消息,耗时36msComfirmMessage.publicMessageAsync();}public static void publicMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);// 开启发布确认channel.confirmSelect();/** 线程安全有序的一个哈希表 适用于高并发的情况下* 1、轻松地将序号与消息进行关联* 2、轻松地批量删除,只要给到序号* 3、支持高并发* */ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();// 消息确认成功回调函数ConfirmCallback ackCallback = (deliveryTag,multiply) -> {// 删除到已经确认的消息,剩下的就是未确认的消息if(multiply){ConcurrentNavigableMap<Long, String> confiremed = outstandingConfirms.headMap(deliveryTag);confiremed.clear();}else {outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};// 消息确认失败回调函数/** 参数1:消息的标记* 参数2:是否为批量确认* */ConfirmCallback nackCallback = (deliveryTag,multiply) -> {// 打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是:" + message +"未确认的消息tag:" + deliveryTag);};// 准备消息的监听器,监听哪些消息成功,哪些消息失败/** 参数1:监听哪些消息成功* 参数2:监听哪些消息失败* */channel.addConfirmListener(ackCallback,nackCallback);// 开始时间long begin = System.currentTimeMillis();// 批量发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));// 此处记录下所有要发送的消息的总和outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}// 结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时"+ (end - begin) + "ms");}
}

RabbitMQ异步发布确认相关推荐

  1. springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失. 消息正常发送的 ...

  2. RabbitMQ单个发布确认

    这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布, waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候 ...

  3. RabbitMQ(三)发布确认 Publisher Confirms

    代码仓库:github:https://github.com/stopping5/RabbitMq-Operation-Record.git 本代码示例需要引入rabbitmq依赖 <!-- r ...

  4. RabbitMQ入门(三)消息应答与发布确认

    前言: 消息应答与发布确认都是保证消息不丢失.而重复消费问题则是消息幂等性.(之后会说幂等性) 消息应答: 应答功能属于消费者,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理 ...

  5. RabbitMQ(三)发布确认

    4.1 发布确认原理 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队 ...

  6. RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等

    RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...

  7. RabbitMQ 消费者回执和发布确认

    为了保证数据安全,消费者和生产者的回执(ack)都是非常重要的. 由于我们无法保证消息都能像我们期望的那样,正常到达另一端或者被 Consumer 消费成功.因此,publisher 和 consum ...

  8. 十、RabbitMQ发布确认高级

    RabbitMQ发布确认高级 发布确认SpringBoot版本 发布确认Springboot版本 简单的发布确认机制在应答与签收已经介绍,本内容将介绍整合了 SpringBoot 的发布确认机制. 介 ...

  9. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

最新文章

  1. 2 Ways Thinking In Ajax
  2. mysql解决Value ‘0000-00-00 00:00:00’ can not be represented as java.sql.Timestamp
  3. k8s使用glusterfs存储报错type 'features/utime'
  4. PHP中对象的深拷贝与浅拷贝
  5. fullgc一小时发生一次的原因
  6. mysql 事物的持久性是指_详解MySQL中事务的持久性实现原理
  7. php中的address,html中address是什么意思?(代码示例)
  8. php语法介绍,PHP 函数语法介绍一
  9. 备份图解 ---mysql 博客
  10. 今天的时间逻辑以及fix 一个 mysql 程序员错误的习惯
  11. 2015手机网民超10亿是什么景象?
  12. 安卓开发学习笔记—————《Anroid编程权威指南》第六章 Android编程与兼容性问题...
  13. ADAS/AD域控制器及芯片平台分析
  14. java怎么判断素数_java判断是否为素数(质数)的方法
  15. windows命令查看无线网密码
  16. 富士施乐248b粉盒清零_能不能告诉我施乐5070硒鼓芯片清零方法是什么
  17. 分门别类刷leetcode——二叉查找树(C++实现)
  18. 如何批量新建文件夹,批量新建文件夹并命名
  19. Java_Java多线程_Java线程池核心参数 与 手动创建线程池
  20. 配置TensorFlow的cuda环境教程

热门文章

  1. [CF]Codeforces Round #529 (Div. 3)
  2. grunt -- javascript自动化工具
  3. 【转】crontab 详解
  4. 超简单的java爬虫
  5. javascript(js)自动刷新页面的实现方法总结
  6. lhgselect 联动选择下拉菜单 v1.0.0 (2011-06-13)
  7. MySQL简单查询性能分析
  8. 关于大龄程序员的谣言 新手必读
  9. Win10安装华三模拟器各种疑难问题
  10. [Wrong Answer] Leetcode 805 JavaScript 解决方案 数组的均值分割