概念介绍

  • 事务消息:提供类似XA或Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。
  • 半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了RocketMQ服务端,但是RocketMQ服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

分布式事务消息的优势

RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

典型场景

在淘宝购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅交易订单消息,做相应的业务处理,即可保证最终的数据一致性。

交互流程

事务消息交互流程如下图所示。

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

事务消息回查步骤如下:

  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  2. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

示例代码

事务消息生产者

public enum LocalTransactionState {COMMIT_MESSAGE,ROLLBACK_MESSAGE,UNKNOW,
}

事务消息发送完成本地事务后,可在execute方法中返回以下三种状态:

  • COMMIT_MESSAGE:提交事务,允许消费者消费该消息。
  • ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
  • UNKNOW:暂时无法判断状态,等待固定时间以后消息队列RocketMQ版服务端根据回查规则向生产者进行消息回查。

创建事务消息的Producer时必须指定TransactionListener的实现类,处理异常情况下事务消息的回查。

回查规则:本地事务执行完成后,若服务端收到的本地事务返回状态为TransactionStatus.Unknow,或生产者应用退出导致本地事务未提交任何状态。则服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。

package com.morris.rocketmq.transaction;import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/*** 事务消息生产者*/
public class TransactionProducer {public static void main(String[] args) throws Exception {TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-demo");producer.setNamesrvAddr(NAME_SERVER_ADDRESS);ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);// 指定事务会查的实现类producer.setTransactionListener(new TransactionListener() {private final AtomicInteger transactionIndex = new AtomicInteger(0);private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();System.out.println(Thread.currentThread().getName()+  "-executeLocalTransaction:" + new String(msg.getBody()) + ",value=" + value);int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println(Thread.currentThread().getName()+  "-checkLocalTransaction:" + new String(msg.getBody()));Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.COMMIT_MESSAGE;case 1:return LocalTransactionState.UNKNOW;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}});producer.start();for(int i = 0; i < 10; i++) {Message message = new Message("TransactionTopic", ("transactionDemo" + i).getBytes());// 发送事务消息producer.sendMessageInTransaction(message, i);System.out.println(message);}}}

第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。

设置方式如下:

Message message = new Message();
message.putUserProperties(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");

说明:因为系统默认的回查间隔,第一次消息回查的实际时间会向后有0秒~30秒的浮动。

例如:指定消息的第一次消息最快回查时间设置为60秒,系统在第58秒时达到定时的回查时间,但设置的60秒未到,所以该消息不在本次回查范围内。等待间隔30秒后,下一次的系统回查时间在第88秒,该消息才符合条件进行第一次回查,距设置的最快回查时间延后了28秒。

事务消息消费者

事务消息的Group ID不能与其他类型消息的Group ID共用。与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据Group ID去查询生产者客户端。

package com.morris.rocketmq.transaction;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/*** 事务消息消费者*/
public class TranscationConsumer {public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-consumer-group");// 指定Namesrv地址信息.consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);// 订阅Topicconsumer.subscribe("TransactionTopic", "*");//负载均衡模式消费consumer.setMessageModel(MessageModel.CLUSTERING);// 注册回调函数,处理消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {try {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});//启动消息者consumer.start();System.out.printf("Consumer Started.%n");}
}

使用说明

  1. 事务消息不支持延时消息和批量消息。
  2. 事务回查的间隔时间:BrokerConfig.transactionCheckInterval,通过Broker的配置文件设置好。
  3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话(N=transactionCheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
  4. 事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。
  5. 事务性消息可能不止一次被检查或消费。
  6. 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
  7. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  8. 事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。

【RocketMQ】发送事务消息相关推荐

  1. rocketMq发送事务消息

    事务消费 我们经常支付宝转账余额宝,这是日常生活的一件普通小事,但是我们思考支付宝扣除转账的钱之后,如果系统挂掉怎么办,这时余额宝账户并没有增加相应的金额,数据就会出现不一致状况了. 上述场景在各个类 ...

  2. RocketMQ(九):rocketMQ设计的全链路消息零丢失方案?+rocketmq消息中间件事务消息机制的底层实现原理?+half是什么?+half消息是如何对消费者不可见的?

    前言: 目前rocketmq更新已经更新了11篇博客了,预计接下来的2-3篇是暂时的更新进度了,准备更新一下springboot或者是jvm,mysql相关的专题出来,后续更新完事后,再分享一些实战性 ...

  3. RocketMQ之事务消息

    本文来说下Rocket的消息事务 文章目录 概述 事务消息实现原理 本文小结 概述 在电商系统上线初期,往往会进行一些"拉新"活动,例如活动部门提出新用户注册送积分.送优惠券活动. ...

  4. rocketmq发送顺序消息(四)

    rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息.此处我们讲解如何发送rocketmq顺序消息 producer public class ProducerOrder ...

  5. RocketMq发送延迟消息

    什么是延迟消息? 对于消息中间件来说,producer将消息发送到mq的服务器,但并不期望这条消息马上被消费,而是推迟到当前时间点之后的某个时间点后再投递到queue中让consumer进行消费,延迟 ...

  6. RocketMQ 分布式事务消息过程分析

    生产环境最小单元高可用部署模式: 如果192.168.3.100的broker假死,那么3.110,3.111的nameserver都能在2分钟内感知broker-a宕机,然后客户端能成功从names ...

  7. RocketMQ发送延迟消息时报错,发送同步消息却正常

    延时消息代码如下: Message<Mdds> message = MessageBuilder.withPayload(mdds).build(); // 延迟第3级发送(延迟10秒) ...

  8. RocketMQ5.0.0事务消息

    目录 一.事务消息概览 二.事务消息实现机制 1. 事务消息发送流程 1):发送事务消息类图 2):生产端发送事务消息 3):Broker存储事务消息 2. 生产者提交或回滚事务消息 1):生产者发送 ...

  9. 事务消息大揭秘!RocketMQ、Kafka、Pulsar全方位对比

    导语 | 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败.RocketMQ.Kafka和Pulsar都是当今业界应用十分广泛的开源消息队列(MQ)组件,笔者在工作中遇到关于M ...

  10. 实战分析 RocketMQ事务消息

    众所周知,在分布式领域有两大经典理论:CAP 和 BASE.一般情况下,我们将CAP中的数据一致性称为"强一致性",将BASE中的数据一致性称为"最终一致性". ...

