搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

初步认识RocketMQ的核心模块

rocketmq模块

rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。

rocketmq-client:提供发送、接受消息的客户端API。

rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。(有点NameNode的味道)

rocketmq-common:通用的一些类,方法,数据结构等

rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议

rocketmq-store:消息、索引存储等

rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!【一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件】

rocketmq-tools:命令行工具

分布式消息队列RocketMQ--事务消息--解决分布式事务

说到分布式事务,就会谈到那个经典的”账号转账”问题:2个账号,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?

一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

但这里面有个问题:A是先update DB,后发送消息呢? 还是先发送消息,后update DB?

假设先update DB成功,发送消息网络失败,重发又失败,怎么办? 假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,这里下个结论: 只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

那这个问题怎么解决呢?

错误的方案

有人可能想到了,我可以把“发送消息”这个网络调用和update DB放在同1个事务里面,如果发送消息失败,update DB自动回滚。这样不就保证2个操作的原子性了吗?

这个方案看似正确,其实是错误的,原因有2:

(1)网络的2将军问题:发送消息失败,发送方并不知道是消息中间件真的没有收到消息呢?还是消息已经收到了,只是返回response的时候失败了?

     如果是已经收到消息了,而发送端认为没有收到,执行update db的回滚操作。则会导致A账号的钱没有扣,B账号的钱却加了。

(2)把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务。严重的,会block整个DB。这个风险很大。

     基于以上分析,我们知道,这个方案其实是错误的!

方案1–业务方自己实现

假设消息中间件没有提供“事务消息”功能,比如你用的是Kafka。那如何解决这个问题呢?

解决方案如下: (1)Producer端准备1张消息表,把update DB和insert message这2个操作,放在一个DB事务里面。

(2)准备一个后台程序,源源不断的把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢,顺序也不会打乱。

(3)Consumer端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。但这里又涉及一个原子性问题:如果保证消息消费 + insert message到判重表这2个操作的原子性?

消费成功,但insert判重表失败,怎么办?关于这个,在Kafka的源码分析系列,第1篇, exactly once问题的时候,有过讨论。

通过上面3步,我们基本就解决了这里update db和发送网络消息这2个操作的原子性问题。

但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合额外增加业务方的负担。

方案2 – RocketMQ 事务消息

为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

具体来说,上面的2个步骤,被分解成3个步骤: (1) 发送Prepared消息 (2) update DB (3) 根据update DB结果成功或失败,Confirm或者Cancel Prepared消息。

可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

具体代码实现如下:

也就是定义了一个checkListener,RocketMQ会回调此Listener,从而实现上面所说的方案。

// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();// 构造事务消息的生产者TransactionMQProducer producer = new TransactionMQProducer("groupName");// 设置事务决断处理类producer.setTransactionCheckListener(transactionCheckListener);// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();producer.start()// 构造MSG,省略构造参数Message msg = new Message(......);// 发送消息SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);producer.shutdown();​---------------------------------------------------------------------------------------------

 public TransactionSendResult sendMessageInTransaction(.....) {    // 逻辑代码,非实际代码    // 1.发送消息    sendResult = this.send(msg);    // sendResult.getSendStatus() == SEND_OK    // 2.如果消息发送成功,处理与消息关联的本地事务单元    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);    // 3.结束事务    this.endTransaction(sendResult, localTransactionState, localException);}​​

上面所说的消息中间件上注册的listener,超时以后会回调producer的接口以确定事务执行情况

总结:对比方案2和方案1,RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。

至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。

人工介入

可能有人又要说了,无论方案1,还是方案2,发送端把消息成功放入了队列,但消费端消费失败怎么办?

消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?

答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?

对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。

转载于:https://www.cnblogs.com/itxiaok/p/10356673.html

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务相关推荐

  1. 基于数据库的事务消息解决分布式事务方案

    转载请注明出处:http://www.cnblogs.com/lizo/p/8516502.html 概述 当单库已不能支撑当前业务的时候,我们往往都考虑进行分库(横向拆分或者纵向拆分).但分库有个无 ...

  2. 启发:从MNS事务消息谈分布式事务

    启发:从MNS事务消息谈分布式事务 事务消息本质上解决的问题是业务系统与消息系统之间的事务问题(跨系统分布式事务),其基本原理即两阶段提交以及最终一致性保障.最近看了下阿里云mns事务消息的实现原理, ...

  3. RocketMQ的Producer详解之分布式事务消息(回顾事务)

    回顾什么事务 聊什么是事务,最经典的例子就是转账操作,用户A转账给用户B1000元的过程如下: 用户A发起转账请求,用户A账户减去1000元 用户B的账户增加1000元 如果,用户A账户减去1000元 ...

  4. 分布式事务 - 如何解决分布式事务问题?

    分布式事物 - 如何解决分布式事务问题? 面试题 分布式事务了解吗?你们是如何解决分布式事务问题的? 面试官心理分析 只要聊到你做了分布式系统,必问分布式事务,你对分布式事务一无所知的话,确实会很坑, ...

  5. LCN分布式事务框架解决分布式事务一致性问题

    LCN分布式事务框架 框架介绍 LCN分布式事务框架其本身并不创建事务,而是基于对本地事务的协调从而达到事务一致性的效果. 核心步骤 创建事务组 是指在事务发起方开始执行业务代码之前先调用TxMana ...

  6. 800页PPT搞懂阿里技术及生态全貌,“未入阿里,知根知底”

    本文以及下一篇文章,将从技术研发和数据结构与算法,为你清晰呈现阿里业务生态的全貌:文化娱乐(优酷.土豆等).核心电商业务(天猫.淘宝.村淘等).本地生活(高德地图.盒马等).支付&金融服务(蚂 ...

  7. 5G元年,你最该搞懂的技术竟然是……

    导读:4G改变生活,5G改变社会.5G是下一代移动通信技术,5G的应用将渗透到社会生活和生产的各个领域,比如沉浸式媒体.自动驾驶汽车.智慧工厂/城市/建筑.互联健康.下一代教育等. 但作为一名通信领域 ...

  8. 一文彻底搞懂SLAM技术

    什么是SLAM? SLAM (simultaneous localization and mapping),也称为CML (Concurrent Mapping and Localization), ...

  9. 一文搞懂显示技术的底层框架

    1. DPU与GPU的耦合是历史产物,完全可以独立出来 2. DPU的原型设计 2.1[DPU的四大组成部分] 2.2[KSM与DPU] 3. DPU的最新设计 3.1[Source Suface P ...

最新文章

  1. 第一课:网络参考模型OSI
  2. 自动化测试测试工具 AirTest 的使用方法与简介
  3. linux目录为root所有文件,linux获取文件所有目录/文件夹的例子linux操作系统 -电脑资料...
  4. Tool之ADB:ADB工具的简介、安装、使用方法之详细攻略
  5. linux deepin手动升级内核命令
  6. PHP实现文章的删除,php如何实现删除文章
  7. 地产相继入局智能家居,LifeSmart云起获新世界集团战略投资
  8. centos 7 mysql 源码安装_centos7 mysql5.7.17源码安装
  9. 开发一个自己的 CSS 框架(二) 1
  10. socket编程—— 服务器遇到Broken Pipe崩溃
  11. 学生宿舍管理系统java课设_java编写的学生宿舍管理系统
  12. 当前安装包签名出现异常_安卓系统手机安装应用出现应用签名异常或-22错误(联网验证失败)的应对方法...
  13. qq发送文件时显示服务器拒绝,qq给对方发文件为什么服务器拒绝接收 - 卡饭网...
  14. java打包文件生成zip压缩包
  15. 微信小程序 多标签选择和添加标签
  16. 阿里云服务器租用价格表,阿里云服务器优惠
  17. linux之ps aux、ps -aux、ps -ef命令的区别
  18. Oracle数据库安装配置
  19. gets()函数的缺陷,引入fgets()函数
  20. 2017年技术教练认证流程回顾

热门文章

  1. 魅族适配鸿蒙吗,魅族智能生活发布会新增看点:接入鸿蒙操作系统
  2. ai怎么画循环曲线_AI插画设计,用AI制作一个只可爱的短腿柯基插画
  3. pdf压缩工具_PDF文件过大如何缩小,几步教你完成压缩
  4. 在C语言中,存在的内存的连续性的声明
  5. 两相四线步进电机C语言程序,求大神帮忙看单片机控制两相四线步进电机的程序!...
  6. python windows窗口置顶_Python入门:第一个程序“Hello, world”
  7. 半导体基础知识(1):材料和器件
  8. 傅里叶变换的应用之调制解调初步、采样
  9. OpenStack潜力巨大:红帽打造生态系统
  10. 使用SaltStack安装JDK1.6