由于工作流引擎项目中,工作流引擎服务和业务服务是分开的,所以就涉及到了分布式事务的问题。综合考虑到并发量和分布式事务的保障,最终选择了事务消息的方式。

首先我们来介绍下本地消息表这种方案,当消息队列不支持事务消息的时候,我们可以考虑这种方案。

本地消息表

基本流程

1、A 系统在自己本地一个事务里操作同时,插入一条数据到消息表;

2、接着 A 系统将这个消息发送到 MQ 中去;

3、B 系统接收到消息之后,在一个事务里,往自己本地消息表里插入一条数据,同时执行其他的业务操作,如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息;

4、B 系统执行成功之后,就会更新自己本地消息表的状态以及 A 系统消息表的状态;

5、如果 B 系统处理失败了,那么就不会更新消息表状态,那么此时 A 系统会定时扫描自己的消息表,如果有未处理的消息,会再次发送到 MQ 中去,让 B 再次处理;

6、这个方案保证了最终一致性,哪怕 B 事务失败了,但是 A 会不断重发消息,直到 B 那边成功为止。

备注:A的消息表用于保证B正确消费了A发送的消息,B的消息表用于保证不重复消费同一条消息。

这个方案说实话最大的问题就在于严重依赖于数据库的消息表来管理事务,高平发场景下不好扩展,所以应用好像也不太多。

本地消息表是 BASE 理论,是最终一致模型,适用于对一致性要求不高的。实现这个模型时需要注意重试的幂等。

聊聊可靠消息最终一致性方案

这个方案的思路其实跟上面讲的本地消息表基本相同,但是不基于数据库,而是基于MQ来实现事务,RocketMQ提供了事务消息来支持这种方式。

基本原理

消息发送:

(A)发送方将半事务消息发送至消息队列 RocketMQ 版服务端。

(B)消息队列 RocketMQ 服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。

(C)发送方开始执行本地事务逻辑。

(D)发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

消息回查:

(A)在断网或者是应用重启的特殊情况下,上述步骤 D 提交的二次确认最终未到达服务端,经过固定时间后RocketMQ服务端将对该消息发起消息回查。(mq会自动定时轮询所有 prepared 消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认的消息,是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。)

(B)发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

(C)发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 D 对半事务消息进行操作。

具体用法

在分布式消息队列中,目前唯一提供完整的事务消息的,只有 RocketMQ 。

可能会有网友说,RabbitMQ 和 Kafka 也有事务消息啊,也支持发送事务消息的发送,以及后续的事务消息的 commit提交或 rollbackc 回滚。但是要考虑一个极端的情况,在本地数据库事务已经提交的时时候,如果因为网络原因,又或者崩溃等等意外,导致事务消息没有被 commit ,最终导致这条事务消息丢失,分布式事务出现问题。

相比来说,RocketMQ 提供事务回查机制,如果应用超过一定时长未 commit 或 rollback 这条事务消息,RocketMQ 会主动回查应用,询问这条事务消息是 commit 还是 rollback ,从而实现事务消息的状态最终能够被 commit 或是 rollback ,达到最终事务的一致性。

// RocketMQ事务消息监听

@RocketMQTransactionListener(txProducerGroup = TX_PRODUCER_GROUP)

public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

// ... local transaction process, return rollback, commit or unknown

logger.info("[executeLocalTransaction][执行本地事务,消息:{} arg:{}]", msg, arg);

return RocketMQLocalTransactionState.UNKNOWN;

}

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

// ... check transaction status and return rollback, commit or unknown

logger.info("[checkLocalTransaction][回查消息:{}]", msg);

return RocketMQLocalTransactionState.COMMIT;

}

}

一般来说,有两种方式实现本地事务回查时,返回事务消息的状态。

第一种,通过 msg 消息,获得某个业务上的标识或者编号,然后去数据库中查询业务记录,从而判断该事务消息的状态是提交还是回滚。

第二种,记录 msg 的事务编号,与事务状态到数据库中。

第一步,在 #executeLocalTransaction(...) 方法中,先存储一条 id 为 msg 的事务编号,状态为 RocketMQLocalTransactionState.UNKNOWN 的记录。

第二步,调用带有事务的业务 Service 的方法。在该 Service 方法中,在逻辑都执行成功的情况下,更新 id 为 msg 的事务编号,状态变更为 RocketMQLocalTransactionState.COMMIT 。这样,我们就可以伴随这个事务的提交,更新 id 为 msg 的事务编号的记录的状为 RocketMQLocalTransactionState.COMMIT ,美滋滋。。

