背景

分布式系统中,我们时常会遇到分布式事务的问题,如更新订单然后发送短信提醒,但是这两个操作需要操作不同的数据库,那么此时数据库的事务就不能处理好了
传统方式存在的问题:
1、先发送消息,再执行数据库事务,可能出现消息发送成功,但是本地事务执行失败,导致数据不一致

begin transaction
sendMsg()
updateStatus() ->可能失败,但是消息已经发送成功
commit transaction

2、先执行数据库事务,再发送消息,可能出现发送消息时超时返回失败,导致回滚了本地事务,但是消息其实已经成功发送,导致数据不一致

begin transaction
updateStatus()
sendMsg() ->可能出现超时等导致回滚事务,但是消息已经成功发送
commit transaction

RocketMq分布式事务方案

rocketmq事务性消息可以保证本地数据库等事务和发送给mq的消息(要么同时成功,要么同时失败)。但是不确保消费者可以成功消费
基本流程:
1、发送一个预消息给mq,此mq消息对于消费者暂时不可见
2、发送预消息成功,处理本地和数据库相关的事务,处理结果记录到数据库
3、把预消息更改为了正常消息,消费者可以消费。此时正常流程已经结束
4、消息回查,对于预消息进行回调,查询对应的消息在数据库记录的状态

  • 1、如果第2步失败了,本地业务代码或者数据库操作等报错,则数据库记录的失败,
    那么返回LocalTransactionState.ROLLBACK_MESSAGE丢弃预消息即可

  • 2、如果第2步成功了,那么第3步就必须成功,数据库记录该消息必定是成功(此时如果mq未收到对应的success,则进入回调),
    返回LocalTransactionState.COMMIT_MESSAGE把消息正常投递让消费者消费即可

直接上代码
生产者

