在生产者producer当中,通过sendMessageInTransaction()方法来发送事务消息,但是在一开始向Broker发送的事务消息的时候,具体的事务操作还并没有进行处理,而是相当于向Broker发送了一条预处理消息。在sendMessageInTransaction()的参数中,除了需要发送的事务消息之外,还需要实现了sendMessageInTransaction接口的类作为参数之一,在executeLocalTransactionBranch()方法中实现具体的事务操作。

可以具体来看在DefaultMQProducerImpl实现的sendMessageInTransaction()方法。

public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {if (null == tranExecuter) {throw new MQClientException("tranExecutor is null", null);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,this.defaultMQProducer.getProducerGroup());try {sendResult = this.send(msg);}catch (Exception e) {throw new MQClientException("send message Exception", e);}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__",sendResult.getTransactionId());}localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}}catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {this.endTransaction(sendResult, localTransactionState, localException);}catch (Exception e) {log.warn("local transaction execute " + localTransactionState+ ", but end broker transaction failed", e);}TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;
}

可以看到,在第一次发送消息之前,会给消息的事务属性设置为transactuion_prepared属性,然后的第一次发送逻辑还是与普通的非事务消息没有区别。

第一次的发送只是给事务消息设置了transactuion_prepared属性,可以把目光转到Broker看到对于接收到这一属性消息的处理。

在具体的Broker消息存储中,在消息存储到所有消息消费队列公用的存储文件队列commitLog完毕之前并没有区别,但是在之后消息在commitLog上具体存储偏移量的分发给消息消费队列发生区别。

在创建消息消费队列的过程中的doDispatch()方法,可以看到这段代码。

final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
// 1、分发消息位置信息到ConsumeQueue
switch (tranType) {
case MessageSysFlag.TransactionNotType:
case MessageSysFlag.TransactionCommitType:// 将请求发到具体的Consume QueueDefaultMessageStore.this.putMessagePostionInfo(req.getTopic(), req.getQueueId(),req.getCommitLogOffset(), req.getMsgSize(), req.getTagsCode(),req.getStoreTimestamp(), req.getConsumeQueueOffset());break;
case MessageSysFlag.TransactionPreparedType:
case MessageSysFlag.TransactionRollbackType:break;
}

也就是说,处于preparedType的事务消息并不会将消息的具体存储位置分发到消费队列,也就是说该消息并不会被消费者通消费队列以及消费队列上的偏移量取到并消费。

同理接下来的索引创建也不会将事务第一阶段提交的prepared状态的事务消息创建索引。

接下来,可以继续回到之前producer对于事务消息的发送过程。

如果成功接收到了第一次发送的事务消息发送成功的消息,那么将会在作为参数传入的sendMessageInTransaction的executeLocalTransactionBranch()方法当中执行具体的事务消息操作。在结束具体的事务操作之后将会将通过endTransaction()方法将第二次消息提交给Broker当中。

private void endTransaction(//final SendResult sendResult, //final LocalTransactionState localTransactionState, //final Throwable localException) throws RemotingException, MQBrokerException,InterruptedException, UnknownHostException {final MessageId id = MessageDecoder.decodeMessageId(sendResult.getMsgId());String transactionId = sendResult.getTransactionId();final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TransactionCommitType);break;case ROLLBACK_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TransactionRollbackType);break;case UNKNOW:requestHeader.setCommitOrRollback(MessageSysFlag.TransactionNotType);break;default:break;}requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark =localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());
}

在第二次消息的发送当中,将会把上一次的消息发送的commitLogOffset位置作为参数,根据具体事务处理的结果给消息的事务属性赋为commit或者rollback发送。

可以转到Broker看到针对第二次事务消息发送的处理。

在第二次消息的处理时,会在EndTrandsactionProcessor当中对于第二次消息进行处理。方法很长,可以看几处重点。

final MessageExt msgExt =this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());

首先会根据第一次提交的消息的commitLogOffset取得存储在commitLog上第一次消息提交的实体。

MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) {msgInner.setBody(null);
}final MessageStore messageStore = this.brokerController.getMessageStore();
final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);

之后会根据第一次提交的消息,构造一个内容一模一样的MessageExtBrokerInner的消息实体,如果是回滚的消息类型,将会给存储消息正文的body赋为null,之后将会重新通过消息存储层DefaultMessageStore的putMessage()方法重新进行消息的存储。

由于事务的状态属性已经为commit,将会正常存入commitLog以及分发给消费队列并创建索引,消费者可以正常消费。

对于事务消息的回查,RocketMQ并没有开源。

RocketMQ源码解析-事务消息的二阶段提交相关推荐

  1. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  2. RocketMQ 源码分析 事务消息

    为什么80%的码农都做不了架构师?>>>    1. 概述 必须必须必须 前置阅读内容: <事务消息(阿里云)> 2. 事务消息发送 2.1 Producer 发送事务消 ...

  3. RocketMQ源码解析-Producer消息发送

    首先以默认的异步消息发送模式作为例子.DefaultMQProducer中的send()方法会直接调用DefaultMQProducerImpl的send()方法,在DefaultMQProducer ...

  4. RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析

    深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...

  5. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  6. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  7. RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...

  8. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  9. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

最新文章

  1. [入门向选讲] 插头DP:从零概念到入门 (例题:HDU1693 COGS1283 BZOJ2310 BZOJ2331)
  2. 四轴飞行器实践教程1.1.2飞行器的发展
  3. IQmath中文手册
  4. 将列表转成数组_漫画 | 什么是散列表(哈希表)?
  5. hdu 1087 最大递增和
  6. linux 魔术分区,Parted Magic-Linux 中的分区魔术师
  7. 线程安全使用相关注意事项
  8. shell下利用运算方式编写倒计时脚本
  9. 【upc】生命曲线(线段树) —— 一个能让你搞懂线段树懒标的题目
  10. 云计算是什么通俗解释_什么是云? 解释
  11. C++ #include expects “FILENAME“ or <FILENAME>
  12. 获取中国 省市区 js
  13. 投资理财(待更新,仅供个人使用)
  14. 磐石计划(二)——虚拟机win7安装KB4474419和 KB4490628补丁
  15. 洛谷 P1162 填涂颜色 C++ 深搜 染色法
  16. 第十六届智能车竞赛全国总决赛究竟该怎么举办讨论中的“混沌”现象
  17. android 打开屏幕,Android打开屏幕
  18. 【数据库】数据库管理系统(Database Management Systems)
  19. linux shell 看门狗,Linux 下如何使用看门狗
  20. php搞笑证件,怎么制作搞笑证件 网络搞笑证件制作的软件怎么用的

热门文章

  1. Python matplotlib画图出现No handles with labels found to put in legend
  2. ProtoBuf3语法指南(Protocol Buffers)_下
  3. idea关闭coverage
  4. Hyperledger Fabric 实战(七):链码 shim API 详解
  5. HDoj-1863-畅通project-并查集
  6. [中国剩余定理]【学习笔记】
  7. oracle job有定时执行的功能,可以在指定的时间点或每天的某个时间点自行执行任务。...
  8. Json和object相互转化,排除json中多余的字段
  9. FCS省选模拟赛 Day7
  10. .net MvcPager+Ajax无刷新分页