文章目录

  • 事务消息
    • RocketMQ事务流程概要
    • RocketMQ事务流程关键
    • 事务消息的使用约束
    • 分布式事务场景分析
      • 场景案例
      • RocketMQ事务消息设计分析
      • 消费事务消息

事务消息

RocketMQ事务流程概要

RocketMQ 的事务消息,是指Producer端消息发送事件和本地事务事件,同时成功或同时失败
RocketMQ实现事务主要分为两个阶段: 正常事务的发送及提交、事务信息的补偿流程(都是针对生产者 因为事务只出现在DataBase中 有些情况需要将消息存储在数据库中 如果发生事务问题…)

整体流程为:

  • 正常事务发送与提交阶段
  1. 生产者发送一个半消息给broker(半消息是指的暂时不能消费的消息)
  2. 服务端响应
  3. 开始执行本地事务
  4. 根据本地事务的执行情况执行Commit或者Rollback
  • 事务信息的补偿流程
  1. 如果broker长时间没有收到本地事务的执行状态,会向生产者发起一个确认会查的操作请求
  2. 生产者收到确认会查请求后,检查本地事务的执行状态
  3. 根据检查后的结果执行Commit或者Rollback操作 补偿阶段主要是用于解决生产者在发送Commit或者Rollbacke操作时发生超时或失败的情况

RocketMQ事务流程关键

  • 事务消息在一阶段对用户不可见

事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费.这里RocketMQ实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC.这样由于消费者没有订阅这个主题,所以不会消费.

  • 如何处理第二阶段的发送消息?

在本地事务执行完成后回向Broker发送Commit或者Rollback操作,此时如果在发送消息的时候生产者出故障了,要保证这条消息最终被消费,broker就会向服务端发送回查请求,确认本地事务的执行状态.当然RocketMQ并不会无休止的发送事务状态回查请求,默认是15次,如果15次回查还是无法得知事务的状态,RocketMQ默认回滚消息(broker就会将这条半消息删除)

  • 事务的三种状态:

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

使用
创建生产者时我们不在简单地创建DefaultMQProducer 而是RocketMQ事务专属的 TransactionMQProducer 并且不再简单地发送消息了 而是设置一个事务监听器 setTransactionListener(new TransactionListener(){…}); 实现接口方法 并且由于监听器需要等待本地事务的执行情况我们不能再生产者发送完消息后关闭

Producer

