背景

我们的支付场景下,要求消费的业务消息绝不能丢失,且能充分利用高规格的服务器的性能,比如用线程池对业务消息进行快速处理。有同学可能没太理解这个问题有啥不好处理,让我一步步分析下。

MQ的优势和缺点

MQ是我们在应对高并发场景最常用的一种措施,它可以帮我们对业务解耦、对流程异步化以及削峰填谷的妙用。

但是,由于引入了这一额外的中间件,也增加了系统的复杂度和不稳定因素。

消息可靠性的应对

消息的可靠性保证需要从消息流转的每个环节进行保障,比如生产端的事务型消息,broker的实时刷盘持久化,消费端的手动ACK 。

这里,我们对生产端和存储端的保障措施不作讨论,重点关注消费端的手动ACK机制。

手动ACK的问题

手动ACK可以保证消息一定被消费,但是需要确保手动ACK的顺序和消息顺序一致,为什么?

消息队列之所以性能高处理快,是因为采用了文件顺序读写方式,系统在拉取消息进行消费时,是按顺序文件的offset进行拉取的,如果commit offset的顺序错乱,会使得服务端的消息状态错乱,比如消息重发。

因此,如果我们在本地启动了线程池,对消息进行拉取处理,由于各线程的处理速度不一定一致,所以无法保证各线程处理完之后对各自消息的ACK操作是顺序的,怎么办,难道只能同步拉消费取然后ACK么。

解决方案

最不济,可以提交一批任务,批量等待统一提交。不过总觉得不优雅。

某次看JUC中的AQS的时候,启发了我。

我们平时用的类似CountDownLauch这些并发工具类,不也是处理的多线程协作的问题么。

我们的场景完全没有AQS复杂,借鉴它的思路,应该是没有问题的。

  1. 创建双端队列,队列节点中需要维护自身处理状态state,和对应msg的offset。

  2. 服务从消息中心拉取消息,在提交本地线程池执行之前,先入队列。

  3. 消息消费完之后,通知队列中对应的节点,更新状态为完成。

  4. 队列头被更新后出队列,提交offset,并判断新的队列头的状态,直到遇到state是未完成的head时阻塞。

    undefined

方案解析

该方案可以有效利用本地线程的资源,并行的处理,并通过队列和异步通知机制保证最终commit offset时有序。

在最差情况下(即head节点对应的msg最后一个被处理完),相当于等待一批线程处理完成后统一提交。除此之外等待性能都要更优。

异步通知的实现

public class MSGFuture {/*全局变量,存放msg对应的future对象*/private static final Map<Long, MSGFuture> FUTURES = new ConcurrentHashMap<Long, MSGFuture>();/*全局不变唯一标识*/private final long id;/*最长等待时间*/private final int timeout;/*并发锁*/private final Lock lock = new ReentrantLock();/*通知条件*/private final Condition done = lock.newCondition();/*开始时间*/private final long start = System.currentTimeMillis();/*业务结果*/private volatile Object response;
}
//构造函数
public MSGFuture(Request request, int timeout) {/*全局自增ID*/this.id = request.getrId();/*超时时间*/this.timeout = timeout > 0 ? timeout : 1000;/*放入全局变量*/FUTURES.put(id, this);
}
//业务处理结果更新
public static void received(long id, Object response) {MSGFuture future = FUTURES.remove(id);if (future != null) {future.doReceived(response);} else {logger.warn("response return timeout,id:"+id);}}
//结果更新,通知等待条件
private void doReceived(Object res) {lock.lock();try {response = res;done.signal();} finally {lock.unlock();}}
//异步等待获取结果
public Object get(int timeout) throws TimeoutException {if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException();}}return returnFromResponse();}

总结

看到这里,有同学会说,这个和AQS有啥关系呀~

其实,只是处理思路的一种借鉴,比如state状态,比如锁机制和通知等待。既然都是多线程任务协调,那总有相似之处。

总之一句话,别说背八股文没用,多多了解会有大帮助~

