上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器,消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,当执行本地事务,如果返回本地事务状态为UN_KNOW时,第二次提交到服务器时将不会做任何操作,也就是消息还存在与RMQ_SYS_TRANS_HALF_TOPIC主题中,并不能被消息消费者消费,那这些消息最终如何被提交或回滚呢?原来RocketMQ使用TransactionalMessageCheckService线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。TransactionalMessageCheckService的检测频率默认1分钟,可通过在broker.conf文件中设置transactionCheckInterval的值来改变默认值,单位为毫秒。

代码@1:从broker配置文件中获取transactionTimeOut参数值,表示事务的过期时间,一个消息的存储时间 + 该值 大于系统当前时间,才对该消息执行事务状态会查。
 代码@2:从broker配置文件中获取transactionCheckMax参数值,表示事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即rollback消息

   上述代码虽多,其实实现思路非常清晰,先使用一个匿名类( Runnable )构建一个运行任务,然后提交到checkExecutor线程池中执行,这与我第一篇文章的猜测是吻合的,那重点分析一下该任务的允许逻辑,对应在run方法中。
   代码@1:获取消息发送者的TransactionListener。
   代码@2:执行TransactionListener#checkLocalTransaction,检测本地事务状态,也就是应用程序需要实现TransactionListener#checkLocalTransaction,告知RocketMQ该事务的事务状态,然后返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一个,然后向Broker发送END_TRANSACTION命令即可,
代码@3:发送END_TRANSACTION到Broker

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

消息服务器Broker提交回滚事务实现原理

本文将重点分析RocketMQ Broker如何处理事务消息提交、回滚命令,其核心实现就是根据commitlogOffset找到消息,如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理;回滚消息与提交事务消息不同的是,提交事务消息会将消息恢复原主题与队列,再次存储在commitlog文件中。

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

RocketMQ事务消息实战

 我们以一个订单流转流程来举例,例如订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)这个场景,我们通常会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤通常为:
   1、A系统创建订单并入库。
   2、发送消息到MQ。
   3、MQ消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。

方案弊端:
   1、如果消息发送成功,在提交事务的时候JVM突然挂掉,事务没有成功提交,导致两个系统之间数据不一致。
   2、由于消息是在事务提交之前提交,发送的消息内容是订单实体的内容,会造成在消费端进行消费时如果需要去验证订单是否存在时可能出现订单不存在。
   3、消息发送可以考虑异步发送。
   方案二:
   由于存在上述问题,在MQ不支持事务消息的前提条件下,可以采用下面的方式进行优化。

然后在控制器层,使用异步发送,将消息发送,并在消息发送成功后,更新待发送状态为已发送。
   然后通过定时任务,扫描待发送,结合创建时间的记录(小于当前时间5分钟的消息待发送记录),进行消息发送。
   方案弊端:
   1、消息有可能重复发送,但在消费端可以通过唯一业务编号来进行去重设计。
   2、实现过于复杂,为了避免 极端情况下的消息丢失,需要使用定时任务。
   方案三:基于RocketMQ4.3版本事务消息
TransactionListener 实现要点:
   executeLocalTransaction:
   该方法,主要是设置本地事务状态,该方法与业务方代码在一个事务中,例如OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。
   故在这里,主要是t_message_transaction添加一条记录,在事务会查时,如果存在记录,就认为是该消息需要提交。
   checkLocalTransaction:
   该方法主要是告知RocketMQ消息是否需要提交还是回滚,如果本地事务表(t_message_transaction)存在记录,则认为提交,如果不存在,可以设置会查次数,如果指定次数内还是未查到消息,则回滚,否则返回未知,rocketmq会按一定的频率回查事务,当然回查次数也有限制,默认为5次,可配置。

RocketMQ 源码分析 14 事务消息02相关推荐

  1. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  2. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  3. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  4. RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

    RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache ...

  5. RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

    本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationRe ...

  6. rocketmq源码分析 -生产者

    概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...

  7. RocketMQ源码分析之request-reply特性

    1.什么是request-reply?   RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返 ...

  8. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  9. mq消费者组_「架构师MQ进阶」RocketMQ源码分析(四)- 源代码包结构分析

    在前面第一篇中已经将源代码下载到本地了,本篇主要是介绍代码中相关模块到作用.036.Rocket-MQ-Source-code-cover.png 一.源码结构 RocketMQ源码组织方式基于Mav ...

  10. RocketMQ源码分析(十二)之CommitLog同步与异步刷盘

    文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...

最新文章

  1. 零基础入门学习Python(15)-序列
  2. 删除一个数的K位使原数变得最小
  3. mysql数据源找不到_mysql报出找不到目标数据源为啥? 同一个方法在其他地方调用正常...
  4. Python正则表达式如何进行字符串替换实例
  5. CF1039D-You Are Given a Tree【根号分治,贪心】
  6. 在mojoportal项目中发邮件使用的是dotnetopenmail
  7. 一加桌面3.0 android8,一加手机XRemix6.0安卓8.1.0Beta2.0定制本地化增强适配归属农历等...
  8. 【MySQL】MySQL 8报错 Unknown initial character set index 255 received from server. Initial client char
  9. C语言丨小 学 数 学(二):高精度乘法
  10. 原来歌这样唱也很好听
  11. PHP不仅仅是PHP
  12. [渝粤教育] 西南科技大学 土木工程材料 在线考试复习资料
  13. 如何在Linux上使用HAProxy配置HTTP负载均衡系统
  14. IEEE1588v2解析(2)--PTP协议概述
  15. [源码解读]position_estimator_inav_main解读(如何启动光流)
  16. 小呆聚合支付系统免签约即时到账多商户支付系统v6.5+监控软件+教程
  17. html语言 图片大小,html如何设置图片大小
  18. 为什么公司要和商标同步注册?
  19. HC-SR501红外人体传感模块封锁时间调整
  20. 软文推广丨什么是软文推广?

热门文章

  1. 2020论文阅读:Few-Shot Object Detection with Attention-RPN and Multi-Relation Detector
  2. 本文介绍在Wireshark网络协议分析仪中如果解密SSL和TLS流量
  3. 关于DOS/DDOS攻击和防御
  4. 按键精灵找文字的基础代码模板
  5. 若w=1,x=2,y=3,z=4,则条件表达式wx?w : zy?z : x的结果为 4
  6. Manjaro安装以及美化教程
  7. 《剑指offer》之知识汇总
  8. 十大热门编程语言入门难度排名
  9. 化繁为简 “云桥OneBridge”让IT运维事半功倍
  10. 华为安装gsm框架_华为谷歌框架安装app下载-华为谷歌服务框架安装器(GMS安装器)下载v1.2.0 最新版-西西软件下载...