目录

为什么需要事务消息?

事务消息

事务消息使用注意点

彩蛋


在分布式系统中,为了保证数据一致性是必须使用分布式事务。分布式事务实现方式就很多种,今天主要介绍一下使用 RocketMQ 事务消息,实现分布事务。

文末有彩蛋,看完再走

为什么需要事务消息?

很多同学可能不知道事务消息是什么,没关系,举一个真实业务场景,先来带你了解一下普通的消息存在问题。

上面业务场景中,当用户支付成功,将会更新支付订单,然后发送 MQ 消息。手续费系统将会通过拉取消息,计算手续费然后保存到另外一个手续费数据库中。

由于计算手续费这个步骤可以离线计算,所以这里采用 MQ 解耦支付与计算手续费的流程。

流程主要涉及三个步骤:

  • 更新订单数据
  • 发送消息给 MQ
  • 手续费系统拉取消息

上面提到的步骤,任何一个都会失败,如果我们没有处理,就会使两边数据不一致,将会造成下面两种情况:

  • 订单数据更新了,手续费数据没有生成
  • 手续费数据生成,订单数据却没有更新

这可是涉及到真正的钱,一旦少计算,就会造成资损,真的赔不起!

对于最后一步来讲,比较简单。如果消费消息失败,只要没有提交消息确认,MQ 服务端将会自动重试。

最大的问题在于我们无法保证更新操作与发送消息一致性。无论我们采用先更新订单数据,再发送消息,还是先发送消息,再更新订单数据,都在存在一个成功,一个失败的可能。

如下所示,采用先发送消息,然后再更新数据库的方式。

上面流程消息发送成功之后,再进行本地事务的提交。这个流程看起来很完美,但是想象一下,如果在提交事务时数据库执行失败,导致事务回滚了。

然而此时消息已经发送出去,无法撤回。这就导致手续费系统紧接会消费消息,计算手续费并更新到数据库中。这就造成支付数据未更新,手续费系统却生成的不一致的情况。

那如果我们流程反一下,是不是就好了呢?

我们使用下面的伪码表示:

// 开始事务
try {// 1.执行数据库操作// 2.提交事务
}catch (Exception e){// 3.回滚事务
}
// 4.发送 mq 消息

这里如果事务提交成功,但是 mq 消息发送失败,就会导致支付数据更新但是手续费数据未生成的的不一致情况。

这里有的同学可能会想到,将发送 mq 消息步骤移动到事务中,消息发送失败,回滚事务,不就完美了吗?

伪码如下:

// 开始事务
try {// 1.执行数据库操作// 2.发送 mq 消息// 3.提交事务
}catch (Exception e){// 4.回滚事务
}

上面代码看起来确实没什么问题,消息发送失败,回滚事务。

但是实际上第二步有可能存在消息已经发送到 MQ 服务端,但是由于网络问题未及时收到 MQ 的响应消息,从而导致消息发送端认为消息消息发送失败。

这就会导致订单事务回滚了,但是手续费系统却能消费消息,两边数据库又不一致了。

熟悉 MQ 的同学,可能会想到,消息发送失败,可以重试啊。

是的,我们可以增加重试次数,重新发送消息。但是这里我们需要注意,由于消息发送耦合在事务中,过多的重试会拉长数据库事务执行时间,事务处理时间过长,导致事务中锁的持有时间变长,影响整体的数据库吞吐量。

实际业务中,不太建议将消息发送耦合在数据库事务中。

事务消息

事务消息是 RocketMQ 提供的事务功能,可以实现分布式事务,从而保证上面事务操作与消息发送要么都成功,要么都失败。

使用事务消息,整体流程如下:

首先我们将会发送一个半(half) 消息到 MQ 中,通知其开启一个事务。这里半消息并不是说消息内容不完整,实际上它包含所有完整的消息内容。

这个半消息与普通的消息唯一的区别在于,在事物提交之前,这个消息对消费者来说是不可见的,消费者不会消费这个消息。

一旦半消息发送成功,我们就可以执行数据库事务。然后根据事务的执行结果再决定提交或回滚事务消息。

如果事务提交成功,将会发送确认消息至 MQ,手续费系统就可以成功消费到这条消息。

如果事务被回滚,将会发送回滚通知至 MQ,然后 MQ 将会删除这条消息。对于手续费系统来说,都不会知道这条消息的存在。

这就解决了要么都成功,要么都失败的一致性要求。

实际上面的流程还是存在问题,如果我们提交/回滚事务消息失败怎么办?

对于这个问题,RocketMQ 给出一种事务反查的机制。我们需要需要注册一个回调接口,用于反查本地事务状态。

RocketMQ 若未收到提交或回滚的请求,将会定期去反查回调接口,然后可以根据反查结果决定回滚还是提交事务。

RocketMQ 事务消息流程整体如下:

事务消息示例代码如下:

public class TransactionMQProducerExample {public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");// 不定义将会使用默认的ExecutorService executorService =new ThreadPoolExecutor(2, 5, 100,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);TransactionListener transactionListener = new TransactionListenerImpl();producer.setTransactionListener(transactionListener);// 改成自己的地址producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Order order = new Order("66666", "books");Message msg =new Message("transaction_tp",JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送半消息SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println(sendResult.getSendStatus());producer.shutdown();}public static class TransactionListenerImpl implements TransactionListener {/*** 半消息发送成功将会自动执行该逻辑** @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务Order order = null;try {order = JSON.parseObject(new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET), Order.class);boolean isSuccess = updateOrder(order);if (isSuccess) {// 本地事务执行成功,提交半消息System.out.println("本地事务执行成功,提交事务事务消息");return LocalTransactionState.COMMIT_MESSAGE;} else {// 本地事务执行成功,回滚半消息System.out.println("本地事务执行失败,回滚事务消息");return LocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {System.out.println("本地事务执行异常");}// 异常情况返回未知状态return LocalTransactionState.UNKNOW;}/*** 更新订单* 这里模拟数据库更新,返回事务执行成功** @param order* @return*/private boolean updateOrder(Order order) throws InterruptedException {TimeUnit.SECONDS.sleep(1);return true;}/**** 若提交/回滚事务消息失败,rocketmq 自动反查事务状态* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {try {Order order = JSON.parseObject(new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET), Order.class);boolean isSuccess = queryOrder(order.getOrderId());if (isSuccess) {// 本地事务执行成功,提交半消息return LocalTransactionState.COMMIT_MESSAGE;} else {// 本地事务执行成功,回滚半消息return LocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {System.out.println("查询失败");}// 异常情况返回未知状态return LocalTransactionState.UNKNOW;}/*** 查询订单状态* 模拟返回查询成功** @param orderId* @return*/private boolean queryOrder(String orderId) throws InterruptedException {TimeUnit.SECONDS.sleep(1);return true;}}@Datapublic static class Order {private String orderId;private String goods;public Order(String orderId, String goods) {this.orderId = orderId;this.goods = goods;}}
}

上面代码中:

  1. 我们需要为生产者指定一个唯一的 ProducerGroup
  2. 需要继承 TransactionListener 注解回调接口,其中 executeLocalTransaction 方法执行本地事务,checkLocalTranscation 用来执行检查本地事务。
  3. 返回事务状态有三种:
    • LocalTransactionState.UNKNOW 中间状态,RocketMQ 将会反查
    • LocalTransactionState.COMMIT_MESSAGE 提交事务,消息这后续将会消费这条消息
    • LocalTransactionState.ROLLBACK_MESSAGE,回滚事务,RocketMQ 将会删除这条消息

事务消息使用注意点

事务消息最大反查次数

由于单个消息反查次数过多,将会导致半消息队列堆积,影响性能。 RocketMQ 默认将单个消息的检查次数限制为 15 次。

我们可以通过修改 broker 配置文件,增加如下配置:

# N 为最大检查次数
transactionCheckMax=N

当检查次数超过最大次数后,RocketMQ 将会丢弃消息并且打印错误日志。

若想自定义丢弃消息行为,需要修改 RocketMQ broker 端代码,继承 AbstractTransactionalMessageCheckListener 重写 resolveDiscardMsg 方法,加入自定义逻辑。

同步的双重写入机制

为了确保事务消息不丢失,并且保证事务完整性,需要将事务消息复制到集群其他节点,建议使用同步双重写入机制。

事务反查时间设置

我们可以设置以下参数,设置 MQ 服务端多久之后开始反查事务消息(自事务消息保存成功之后开始计算)。

msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10");

或者我们可以在 broker.conf 设置以下参数:

# 单位为 ms,默认为 6 s
transactionTimeout=60000

发送端主动设置配置参数优先级大于 broker 端配置。

另外 RocketMQ 还有一个配置用于控制事务性消息检查间隔:

## 默认为 60s
transactionCheckInterval=5000

如果自定义配置如上,事务消息检查间隔为 5 秒,事务消息设置检查时间为 60 s。

这就代表 broker 每隔 5s 检查一次事务消息,如果此时事务消息到 MQ 服务端时间还未超过 60s,此时将不会反查,直到时间大于 60s。

彩蛋

查找事务消息资料的时候,发现 RocketMQ 文档存在相关错误。

文档地址:https://github.com/9526xu/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

如上两处实际是错误的,应该修改为:AbstractTransactionalMessageCheckListener 与 transactionTimeout

issue 地址:https://github.com/apache/rocketmq/issues/481

 

有兴趣可以关注我的微信公众号“自动化测试全栈”,微信号:QAlife,学习更多自动化测试技术。

也可加入我们的自动化测试技术交流群,QQ群号码:301079813主要探讨loadrunner/JMeter测试、Selenium/RobotFramework/Appium自动化测试、接口自动化测试,测试工具等测试技术,让我们来这里分享经验、交流技术、结交朋友、拓展视野、一起奋斗!