public class TransactionProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {TransactionMQProducer transactionMQProducer = new TransactionMQProducer("tx_producer");transactionMQProducer.setNamesrvAddr("192.168.1.1:9876");ExecutorService executorService = Executors.newFixedThreadPool(10);transactionMQProducer.setExecutorService(executorService);transactionMQProducer.setTransactionListener(new TransactionListenerLocal()); //本地事务的监听transactionMQProducer.start();for (int i = 0; i < 10; i++) {String orderId = UUID.randomUUID().toString();String body = "{'operation':'doOrder','orderId':'" + orderId + "'}";Message message = new Message("order_tx_topic","TagA", orderId, body.getBytes(RemotingHelper.DEFAULT_CHARSET));// TODO 1、发送事务消息,此时消费对于消费者不可见,会先执行事务监听//  TransactionListenerLocal的executeLocalTransaction,并且传递参数// 实际业务时调用sendMessageInTransaction后即可调用executeLocalTransaction,// 如果成功则返回客户成功。mq最终必定收到消息,失败则返回客户失败,mq会回查然后丢弃该消息transactionMQProducer.sendMessageInTransaction(message, orderId + "&" + i);System.out.println("投递消息成功");}}
}
public class TransactionListenerLocal implements TransactionListener {private Map<String, Boolean> results = new ConcurrentHashMap<>();//TODO 2、执行本地事务,处理数据库相关,成功后results模拟处理结果,// 实际场景可能是存储到数据库记录某条消息的本地事务是否处理成功@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {String orderId = o.toString();//模拟数据库保存(成功/失败)boolean result = Math.abs(Objects.hash(orderId)) % 2 == 0;System.out.println("开始执行本地事务:" + o.toString() + ",result:" + result);results.put(orderId.substring(0, orderId.indexOf("&")), result);// 失败返回UNKNOW,则预消息会进入回调方法checkLocalTransaction回查return result ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;}//TODO 3、对于对消费者不可见的消息,这里进行回查,数据库记录成功的消息就提交mq事务,数据库记录失败的消息就直接丢弃@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {String orderId = messageExt.getKeys();System.out.println("执行事务回调检查: orderId:" + orderId);Boolean rs = results.get(orderId);System.out.println("数据的处理结果:" + rs); //只有成功/失败return (rs != null && rs) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;}}

消费者:

public class TransactionConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer defaultMQPushConsumer=new DefaultMQPushConsumer("tx_consumer");defaultMQPushConsumer.setNamesrvAddr("192.168.1.1:9876");defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);defaultMQPushConsumer.subscribe("order_tx_topic","*");defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {list.stream().forEach(message->{//扣减库存System.out.println("开始业务处理逻辑:消息体:"+new String(message.getBody())+"->key:"+message.getKeys());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //签收});defaultMQPushConsumer.start();}}

总结:rocketmq的事务性消息可以确保数据的最终一致性,核心机制是提供了回查功能,即可以根据数据库记录的本地事务的成功或者失败,最终回调来处理是否应该成功发送mq消息

rocketmq分布式事务最终一致性解决方案相关推荐

  1. 一致 先验分布 后验分布_「分布式技术」分布式事务最终一致性解决方案,下篇...

    各位志同道合的朋友们大家好,我是一个一直在一线互联网踩坑十余年的编码爱好者,现在将我们的各种经验以及架构实战分享出来,如果大家喜欢,就关注我,一起将技术学深学透,我会每一篇分享结束都会预告下一专题 上 ...

  2. 6种分布式事务最终一致性解决方案,一次性说清了!

    分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,尤其在微服务架构中,几乎可以说是无法避免,因此也常常被认为是微服务落地的最大阻碍. 随着系统的服务拓扑从单体应用迈 ...

  3. 基于RabbitMQ的分布式事务最终一致性解决方案

    1. 分布式事务 所谓事务,通俗一点讲就是一系列操作要么同时成功,要么同时失败.而分布式事务就是这一系列的操作在不同的节点上,那要如何保证事务的ACID特性呢. 原子性(atomicity).一个事务 ...

  4. 阿里 P8 聊分布式事务最终一致性的 6 种解决方案

    分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,尤其在微服务架构中,几乎可以说是无法避免,因此也常常被认为是微服务落地的最大阻碍. 随着系统的服务拓扑从单体应用迈 ...

  5. java分布式事务——最终一致性,最大努力通知总结!

    目录 源码地址:https://github.com/kaixuanzhang123/dtx.git 4.分布式事务解决方案之可靠消息最终一致性 4.1.什么是可靠消息最终一致性事务 4.2.解决方案 ...

  6. 分布式事务最终一致性常用方案

    目前的应用系统,不管是企业级应用还是互联网应用,最终数据的一致性是每个应用系统都要面临的问题,随着分布式的逐渐普及,数据一致性更加艰难,但是也很难有银弹的解决方案,也并不是引入特定的中间件或者特定的开 ...

  7. 分布式事务最终一致性-CAP框架轻松搞定

    前言 对于分布式事务,常用的解决方案根据一致性的程度可以进行如下划分: 强一致性(2PC.3PC):数据库层面的实现,通过锁定资源,牺牲可用性,保证数据的强一致性,效率相对比较低. 弱一致性(TCC) ...

  8. 分布式事务最终一致性mysql_分布式事务最终一致性方案案例

    前言: 以下以网上课程购买流程举一个例子: 如何实现两个分布式服务(订单服务.学习服务)共同完成一件事即订单支付成功自动添加学生选课的需求, 这里的关键是如何保证两个分布式服务的事务的一致性. 订单支 ...

  9. RabbitMQ消息可靠性投递及分布式事务最终一致性实现

    RabbitMQ消息可靠性投递就是保证消息生产者能够将消息百分百投递到RabbitMQ服务器,并在传递过程中不丢失.然而在生产环境中由于网络中断.网络不稳定等原因导致消息在投递过程中丢失,这或许会造成 ...

最新文章

  1. java中printnb方法_java打印技术---javax.print
  2. javascript之值传递与引用传递
  3. 《深入理解C++11:C++ 11新特性解析与应用》——3.2 委派构造函数
  4. 简单的session共享的封装
  5. POJ 放苹果问题(递归)
  6. TLS是如何保障数据传输安全(中间人攻击)
  7. python中文件基本操作命令及注意事项
  8. 网站换服务器需要注意什么问题,网站更换服务器要注意哪些事项
  9. Start with...Connect By子句递归查询一般用于一个表维护树形结构的应用。
  10. conv2d 公式_Pytorch 从0开始学(6)——Conv2d 详解
  11. 优秀案例快速提升UI设计界面的视觉效果、用户体验
  12. 二分法:木棒切割问题
  13. ibm mq qname java_IBMMQ 从队列获取消息并将消息发送到特定主题上面
  14. Delphi之TStrings和TStringLists类【转】
  15. 去中介化的租房EOS DAPP,实现租客与房东互赢
  16. 哪个手机浏览器可以倍速_手机四款浏览器APP的比较
  17. 光学系统建模之Light Tools安装关键点、教程指引
  18. 神经网络量化--per-channel量化
  19. 如何通俗易懂地阐述机器学习?
  20. uniapp一键登录功能

热门文章

  1. gspro能支持鸿蒙系统吗,荣耀手表GS Pro已开始内测升级鸿蒙OS 可安装第三方应用...
  2. python复制文件并保留修改时间等属性
  3. tableau学习案例
  4. mysql高可用MHA
  5. mysql 搭建mha_MHA快速搭建
  6. ObjectARX_多重引线MLeader
  7. 实验6、8254定时/计数器实验
  8. 两独立样本率的优效性试验-样本量计算
  9. clumsy 模拟网络丢包延迟
  10. 从360安全卫士中提取网络速度监控