/*** 事务消息生产者*/
public class TransactionMessageProducer {/*** 事务消息监听实现*/private final static TransactionListener transactionListenerImpl = new TransactionListener() {/*** 在发送消息成功时执行本地事务* @param msg* @param arg producer.sendMessageInTransaction的第二个参数* @return 返回事务状态* LocalTransactionState.COMMIT_MESSAGE:提交事务,提交后broker才允许消费者使用* LocalTransactionState.RollbackTransaction:回滚事务,回滚后消息将被删除,并且不允许别消费* LocalTransactionState.Unknown:中间状态,表示MQ需要核对,以确定状态*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// TODO 开启本地事务(实际就是我们的jdbc操作)// TODO 执行业务代码(插入订单数据库表)// int i = orderDatabaseService.insert(....)// TODO 提交或回滚本地事务(如果用spring事务注解,这些都不需要我们手工去操作)// 模拟一个处理结果int index = 8;/*** 模拟返回事务状态*/switch (index) {case 3:System.out.printf("本地事务回滚,回滚消息,id:%s%n", msg.getKeys());return LocalTransactionState.ROLLBACK_MESSAGE;case 5:case 8:return LocalTransactionState.UNKNOW;default:System.out.println("事务提交,消息正常处理");return LocalTransactionState.COMMIT_MESSAGE;}}/*** Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),* 由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback* @param msg* @return 返回事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 根据业务,正确处理: 订单场景,只要数据库有了这条记录,消息应该被commitString transactionId = msg.getTransactionId();String key = msg.getKeys();System.out.printf("回查事务状态 key:%-5s msgId:%-10s transactionId:%-10s %n", key, msg.getMsgId(), transactionId);if ("id_5".equals(key)) { // 刚刚测试的10条消息, 把id_5这条消息提交,其他的全部回滚。System.out.printf("回查到本地事务已提交,提交消息,id:%s%n", msg.getKeys());return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.printf("未查到本地事务状态,回滚消息,id:%s%n", msg.getKeys());return LocalTransactionState.ROLLBACK_MESSAGE;}}};public static void main(String[] args) throws MQClientException, IOException {// 1. 创建事务生产者对象// 和普通消息生产者有所区别,这里使用的是TransactionMQProducerTransactionMQProducer producer = new TransactionMQProducer("GROUP_TEST");// 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步producer.setNamesrvAddr("192.168.100.242:9876");// 3. 设置事务监听器producer.setTransactionListener(transactionListenerImpl);// 4. 启动生产者producer.start();for (int i = 0; i < 10; i++) {String content = "Hello transaction message " + i;Message message = new Message("TopicTest", "TagA", "id_" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 发送消息(发送一条新订单生成的通知)SendResult result = producer.sendMessageInTransaction(message, i);System.out.printf("发送结果:%s%n", result);}System.in.read();// 6. 停止生产者producer.shutdown();}
}

Consumer 整个事务消息环节与Consumer相关不大,所以不用对原来的Consumer进行修改 正常接收消息即可.

事务消息的使用约束

  1. 事务消息不支持定时和批量
  2. 为了避免一个消息被多次检查,导致半数队列消息堆积,RocketMQ限制了单个消息的默认检查次数为15次,通过修改broker配置文件中的transactionCheckMax参数进行调整
  3. 特定的时间段之后才检查事务,通过broker配置文件参数transactionTimeout或用户配置CHECK_IMMUNITY_TIME_IN_SECONDS调整时间
  4. 一个事务消息可能被检查或消费多次
  5. 提交过的消息重新放到用户目标主题可能会失败
  6. 事务消息的生产者ID不能与其他类型消息的生产者ID共享

分布式事务场景分析

分布式事务,是一个在每个微服务项目中都绕不开的问题。常见的解决分案有通过Redis、zk、mq、seata等方式处理。这篇博文全面的分析一下RocketMq中事务消息的机制。

场景案例

事务的经典场景有很多,如银行转账、订单库存等等。相对于分布式事务来说,订单系统和库存系统间的事务场景更为形象。如:用户操作下单,我们首先需要生成一条订单信息,然后库存系统需要针对订单中的商品进行库存扣减的操作。这两步操作必须保证数据的一致性,否则会出现库存超扣等情况。

RocketMQ事务消息设计分析

第一种情况如图所示,在本地事务提交前发送事务消息。若在创建订单信息时发生了异常,而此时事务消息已经成功发送,库存系统消费事务消息就会导致订单并没有创建成功,而库存却被扣减。

进而有了第二种情况,如图所示,在本地事务提交完成后再发送事务消息。若在发送事务消息的过程发生了异常,如网络波动等等,将会出现订单已创建完成,而库存系统永远也监听不到消息,导致库存无法正常扣减。

综合第一和第二种情况,汇总成第三种方案如图所示。在本地事务执行前,先向MQ发送前置的Prepared消息,在本地事务执行完毕后,再发送确认的消息,告知MQ当前事务消息需提交/回滚。如果事务正常提交成功,那么这条消息将会被消息消费方监听到;如果事务回滚,MQ会丢弃这条消息,消息消费方无法监听到这条消息。以上情况对应 事务消息生产者的设计思路 图中的 1、2、3、4步骤。

继续分析,如果上图的第二步中,发送确认消息的过程中,出现异常,没有发送成功怎么办?RocketMQ会定期(默认60s)扫描Prepared消息,如果迟迟没有收到确认消息,将会执行事务回查的逻辑,主动去消息生产方确认事务状态。对应 事务消息生产者的设计思路 图中的 5、6、7步骤。综上,是事务消息中生产者的设计思路,保证本地事务和事务消息一致性。

消费事务消息


如上图中,在事务消息者中,如果步骤4返回了消费失败或者超时未响应的情况,怎么办?RocketMQ对待事务消息的处理和普通消息一样。如果消费失败或超时,将会把这条消息加入到重试队列中,不断是重复执行步骤3、4,如果重复的次数达到阈值,那么可能需要人工介入处理。

如果消费方本地事务执行成功,仅仅是在确认消息时失败呢? 那么这里又会出现另一个问题 重复消费? 这里就需要具体的业务模块去处理消息的幂等性。如接住Redis来处理。如在本地事务执行前先去查询redis中当前消息是否已经消费,执行成功后再向redis写入一条成功消费的记录,这样就能保证消费不会被重复消费了。

Q&A
Q:从一致性方面考虑,直接采用RPC也可以完成,RPC也支持重试,为什么还要采用MQ?

A:首先应该分清MQ和RPC的应用场景,在现在微服务的架构下,所有人都强调低耦合高内聚,做业务上的解耦,直接采用RPC的方式就会出现强依赖,与微服务的理念背道而驰。

Q:为什么事务消息消费失败后,需要人工介入处理?

A:首先对于一个复杂的系统来讲,将实现整个业务逻辑回滚的代价是巨大的,不但系统复杂度将大大提升,而且还会引入新的问题,如在回滚的过程中又出现了其他事务异常,又该如何处理?其次在一个健壮的系统中出现事务回滚的情况本来就是概率极低的情况,在程序设计时,需要衡量一下为解决这个问题付出的人力物力成本值不值得。

Q:为什么不直接在消息服务层面解决重复消费的问题?

A:消费重复消费解决可以从两个方面考虑。第一 消费方处理消息的业务逻辑保持幂等性,只要保持幂等性,不管重复消费多少次,结果都是一样的;第二保证每条消息都有唯一编号且保证消费处理成功和去重表的日志同时出现,正常情况下出现重复消费的概率并不大,如果消费系统对所有的消费都做处理的话,对系统的吞吐量和高可用会产生影响,所以最好由各自业务系统决定如果处理重复消费。

Q:RocketMQ没能从根本上结果分布式事务问题

A:RocketMQ自身没办法做到像本地事务处理添加@Transactional注解就可以完成事务的提交和回滚。如果有需要,可以尝试使用seata中间件来处理分布式事务。

参考文章:
https://blog.csdn.net/D1842501760/article/details/123142298
https://blog.csdn.net/lishuzhen5678/article/details/122666090

RocketMQ事务消息相关推荐

  1. 实战分析 RocketMQ事务消息

    众所周知,在分布式领域有两大经典理论:CAP 和 BASE.一般情况下,我们将CAP中的数据一致性称为"强一致性",将BASE中的数据一致性称为"最终一致性". ...

  2. 搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

    搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocke ...

  3. 通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪?

    文章转载自公众号  心源意码 , 作者 寻筝 "得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席." 由阿里自研的Roc ...

  4. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  5. RocketMQ事务消息实现分析

    这周RocketMQ发布了4.3.0版本,New Feature中最受关注的一点就是支持了事务消息: 今天花了点时间看了下具体的实现内容,下面是简单的总结. RocketMQ事务消息概要 通过冯嘉发布 ...

  6. RocketMQ 事务消息

    RocketMQ 事务消息在实现上充分利用了 RocketMQ 本身机制,在实现零依赖的基础上,同样实现了高性能.可扩展.全异步等一系列特性. 在具体实现上,RocketMQ 通过使用 Half To ...

  7. SpringBoot2.x Nacos RocketMQ 事务消息

    需求背景: 现在有内容中心(content-center)和 用户中心(user-center)2个微服务,请求内容中心,发送消息给用户中心,完成为指定用户添加积分操作. 文章目录 一.准备工作 1. ...

  8. 一文详解,RocketMQ事务消息

    在RocketMQ中有一个非常有用的功能,就是事务消息功能,事务消息机制,可以让我们确保发送的消息一定能写进MQ里,绝不会丢失掉. MQ事务消息机制还是挺有用的,在业内还是比较常见的,所以今天我们就来 ...

  9. RocketMQ事务消息及消息索引设计原理

    RocketMQ事务消息 正常事务消息的发送及提交 事务消息的补偿流程 一阶段的half消息如何做到对用户不可见? 回滚之后 pending状态的消息如何变成最终状态 通过Op消息来确定提交或回滚事务 ...

  10. RocketMQ事务消息从生产到消费原理详解(包括回查过程)

    名词解释 half消息(生产者发送的Prepare消息):发送到MQ Server但无法被consumer消费的消息,暂时存在MQ Server,需要收到生产者二次确认后才能被消费 消息回查:一些意外 ...

最新文章

  1. R 语言中 X11 相关的一些问题
  2. Oracle的自定义函数浅析
  3. 经典C语言程序100例之十八
  4. bzoj 2844: albus就是要第一个出场 高斯消元
  5. 学习方法之02掌握记忆方法,在学习上就赢了一半
  6. 轻轻松松看懂Spring AOP源码
  7. 1前端学习(2345):关于前端对于xml格式文件的渲染
  8. Python——LOL官方商城皮肤信息爬取(一次练手)
  9. mysql修改字段一部份_mysql 修改字段中部分值
  10. WORD如何创建三线表样式?
  11. 【Kafka】Kafka Producer整体架构概述及源码分析
  12. Maven私服(二) - Nexus的安装
  13. 结对编程四则运算第三周-挑战出题(20172301、20172304、20172328)
  14. Python人脸识别 Python3.7+OpenCV+Dlib+罗技C920摄像头 实现离线实时摄像头画面人脸检测+识别
  15. 解决es集群Yellow与Red的问题
  16. SpringBoot项目使用RestTemplate发送请求踩坑记录
  17. RPA:让电商财务拥有“分身术”
  18. H5网页如何在微信中自定义分享链接(可设置标题+简介+图片)
  19. 古文觀止卷九_愚溪詩序_柳宗元
  20. 计算机教室英语怎么读音,多媒体教室,multimedia classroom,音标,读音,翻译,英文例句,英语词典...

热门文章

  1. 准备好了吗?2021年的7大科技趋势
  2. 35岁到40岁,如何突破
  3. swift monkeyking 社交分享
  4. AJAX 后端处理程序
  5. 什么是modbus通信协议?
  6. dsp调音一次多少钱_dsp调音技巧
  7. 关于Kurento 和 WebRTC-Kurento学习(一)
  8. 一带一红网红基地推出O2O网红直播过年模式
  9. Kotlin Sealed Class
  10. 【算法】搜索算法—盲目搜索和启发式搜索