在RocketMQ中生产者有三种角色NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),根据名字大概可以看出各个代表着什么作用,我们这里用TransactionProducer(事务)来解决分布式事务问题。

说到分布式事务,就会谈到那个经典的”账户转账”问题:2个账户,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?

一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

但这里面有个问题:A是先update DB,后发送消息呢?还是先发送消息,后update DB?
假设先update DB成功,发送消息网络失败,重发又失败,怎么办?
假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,这里下个结论:只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

那这个问题怎么解决呢??
为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

具体来说,上面的2个步骤,被分解成3个步骤:
(1) 发送Prepared消息
(2) update DB
(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。

可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

(1) 执行业务逻辑的部分

package com.lynch.simple.demo;import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;/*** 执行业务逻辑的部分* * @author jqlin**/
public class TransactionExecuterImpl implements LocalTransactionExecuter {@Overridepublic LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {System.out.println("执行本地事务msg = " + new String(msg.getBody()));  System.out.println("执行本地事务arg = " + arg);  //DB操作 应该带上事务 service -> dao//如果数据操作失败  需要回滚    同时返回RocketMQ一条失败消息  意味着消费者无法消费到这条失败的消息//如果成功 就要返回一条rocketMQ成功的消息,意味着消费者将读取到这条消息//o就是attachmentString tags = msg.getTags();  if (tags.equals("transaction2")) {  System.out.println("===> 本地事务执行失败,进行MQ ROLLBACK");return LocalTransactionState.ROLLBACK_MESSAGE;  }  System.out.println("===> 本地事务执行成功,发送确认消息");// return LocalTransactionState.UNKNOW;  return LocalTransactionState.COMMIT_MESSAGE;  }}

(2) 处理事务回查的代码部分

package com.lynch.simple.demo;import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;/*** 处理事务回查的代码部分* * @author jqlin**/
public class TransactionCheckListenerImpl implements TransactionCheckListener {@Overridepublic LocalTransactionState checkLocalTransactionState(MessageExt msg) {System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));//由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?//可以查询数据库看这条数据是否已经处理return LocalTransactionState.COMMIT_MESSAGE;}}

(3) 启动生产者

package com.lynch.simple.demo;import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();// 构造事务消息的生产者TransactionMQProducer producer = new TransactionMQProducer("transactionProducer");producer.setNamesrvAddr("127.0.0.1:9876");// 事务回查最小并发数producer.setCheckThreadPoolMinSize(2);// 事务回查最大并发数producer.setCheckThreadPoolMaxSize(2);// 队列数producer.setCheckRequestHoldMax(2000);// 设置事务决断处理类
        producer.setTransactionCheckListener(transactionCheckListener);producer.start();// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();for (int i = 1; i <= 2; i++) {try {String tags = "transaction" + i;String keys = "KEY" + i;byte[] body = ("Hello RocketMQ " + i).getBytes();Message msg = new Message("topicTransaction", tags, keys, body);SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);System.out.println(sendResult);} catch (MQClientException e) {e.printStackTrace();}}producer.shutdown();}
}

(4) 启动消费消息

package com.lynch.simple.demo;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;public class TransactionConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transactionConsumer");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setConsumeMessageBatchMaxSize(10);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("topicTransaction", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(msg + ",内容:" + new String(msg.getBody()));} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                    }}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
            }});consumer.start();System.out.println("transaction_Consumer Started.");}
}

重点来了:3.2.6之前的版本这样写就可以了,但是之后的版本被关于事务回查这个接口被阉割了,不会在进行事务回查操作。
那么向MQ发送消息如果失败的话,会造成A银行扣款成功而B银行收款未成功的数据不一致的情况

解决办法

转载于:https://www.cnblogs.com/linjiqin/p/9561641.html