还不知道事务消息吗?这篇文章带你全面扫盲相关推荐

  1. 每个成功的男人背后都有个厉害的女人,这篇文章带你看看互联网大佬们背后的女人。...

    每个成功的男人背后都有个厉害的女人,这篇文章带你看看互联网大佬们背后的女人. 别看这些互联网大佬现在一幅高大上的样子,可是别忘了人家也有屌丝的时候,也有青春少年的时候啊!今天咱们就八卦一下,翻翻这些大 ...

  2. 设计模式一网打尽,40余篇文章带你领略设计模式之美

    文章末尾附带GitHub开源下载地址. 该文章的最新版本已迁移至个人博客[比特飞],单击链接 设计模式一网打尽,40余篇文章带你领略设计模式之美 | .Net中文网 访问. 设计模式概述 20世纪80 ...

  3. 四篇文章带你快速入门Jetpck(中)之ViewModel,DataBinding

    文章目录 四篇文章带你快速入门Jetpck(中)之ViewModel,DataBinding Jetpack 官方推荐架构 ViewModel 添加依赖 创建ViewModel 初始化ViewMode ...

  4. 什么产品适合抖音广告?本篇文章带你来了解

    到底什么产品适合抖音广告?抖音广告类型的多样化使不计其数的广告主选择在抖音这个热门app上投放产品广告.不断有很多广告主提出疑问,自己的产品到底适不适合在抖音做营销.下面为您介绍适合在抖音做广告的产品 ...

  5. synchronized()_这篇文章带你彻底理解synchronized关键字

    Synchronized关键字一直是工作和面试中的重点.这篇文章准备彻彻底底的从基础使用到原理缺陷等各个方面来一个分析,这篇文章由于篇幅比较长,但是如果你有时间和耐心,相信会有一个比较大的收获,所以, ...

  6. 「完结」12篇文章带你逛遍主流分割网络

    https://www.toutiao.com/a6717221469463511566/ 专栏<图像分割模型>正式完结了.在本专栏中,我们从编解码结构入手,讲到解码器设计:从感受野,讲到 ...

  7. 这篇文章带你彻底理解红黑树

    写在前面 当在10亿数据进行不到30次比较就能查找到目标时,不禁感叹编程之魅力!人类之伟大呀! -- 学红黑树有感. 终于,在学习了几天的红黑树相关的知识后,我想把我所学所想和所感分享给大家.红黑树是 ...

  8. 【完结】 12篇文章带你完全进入NLP领域,掌握核心技术

    专栏<NLP>第一阶段正式完结了.在本专栏中,我们从NLP中常用的机器学习算法开始,介绍了NLP中常用的算法和模型:从朴素贝叶斯讲到XLnet,特征抽取器从RNN讲到transformer ...

  9. 【完结】12篇文章带你逛遍主流分割网络

    专栏<图像分割模型>正式完结了.在本专栏中,我们从编解码结构入手,讲到解码器设计:从感受野,讲到多尺度融合:从CNN,讲到RNN与CRF:从2D分割,讲到3D分割:从语义分割到实例分割和全 ...

最新文章

  1. CVPR2021:推广到开放世界的在线自适应深度视觉里程计
  2. java 递归return_java递归和反向递归
  3. 2008-07-29
  4. 【数据结构与算法】之链表的操作和使用
  5. 神经网络(第五章补充)
  6. Spring中DispacherServlet、WebApplicationContext、ServletContext的关系
  7. VMware 8.0不能手动安装Linux5.5(命令行)问题解决方法
  8. python内置函数可以返回列表元组_十九、python内置函数汇总
  9. JavaScript面向对象那些事
  10. 乐max2 android 7.0,辣评烩:​乐Max 2即将升级EUI 6.0 基于安卓7.0!
  11. 常用零部件表面粗糙度标注及表面处理技术
  12. 主成分分析matlab代码_主成分分析,换个角度看世界
  13. 广东迅视资管 别让“顺风车”再度行驶至安全边缘
  14. 《MySQL实战45讲》——学习笔记31 “误删数据的解决方案(删行/删表/删库/删实例)“
  15. 【流媒体性能测试常用指标】
  16. 【Python爬虫】| XPANX获取抖音APP小视屏
  17. TCP长连接与短连接、心跳机制
  18. xxljob从入门到精通-全网段最全解说
  19. oracle和mybatis自增,在Springboot项目中使用MybatisPlus和Oracle实现主键ID的自增
  20. 首都师范 博弈论 3 3 1求解二人零和博弈

热门文章

  1. python foreach用法_使用foreach rdd和foreach在pysp中迭代rdd
  2. 速度前瞻运动控制c语言程序_整合实时运动控制及多颗相机连接,大幅提升光学影像检测速度...
  3. 著名开源项目,神秘Bug影响超20000个项目,原因竟是作者删库跑路?
  4. 浪潮的加班标语炸了,这是顶风作案?网传:1月加班87小时还要扣工资?
  5. 成为优秀程序员需要具备的15种编程技巧
  6. 在人脸识别的时候,一定要穿上衣服啊!
  7. 初五迎财神!即使没有年终奖,也要来一发红包!
  8. 微软宣布加入 OpenJDK,打不过就改变 Java 未来!
  9. 说说我为什么看好Spring Cloud Alibaba
  10. 一个超乎想象的垃圾收集器:ZGC