第三步,要以 try-catch 的方式,调用业务 Service 的方法。如此,如果发生异常,回滚事务的时候,可以在 catch 中,更新 id 为 msg 的事务编号的记录的状态为 RocketMQLocalTransactionState.ROLLBACK 。极端情况下,可能更新失败,则打印 error 日志,告警知道,人工介入。

如此三步之后,我们在 #executeLocalTransaction(...) 方法中,就可以通过查找数据库,id 为 msg 的事务编号的记录的状态,然后返回。

参考资料

原文:https://www.cnblogs.com/wunsiang/p/12765128.html

rocketmq消息持久化到mysql_RocketMQ之事务消息相关推荐

  1. 浅谈 RocketMQ、Kafka、Pulsar 的事务消息

    作者:ruoyuliu刘若愚,腾讯 WXG 后台开发工程师 导语 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败.RocketMQ.Kafka 和 Pulsar 都是当今业界 ...

  2. rabbitmq消息持久化,避免异常情况下,消息会丢失

    2019独角兽企业重金招聘Python工程师标准>>> 1) 使用python包amqp from amqp.basic_message import Message from am ...

  3. 通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪?

    文章转载自公众号  心源意码 , 作者 寻筝 "得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席." 由阿里自研的Roc ...

  4. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  5. Apache RocketMQ 正式开源分布式事务消息

    摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...

  6. RocketMQ事务消息从生产到消费原理详解(包括回查过程)

    名词解释 half消息(生产者发送的Prepare消息):发送到MQ Server但无法被consumer消费的消息,暂时存在MQ Server,需要收到生产者二次确认后才能被消费 消息回查:一些意外 ...

  7. 消息中间件学习总结(15)——Apache RocketMQ 正式开源分布式事务消息

    近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息, ...

  8. 探秘RocketMQ源码——Series1:Producer视角看事务消息

    简介:探秘RocketMQ源码--Series1:Producer视角看事务消息 1. 前言 Apache RocketMQ作为广为人知的开源消息中间件,诞生于阿里巴巴,于2016年捐赠给了Apach ...

  9. RocketMQ 源码分析 事务消息

    为什么80%的码农都做不了架构师?>>>    1. 概述 必须必须必须 前置阅读内容: <事务消息(阿里云)> 2. 事务消息发送 2.1 Producer 发送事务消 ...

最新文章

  1. 深度学习在人脸检测中的应用 | CSDN 博文精选
  2. 五大洲30国在华留学生千年古城欢度中国年
  3. Spring WebClient vs. RestTemplate
  4. python编程怎么建立工程_教你如何用Python脚本快速创建项目
  5. Nuxt爬坑系列之vuex
  6. 记录 关于浏览器跨域和设置默认浏览器的问题
  7. 【转】Thunderbird中配置签名
  8. Linux中变量#,@,0,1,2,*,$$,$?的含义
  9. mybatis批量插入oracle大量数据记录性能问题解决
  10. 分享:假如浏览器和搜索引擎不再支持外部链接跳转,优化何去何从呢?
  11. 和我一起开发Android应用(二)——“悦词-i背单词”项目功能分析
  12. sqlserver数据库安装后服务配置
  13. 从零开始的网站搭建,服务器与域名管理
  14. MySQL JDBC URL参数(转)
  15. mysql 序列自增长 恢复到1_MySQL查询结果另外自带一列自增序列号
  16. opencv特效编辑之雕刻效果
  17. [Sturts2]继承ActionSupport类
  18. 关于写的Java书籍进展
  19. redis-trib.rb 搭建集群
  20. Devexpress TreeList控件支持拼音首字母查询

热门文章

  1. cisco命令防ping_Cisco路由器命令禁止访问特定网站的四个设置步骤
  2. iphone个人热点无法开启_无法在 iPhone 上正常使用“个人热点”怎么办?
  3. oracle匿名代码块执行insert,MyBatis+Oracle在执行insert时空值报错之从源码寻找解决办法...
  4. 你修改了样式,却要我手动清除游览器缓存,这是BUG!
  5. android fragment学习4-底部布局扩展TabLayout
  6. 自定义控件三部曲之动画篇(一)——alpha、scale、translate、rotate、set的xml属性及用法
  7. 基于JAVA+SpringMVC+Mybatis+MYSQL的医院预约挂号系统
  8. OSError: [Errno 22] Invalid argument: ‘
  9. dede列表分页php,织梦用dede:sql实现列表页分页教程
  10. 剑指offer:序列化二叉(前序遍历+层次)