借鉴AQS的CHL思路解决消息多线程消费顺序ACK问题相关推荐

  1. RocketMQ如何保证消息顺序消费?又为何不解决消息重复消费问题?

    消息的顺序消费对于业务系统来说非常重要,一笔订单产生了3条消息,分别是订单创建.订单付款.订单完成.消费时,必须按照顺序消费才有意义,与此同时多笔订单之间又是可以并行消费的. 如何保证消息顺序消费? ...

  2. 阿里RocketMQ如何解决消息的顺序和重复两大硬伤

    分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一 ...

  3. 聊一聊顺序消息(RocketMQ顺序消息的实现机制)

    本文来自:https://www.cnblogs.com/hzmark/p/orderly_message.html 当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间 ...

  4. rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?

    上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下: 这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/R ...

  5. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性

    [重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...

  6. RocketMQ常见问题-消息重复消费和消息重复的问题

    RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,重要的事情说三遍. 基本上说我很讨厌有人问这个问题,问这个问题首先你对消息的生命周期缺乏理解 ...

  7. AQS与CLH相关论文学习系列(四)- AQS的设计思路

    本文是AQS与CLH相关论文学习系列第四篇. 系列其他文章链接如下 AQS与CLH相关论文学习系列(一)- 排队式自旋锁思想启蒙 AQS与CLH相关论文学习系列(二)- MCS 锁 AQS与CLH相关 ...

  8. 面试精讲之面试考点及大厂真题 - 分布式专栏 15 如何解决消息重复,保证消息顺序问题

    15如何解决消息重复,保证消息顺序问题 自信和希望是青年的特权. --大仲马 引言 我在<12.项目中为什么要使用消息队列>中列举了两个使用消息队列的例子. (1)收银系统,确认收款成功, ...

  9. 消息队列面试 - 如何解决消息队列的延时以及过期失效问题?

    消息队列面试 - 如何解决消息队列的延时以及过期失效问题? 面试题 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决? 面试官心理分析 你看 ...

最新文章

  1. 全自动驾驶“生死时速”,特斯拉收购计算机视觉创企DeepScale
  2. 简明Python3教程 4.安装
  3. Android---Android 屏幕尺寸与密度
  4. 企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理
  5. PE知识复习之PE的两种状态
  6. [转]oracle中查询指定行数的记录
  7. 大数据新手的0基础学习路线,从菜鸟到高手的成长之路
  8. shell和php区别,PHP中exec函数和shell_exec函数的区别
  9. linux小米随身wifi,小米随身wifi for mac版详细使用图文步骤
  10. 抖音矩阵系统,抖音矩阵系统源码,抖音SEO源码。
  11. JNI调用dll库或so库
  12. 中嘉城湖靠谱国有企业要为实现人民对美好生活的向往不断奋斗
  13. 8 卷积神经网络——解决参数太多问题(1)
  14. 【DP专题】——洛谷P2466 [SDOI2008]Sue的小球
  15. PV,UV,VV 含义
  16. 东华助手 v1.6.5
  17. python3数据经base64解码后带b'的解决办法
  18. PHP代码实现反弹shell
  19. 【蓝桥杯Python组】既约分数
  20. mysql五中约束_大学计算机基础B答案

热门文章

  1. c语言小于n的素数和,关于求N以内素数的一点小问题(N小于一亿)
  2. 倒序查10条数据_王者荣耀对抗路数据公布,尖端局吕布倒第一,夏洛特真的很意外...
  3. 详解Python中的循环的几个类型
  4. ajax怎么设置好友,好友按ajax新消息通知
  5. 关于Visual Studio2019的4996错误警告解决方法
  6. 解题报告(一)快速沃尔什变换FWT(ACM / OI)超高质量题解
  7. Codeforces Round #699 (Div. 2) (A ~ F)6题全,超高质量良心题解【每日亿题】2021/2/6
  8. Oracle IMP-00403
  9. arm linux嵌入式网络控制系统,基于ARMLinux的嵌入式网络控制系统的研究与设计
  10. n2n linux,n2n安装