最新文章

  1. 我对软件行业及大数据的理解
  2. Work Management Service application in SharePoint 2016
  3. Spring JDBC-自增键和行集RowSet
  4. Linux移植之内核启动过程引导阶段分析
  5. posman使用教程
  6. 字符串格式化成时间格式_JAVA | 常用的日期/时间格式化方式
  7. Java快速入门学习笔记7 | Java语言中的类与对象
  8. android 调用 asp.net web api,从 .NET 客户端调用 Web API (C#)
  9. Java Script基础(七) HTML DOM模型
  10. c语言读写txt坐标文件数据,C语言——从txt文件中读写数据
  11. vue和echarts实现地图航线
  12. echarts横向柱状图
  13. 笔记本打印时出现打印机出现异常配置问题_笔记本电脑连接共享打印机出现错误怎么办...
  14. 无约束一维极值——黄金分割法
  15. python怎么交换xy轴_matplotlib Y轴和X轴交换
  16. [LiteratureReview]Improving 3D Object Detection for Pedestrians with Virtual Multi-View Synthesis...
  17. python与CAD——磨平了棱角的多边形
  18. 安卓手机怎么修改图片分辨率?手机怎么提高图片分辨率?
  19. mysql保留小数位数函数
  20. UNCTF2021 部分WP

热门文章

  1. ML之shap:基于adult人口普查收入二分类预测数据集(预测年收入是否超过50k)利用shap决策图结合LightGBM模型实现异常值检测案例之详细攻略
  2. bzoj4567【SCOI2016】背单词
  3. 如何用计算机测量图片景深,用比较仔细的测量搞清楚“景深”(1.实测景深与公式比较)...
  4. 基于OAS设计可扩展OpenAPI
  5. 漫画:头条面试官谈自我介绍
  6. 颈椎病自我治疗预防颈椎病
  7. 个人博客网站编写(01)
  8. 12-搜索前端开发-按分类搜索
  9. Linux 运维是做什么的
  10. LBT(CCA) in LAA/Multefire (二)