RocketMQ 分布式事务相关推荐

  1. rocketmq分布式事务最终一致性解决方案

    背景 分布式系统中,我们时常会遇到分布式事务的问题,如更新订单然后发送短信提醒,但是这两个操作需要操作不同的数据库,那么此时数据库的事务就不能处理好了 传统方式存在的问题: 1.先发送消息,再执行数据 ...

  2. RocketMQ分布式事务原理介绍

    RocketMQ实现分布式事务原理 1.知识准备 在系统架构从单体到分布式.SOA.微服务的发展过程中,因为流量的增多出现了大量消息堆积问题的需求,在这种背景下,阿里开发出rocketmq来解决该问题 ...

  3. RocketMQ 分布式事务消息过程分析

    生产环境最小单元高可用部署模式: 如果192.168.3.100的broker假死,那么3.110,3.111的nameserver都能在2分钟内感知broker-a宕机,然后客户端能成功从names ...

  4. 分布式事务:RocketMQ实现分布式事务原理

    之前讲过有关分布式事务2PC.3PC.TCC的理论知识,博客地址: 1.分布式事务(1)---2PC和3PC原理 2.分布式事务(2)---TCC原理 这篇讲有关RocketMQ实现分布式事务的理论知 ...

  5. 分布式事务(6)-分布式事务处理技术之RocketMQ

    目录 什么时候会用到消息中间件保证事务一致型 概念 RocketMQ集群概述 大概流程 具体流程如下 代码实现过程 Producer: Consumer: 踩坑记 案例代码实现 什么时候会用到消息中间 ...

  6. SpringCloud Alibaba 2021微服务实战三十二 集成RocketMQ实现分布式事务

    目录 基于RocketMQ分布式事务 - 完整示例 2.解决方案 2.1.本地消息表方案 2.2.RocketMQ事务消息方案 一.事务消息 二.订单服务 1.事务日志表 2.TransactionM ...

  7. 还不理解“分布式事务”?这篇给你讲清楚!

    " 这篇文章将介绍什么是分布式事务,分布式事务解决什么问题,对分布式事务实现的难点,解决思路,不同场景下方案的选择,通过图解的方式进行梳理.总结和比较. 相信耐心看完这篇文章,谈到分布式事务 ...

  8. 分布式事务实现原理【BAT 面试题宝库附详尽答案解析】

    问题场景 什么是事务? 事务是数据库从一个稳定状态变迁到另一个稳定状态的保证,具备 ACID 这 4 个特性: 原子性(Atomicity):一个事务中的所有操作,要么全部完成,要么全部不完成,不会结 ...

  9. 【BAT 面试题宝库附详尽答案解析】分布式事务实现原理

    问题场景 什么是事务? 事务是数据库从一个稳定状态变迁到另一个稳定状态的保证,具备 ACID 这 4 个特性: 原子性(Atomicity):一个事务中的所有操作,要么全部完成,要么全部不完成,不会结 ...

最新文章

  1. 纯JS导出excel(支持中文)
  2. 开源的 6 条社会契约
  3. linux服务之git
  4. hdu 5285 二分图黑白染色
  5. 16线激光雷达配置教程
  6. [Lintcode]115. Unique Paths II/[Leetcode]63. Unique Paths II
  7. jqgrid 启用键盘操作bindKeys
  8. Java开发入门与实战!java图形用户界面布局
  9. 从 C10K 到 C500K
  10. 【图像配准】基于matlab互信息图像配准【含Matlab源码 1210期】
  11. java操作JSON对象
  12. Java实现PDF生成(Word文档转Pdf)
  13. 通过TXT文件批量生成PDF417码
  14. 怎么用python实现序列比对_生信学习笔记——Python+Mafft实现批量化多序列比对
  15. 【熊出没注意!】大家都是怎么治那些来家里玩的熊孩子的?
  16. 牛客 | C 选择颜色
  17. angular RxJs
  18. 区块链-高级密钥和地址
  19. 自动驾驶和辅助驾驶基础知识
  20. css 交集选择器 并集选择器 后代选择器

热门文章

  1. 神经网络——BP算法
  2. AxureRP9不同Page使用同一个Master,触发不同事件。
  3. 【例题+习题】【数值计算方法复习】【湘潭大学】(六)
  4. sqoop2安装详细过程[截图说明]
  5. 反射矩阵(reflection matrix)推导
  6. 任意两点间的最短路径
  7. h0152. 故事计算题(计蒜客——西邮K题)解析
  8. STM32 - CubeMX 的使用实例详细(01)- STM32F103的配置 - GPIO设定
  9. python求和_用python求和
  10. linux timespec 链接库,Linux内核 timespec_sub()