目录

一、事务消息概览

二、事务消息实现机制

1. 事务消息发送流程

1):发送事务消息类图

2):生产端发送事务消息

3):Broker存储事务消息

2. 生产者提交或回滚事务消息

1):生产者发送提交或回滚事务请求

2):Broker处理生产者请求

3. Broker回查事务状态

1):Broker执行回查任务

2):线程池执行异步回查

三、事务消息实例

1. 事务消息监听类

2. 事务消息生产者

3. 事务消息消费者

4. 业务代码

四、参考资料


一、事务消息概览

RocketMQ事务消息的实现原理基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚,消费者可以消费事务提交的消息,如下图所示。事务消息的作用:确保本地业务与消息在一个事务内,本地业务成功执行,消费者才能消费消息

事务消息实现流程

事务消息实现流程分3个部分:

  • 应用程序:事务内完成相关业务数据入库后,需要同步调用RocketMQ消息发送接口,发送状态为prepare消息。消息发送成功后,RocketMQ服务器会回调RocketMQ消息发送者的事件监听器,记录消息的本地事务状态,事务状态与本地业务操作同属一个事务,确保消息发送与本地事务的原子性
  • Broker服务器存储消息:存储器Broker收到类型为prepare的消息时,会首先备份消息的原主题与原消息消费队列到消息扩展属性中,然后修改消息存储主题RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
  • Broker定时任务:存储器Broker开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC的消息,向消息发送端(应用程序)发起消息事务状态回查,应用程序根据保存的事务状态反馈消息服务器事务的状态(提交、回滚、未知),如果是提交或回滚,则消息服务器提交或回滚消息,如果是未知,待下一次回查, RocketMQ 允许设置一条消息的回查间隔与回查次数,如果超过回查次数后依然无法获知消息的事务状态,则默认回滚消息

二、事务消息实现机制

1. 事务消息发送流程

1):发送事务消息类图

org.apache.rocketmq.client.producer.TransactionMQProducer事务消息生产者类,其UML类图如下。

org.apache.rocketmq.client.producer.TransactionListener是事务监听器接口,用户必须自定义事务监听器实现类复写两个方法:

  • TransactionListener#executeLocalTransaction():执行本地事务(如:保存事务消息ID),与业务逻辑在一个事务内,供状态回查使用。
  • TransactionListener#checkLocalTransaction():回查本地事务状态,即:事务是否提交或回滚或未知,若是未知,则继续下次查询。

下图是发送事务消息总流程图,下面小节详细介绍消息的发送过程。

2):生产端发送事务消息

org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction是发送事务消息的入口方法,其调用链如下。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction是发送事务消息的核心方法,其代码如下。注意事项:

  • 设置消息为prepare:为后续发送消息前改变消息topic,而原topic和消费队列添加到消息扩展属性中;
  • 设置消息生产组:为后续Broker定时查询事务状态时,从生产组中随机选择一个即可;
  • 调用结束事务endTransaction():调用该方法时,本地业务逻辑及事务可能完成,直接执行结束事务后,就不用后续定期回查事务状态;若本地事务没有完成,才定期回查事务状态
/*** 发送事务消息核心逻辑* step1:获取生产者的事务监听器,自定义实现类,接口是{@link TransactionListener}* step2:检查消息延迟级别,若是事务消息则忽略延迟参数* step3:检查消息是否符合规则,如:topic规范、消息体不能为空、消息体长度默认不能大于4MB* step4:设置消息为prepare消息 + 设置消息生产组(目的:监听器查询事务状态时,从生产组中随机选择一个即可)* step5:发送事务(与普通消息发送流程相同)并返回结果*        {@link DefaultMQProducerImpl#send(Message)}* step6:消息发送成功后,获取事务ID和记录本地事务状态(保存事务ID与处理业务处在同一事务中,供服务端事务消息状态回查提供依据)* step7:结束事务,事务状态:提交、回滚、不处理,注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查*        {@link DefaultMQProducerImpl#endTransaction()}* @param msg 事务消息内容* @param localTransactionExecuter 本地事务Executer* @param arg 传入事务监听器的参数* @return*/
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {// 获取生产者的事务监听器TransactionListener transactionListener = getCheckListener();if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// ignore DelayTimeLevel parameter 忽略延迟消息参数if (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}// 检查消息是否符合规则Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;// 设置消息为prepare消息MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");// 设置消息生产组,目的:监听器查询事务状态时,从生产组中随机选择一个即可MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// 发送事务(与普通消息发送流程相同)sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}// 根据消息发送结果,作响应的本地事务状态处理LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {/*消息发送成功,获取事务ID和记录本地事务状态注意:保存事务ID与处理业务处在同一事务中,供服务端事务消息状态回查提供依据*/case SEND_OK: {try {// 从消息发送结果获取事务IDif (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}// 记录本地事务状态if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {// 结束事务,事务状态:提交、回滚、不处理,注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;
}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl发送消息,若是prepare消息时,添加事务消息标识到系统标识中。

/*** 发送消息* step1:从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常;* step2:不是批量发送消息,则为消息分配一个全局消息ID;* step3:设置消息是否压缩,消息体 > 4KB,则压缩;* step4:是否是事务Prepare消息,若是写入事务prepare标签;* step5:执行发送前钩子函数;* step6:根据不同发送方式,发送消息;* step7:执行发送后钩子函数;*/
private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {......// 是否是事务Prepare消息final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 事务消息标签}       ......
}

3):Broker存储事务消息

普通消息发送与Broker存储消息参考资料  《RocketMQ5.0.0消息发送》、《RocketMQ5.0.0消息存储<二>_消息存储流程》。org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage该方法是普通消息的处理,解析系统标识中是否有事务消息标签,若是:

  • 判断Broker是否禁止事务消息存储
  • 事务消息时,执行事务消息存储逻辑:异步TransactionalMessageService#asyncPrepareMessage、同步TransactionalMessageService#prepareMessage
/*** 存储之前,对消息的处理* step1:预发送处理,如:检查消息、主题是否符合规范* step2:发送消息时,是否指定消费队列,若没有则随机选择* step3:消息是否进入重试或延迟队列中(重试次数失败)* step4:消息是否是事务消息,若是则存储为prepare消息* step5:BrokerConfig#asyncSendEnable是否开启异步存储,默认开启true*        (异步存储、同步存储)*/
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,final RemotingCommand request,final SendMessageContext sendMessageContext,final SendMessageRequestHeader requestHeader,final TopicQueueMappingContext mappingContext,final SendMessageCallback sendMessageCallback) throws RemotingCommandException {......// Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());// 事务标签String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);boolean sendTransactionPrepareMessage = false;if (Boolean.parseBoolean(traFlag)&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1// Broker禁止事务消息存储if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return response;}sendTransactionPrepareMessage = true;}long beginTimeMillis = this.brokerController.getMessageStore().now();// 消息是否异步存储if (brokerController.getBrokerConfig().isAsyncSendEnable()) {CompletableFuture<PutMessageResult> asyncPutMessageFuture;if (sendTransactionPrepareMessage) { // 事务prepare操作asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}final int finalQueueIdInt = queueIdInt;final MessageExtBrokerInner finalMsgInner = msgInner;asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {RemotingCommand responseFuture =handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,ctx, finalQueueIdInt, beginTimeMillis, mappingContext);if (responseFuture != null) {doResponse(ctx, request, responseFuture);}sendMessageCallback.onComplete(sendMessageContext, response);}, this.brokerController.getPutMessageFutureExecutor());// Returns null to release the send message threadreturn null;}// 消息同步存储else {PutMessageResult putMessageResult = null;// 事务消息存储if (sendTransactionPrepareMessage) {putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);} else {// 同步存储消息putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);}handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);sendMessageCallback.onComplete(sendMessageContext, response);return response;}
}

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner该方法是事务消息与普通消息的主要区别。将事务消息原主题、消息队列ID放入消息扩展属性中,然后将消息topic变更为RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID为0之后再存储消息,目的是:不会被消费者消费,而是定时任务查询生产者事务状态后,恢复原始消息供消费者消费。如下代码所示。

/*** 这里是事务消息与普通消息的主要区别:* 将事务消息的原始topic、消息队列ID 存放到消息属性中,* 并更改为:topic变更为:RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID:0* 所以:不会被消费者消费,而是定时任务查询生产者事务状态后,恢复原始消息供消费者消费*/
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {// 消息的原始topic、消息队列ID 存放到消息属性中MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));// topic变更为:RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID:0msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;
}

2. 生产者提交或回滚事务消息

1):生产者发送提交或回滚事务请求

根据上小节发送事务消息的核心方法DefaultMQProducerImpl#sendMessageInTransaction看出,生产者会调用DefaultMQProducerImpl#endTransaction来结束消息事务。原因是本地业务逻辑及本地事务可能完成,直接执行结束事务后,就不用后续定期回查事务状态;若本地事务没有完成(UNKNOW),才定期回查事务状态。如下代码所示。

发送消息时,默认本地事务状态为未知(UNKNOW),然后发送消息成功后,执行监听器的executeLocalTransaction()方法并返回本地事务状态。当生产者发送请求来结束事务消息时,本地事务状态可能处于提交(COMMIT_MESSAGE)、回滚(ROLLBACK_MESSAGE)、未知(UNKNOW)

/*** 生产者根据本地事务状态触发Broker的消息提交或回滚* 注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查;而提交或回滚时,事务消息从prepare阶段到提交或回滚阶段*         prepare阶段:topic为:RMQ_SYS_TRANS_HALF_TOPIC*         已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC* Broker端处理请求入口:{@link org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest}*/
public void endTransaction(final Message msg,final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {// 消息IDfinal MessageId id;if (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}// 事务IDString transactionId = sendResult.getTransactionId();// 根据发送结果返回的消息队列ID(修改后的消息队列ID为0)的broker名称final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue()));// 获取Broker地址final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);// 组装结束事务请求头EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());// 根据本地事务状态,设置请求头事务状态switch (localTransactionState) {case COMMIT_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}// 结束前调用钩子函数doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 单向发送结束事务请求到目的Brokerthis.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());
}

2):Broker处理生产者请求

org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest是Broker处理结束事务消息请求的核心方法,其代码如下,注意事项:

  • 本地事务状态为UNKNOW时,Broker不做处理,后续定时任务回查事务状态来结束事务
  • 事务提交或回滚的区别联系:

        区别:提交状态时,还原原始消息并提交到Commitlog文件内存映射,后删除prepare消息;

                   回滚状态时,直接删除prepare消息;

        联系:提交、回滚状态时都要删除prepare消息(删除表示该事务消息已处理),删除不是物理删除,而是prepare阶段的topic为RMQ_SYS_TRANS_HALF_TOPIC,修改为RMQ_SYS_TRANS_OP_HALF_TOPIC

/*** 处理生产者结束本地事务请求* 生产者结束本地事务总入口:{@link DefaultMQProducerImpl#endTransaction}* step1:判断Broker为从服务器,则直接返回* step2:提交:获取prepare阶段消息:{@link TransactionalMessageServiceImpl#commitMessage}*            检查prepare事务消息:{@link EndTransactionProcessor#checkPrepareMessage}*            还原原始消息:{@link EndTransactionProcessor#endMessageTransaction}*            存储到Commitlog,供消费者消费:{@link EndTransactionProcessor#endMessageTransaction}*            删除prepare消息(修改topic):{@link TransactionalMessageServiceImpl#deletePrepareMessage}*       回滚:获取prepare阶段消息:{@link TransactionalMessageServiceImpl#commitMessage}*            检查prepare事务消息:{@link EndTransactionProcessor#checkPrepareMessage}*            删除prepare消息(修改topic):{@link TransactionalMessageServiceImpl#deletePrepareMessage}* 注意:*  a.提交或回滚的区别:是否还原原始消息,并存储到Commitlog文件 供消费者消费*  b.prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC*    已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC*/
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug("Transaction request:{}", requestHeader);// Broker为从服务器,则直接返回if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");return response;}// 事务检查,fromTransactionCheck默认falseif (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, but it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {// 事务类型(依据是生产者本地事务状态)switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result = new OperationResult();// 生产者本地事务提交if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 根据偏移量,获取事务消息(prepare阶段)result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);// 查找prepare事务消息成功if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查prepare事务消息RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 还原原始消息(prepare)MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// 原始消息,存储到Commitlog文件,供消费者消费RemotingCommand sendResult = sendFinalMessage(msgInner);// 存储成功后,删除prepare阶段的消息// prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC,改为 已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPICif (sendResult.getCode() == ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}}// 生产者本地事务回滚else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// 根据偏移量,获取事务消息(prepare阶段)result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);// 删除prepare阶段的消息// prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC,改为 已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPICif (res.getCode() == ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;
}

3. Broker回查事务状态

根据上两小节介绍,本地事务状态为UNKNOW时,生产者结束本地事务请求时,Broker不做任何处理,而是Broker定时任务回查事务状态。TransactionalMessageCheckService线程默认1min定时周期检测topic为RMQ_SYS_TRANS_HALF_TOPIC消息,回查事务状态。broker.conf配置transactionChecklnterval来改变默认值,单位为毫秒。如下所示是Broker事务消息UML图。

下图所示是事务消息回查状态流程图。

1):Broker执行回查任务

org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run执行检查任务,每1min执行周期,调用onWaitEnd()方法。如下代码所示,其中两个重要参数:

  • transactionTimeOut:事务过期时间,默认6s。只有当前时间 > 存储时间 + 事务过期时间 时,才执行事务状态回查,否则待下次查询。
  • transactionCheckMax:回查状态最大次数,默认15。大于该值则不会继续回查,丢弃(相当于回滚)。
/*** 默认1min检查一次事务状态,事务到期后,调用生产者的事务状态*/
@Override
public void run() {log.info("Start transaction check service thread!");while (!this.isStopped()) {// 检查时间间隔,默认1min,通过broker.conf配置transactionCheckIntervallong checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();// 等待1min,执行onWaitEnd()方法this.waitForRunning(checkInterval);}log.info("End transaction check service thread!");
}@Override
protected void onWaitEnd() {// 事务过期时间(默认6s):当前时间 > 存储时间 + 事务过期时间 时,才执行事务状态回查long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();// 事务回查最大次数(默认15),大于该值则不会继续回查,丢弃(相当于回滚)int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();long begin = System.currentTimeMillis();log.info("Begin to check prepare message, begin time:{}", begin);// 事务状态回查实现逻辑this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check是状态回查的核心逻辑方法,如下代码所示。注意事项:

  • fillOpRemoveMap()方法目的: 根据当前的处理进度(opOffset)依次从已处理队列拉取32条消息,方便判断当前处理的消息是否已经处理过,如果处理过则无须再次发送事务状态回查请求,避免重复发送事务回查请求(异步回查状态)
  • 判断是否需要发送事务状态回查isNeedCheck:

a. 已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间;

b. 已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间。

  • putBackHalfMsgQueue()方法目的:发送事务状态回查前,再次将消息存储到prepare阶段的Commitlog文件。原因是:

                a. 发送事务状态回查是异步请求,无法立刻知道处理结果;

                b. MQ是顺序写,已经修改检查次数的当前消息,修改已存储的消息,性能无法保证

  • MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS:消息事务消息回查请求的最晚时间,单位为秒,指的是程序发送事务消息时,可以用户指定该事务消息的有效时间,只有在这个时间内收到回查消息才有效,默认为null
/*** 事务状态回查实现逻辑* step1:根据prepare阶段的事务消息topic,获取消息队列集合并遍历* step2:根据消息队列messageQueue,获取已处理队列opQueue* step3:根据当前处理进度 从已处理的消费队列 拉取32条消息*        {@link TransactionalMessageServiceImpl#fillOpRemoveMap}* step4:判定一个周期是否超出执行最长总时间、消息已被处理* step5:获取prepare阶段的待处理消息*        needDiscard()方法:判定prepare事务消息回查次数是否大于最大回查次数,超出则丢弃(事务回滚)*        needSkip()方法:判定prepare事务消息已存储的时间是否超出文件过期时间* step6:判定prepare消息是否在用户指定有效时间内或是否超出事务超时时间* step7:判断是否需要发送事务状态回查*        a.已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间*        b.已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间* step7:需要回查,先存储新的消息到prepare,再回查状态*        {@link AbstractTransactionalMessageCheckListener#resolveHalfMsg}* step8:不需要回查,继续step3* step9:更新prepare阶段事务消息队列的回查进度*        更新已处理消息队列的进度*/
@Override
public void check(long transactionTimeout, int transactionCheckMax,AbstractTransactionalMessageCheckListener listener) {try {// prepare阶段的事务消息topicString topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;// 获取对应的消费队列集合Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);if (msgQueues == null || msgQueues.size() == 0) {log.warn("The queue of topic is empty :" + topic);return;}log.debug("Check topic={}, queues={}", topic, msgQueues);for (MessageQueue messageQueue : msgQueues) {long startTime = System.currentTimeMillis();// 根据prepare阶段消费队列获取已处理的消费队列MessageQueue opQueue = getOpQueue(messageQueue);// prepare阶段消费队列的处理偏移量long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);// 已处理的消费队列的处理偏移量long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);if (halfOffset < 0 || opOffset < 0) {log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,halfOffset, opOffset);continue;}// 已处理消息偏移量集合List<Long/* opOffset */> doneOpOffset = new ArrayList<>();// halfOffset 与 opOffset映射关系HashMap<Long/* halfOffset */, Long/* opOffset */> removeMap = new HashMap<>();// 根据opOffset 从已处理的消费队列 往后拉取32条消息PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);if (null == pullResult) {log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",messageQueue, halfOffset, opOffset);continue;}// single threadint getMessageNullCount = 1; // 空消息次数long newOffset = halfOffset; // 当前处理prepare阶段消费队列的最新进度long i = halfOffset;         // 当前处理prepare阶段消费队列的队列偏移量while (true) {// 周期内的执行最长总时间(默认60s),超出时则跳出,待一下周期执行if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);break;}// 消息已被处理if (removeMap.containsKey(i)) {log.debug("Half offset {} has been committed/rolled back", i);Long removedOpOffset = removeMap.remove(i);doneOpOffset.add(removedOpOffset);}// 消息未被处理else {// 获取prepare阶段的待处理消息GetResult getResult = getHalfMsg(messageQueue, i);MessageExt msgExt = getResult.getMsg();// 消息为nullif (msgExt == null) {// 空消息次数 > 最大空消息重试次数if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {break;}// 没有新的消息,导致消息为nullif (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,messageQueue, getMessageNullCount, getResult.getPullResult());break;}// 其他异常时,设置参数重新拉取else {log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",i, messageQueue, getMessageNullCount, getResult.getPullResult());i = getResult.getPullResult().getNextBeginOffset();newOffset = i;continue;}}// needDiscard()方法:判定prepare事务消息回查次数是否大于最大回查次数,超出则丢弃(事务回滚)// needSkip()方法:判定prepare事务消息已存储的时间是否超出文件过期时间if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {listener.resolveDiscardMsg(msgExt);newOffset = i + 1;i++;continue;}// 存储时间 > 当前开始检测时间if (msgExt.getStoreTimestamp() >= startTime) {log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,new Date(msgExt.getStoreTimestamp()));break;}// prepare事务消息已存储的时间long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();// 立即检查事务状态的时间,默认值MQ内设置事务超时时间transactionTimeout,则回查事务状态long checkImmunityTime = transactionTimeout;// 用户指定回查事务状态的最晚时间,默认为null(超出时,事务回滚,即:查询状态必须在最晚时间内有效)String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);if (null != checkImmunityTimeStr) { // 用户设置// 获取回查事务状态时间checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);// 已存储的时间 在 立即回查事务状态时间 范围内,则说明用户指定查询状态在最晚时间内if (valueOfCurrentMinusBorn < checkImmunityTime) {// 发送事务状态回查if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {newOffset = i + 1;i++;continue;}}}// 用户未设置,采用MQ内置transactionTimeout事务超时时间else {// 已存储的时间 不到事务超时时间,则下一次查询if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,checkImmunityTime, new Date(msgExt.getBornTimestamp()));break;}}// 获取已处理的32条消息List<MessageExt> opMsg = pullResult.getMsgFoundList();/** 判断是否需要发送事务状态回查* a.已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间* b.已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间*/boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime|| opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout|| valueOfCurrentMinusBorn <= -1;// 需要回查if (isNeedCheck) {// 发送事务状态回查前,再次将消息存储到prepare阶段的Commitlog文件if (!putBackHalfMsgQueue(msgExt, i)) { // 存储失败,则继续continue;}// 存储成功,异步发送回查事务状态请求listener.resolveHalfMsg(msgExt);}// 不需要回查else {// 继续拉取已处理的消息pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,messageQueue, pullResult);continue;}}newOffset = i + 1;i++;}// 更新prepare阶段事务消息队列的回查进度if (newOffset != halfOffset) {transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);}// 更新已处理消息队列的进度long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) {transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);}}} catch (Throwable e) {log.error("Check error", e);}}

代码看出(下小节讲述),使用线程池来异步发送回查消息,为了回查消息进度保存的简化,只要发送了回查消息,当前回查进度会向前推动。如果回查失败,回查前重新增加的prepare消息将可以再次发送回查消息;那如果回查消息发送成功,会不会下一次又重复发送回查消息呢?根据已处理队列中的消息来判断是否重复,如果回查消息发送成功并且Broker完成提交或回滚操作,该消息会发送到已处理队列中(删除的prepare消息),然后首先会通过fillOpRemoveMap()根据处理进度获取一批已处理的消息,来与消息判断是否重复。由于fillOpRemoveMap()一次拉取32条消息, 那又如何保证一定能拉取到与当前消息的处理记录呢?其实就是isNeedCheck为false时,如果此批消息最后一条未超过事务延迟消息,则继续拉取更多消息进行判断。

2):线程池执行异步回查

org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#resolveHalfMsg是执行事务状态回查入口方法,回查任务添加到线程池异步执行回查状态。sendCheckMessage()发送事务回查状态的核心方法,如下代码。

// 异步发送回查事务状态请求
public void resolveHalfMsg(final MessageExt msgExt) {if (executorService != null) {executorService.execute(new Runnable() {@Overridepublic void run() {try {// prepare事务消息回查事务状态sendCheckMessage(msgExt);} catch (Exception e) {LOGGER.error("Send check message error!", e);}}});} else {LOGGER.error("TransactionalMessageCheckListener not init");}
}/*** prepare事务消息回查事务状态* 生产者接受请求的总入口:{@link ClientRemotingProcessor#processRequest}* @param msgExt prepare事务消息*/
public void sendCheckMessage(MessageExt msgExt) throws Exception {// 组装事务状态回查请求头CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());// prepare事务消息还原为原始消息msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));msgExt.setStoreSize(0);// 获取生产组String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);// 从生产组内获取一个生产者通道Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);if (channel != null) {// 发送回查请求brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);} else {LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);}
}

三、事务消息实例

1. 事务消息监听类

实现org.apache.rocketmq.client.producer.TransactionListener,该类有两个方法:

  • executeLocalTransaction():保存本地事务中间表,用于Broker回查事务状态。
  • checkLocalTransaction():Broker定时回查事务状态,根据事务状态提交或回滚事务消息。
package com.common.instance.demo.config.rocketmq;import com.common.instance.demo.entity.TMessageTransaction;
import com.common.instance.demo.service.TMessageTransactionService;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;
import java.util.List;/*** @description 订单事务消息监听器实现类* @author TCM* @version 1.0* @date 2023/1/1 16:44**/
@Component
public class OrderTransactionListenerImpl implements TransactionListener {@Resourceprivate TMessageTransactionService tMessageTransactionService;@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object arg) {// 组装事务TMessageTransaction tMessageTransaction = packageTMessageTransaction(message);// 保存事务中间表tMessageTransactionService.insert(tMessageTransaction);// 推荐返回UNKNOW状态,待事务状态回查return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 获取用户属性tabIdString tabId = messageExt.getUserProperty("tabId");// 查询事务消息List<TMessageTransaction> tMessageTransactions = tMessageTransactionService.queryByTabId(tabId);if (!tMessageTransactions.isEmpty() && tMessageTransactions.size() <= 6) {return LocalTransactionState.COMMIT_MESSAGE;}LogUtil.error("orderTransaction rollBack, tabId: " + tabId);return LocalTransactionState.ROLLBACK_MESSAGE;}// 组装事务private TMessageTransaction packageTMessageTransaction(Message message) {TMessageTransaction tMessageTransaction = new TMessageTransaction();// 获取用户属性tabIdString tabId = message.getUserProperty("tabId");// 事务IDString transactionId = message.getTransactionId();tMessageTransaction.setTabId(tabId);tMessageTransaction.setTransactionId(transactionId);tMessageTransaction.setCreateBy("auto");tMessageTransaction.setCreateTime(new Date());return tMessageTransaction;}}

2. 事务消息生产者

package com.common.instance.demo.config.rocketmq;import com.alibaba.fastjson.JSON;
import com.common.instance.demo.entity.WcPendantTab;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;/*** @description 订单事务消息生产者* @author TCM* @version 1.0* @date 2023/1/1 16:54**/
@Component
public class OrderTransactionProducer {@Resourceprivate OrderProducerProperties orderProducerProperties;@Resourceprivate OrderTransactionListenerImpl orderTransactionListener;private TransactionMQProducer orderTransactionMQProducer;@PostConstructpublic void start() {try {LogUtil.info("start rocketmq: order transactionProducer");orderTransactionMQProducer = new TransactionMQProducer(orderProducerProperties.getProducerGroup());orderTransactionMQProducer.setNamesrvAddr(orderProducerProperties.getNameSrcAddr());orderTransactionMQProducer.setSendMsgTimeout(orderProducerProperties.getSendMsgTimeout());// 注册事务监听器orderTransactionMQProducer.setTransactionListener(orderTransactionListener);orderTransactionMQProducer.start();} catch (MQClientException e) {LogUtil.error("OrderTransactionProducer.start()", "start rocketmq failed!", e);}}public void sendTransactionMessage(WcPendantTab data) {sendTransactionMessage(data, orderProducerProperties.getTopic(), orderProducerProperties.getTag(), null);}public void sendTransactionMessage(WcPendantTab data, String topic, String tags, String keys) {try {// 消息内容byte[] msgBody = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);// 消息对象Message message = new Message(topic, tags, keys, msgBody);message.putUserProperty("tabId", data.getTabId());// 发送事务消息orderTransactionMQProducer.sendMessageInTransaction(message, null);} catch (Exception e) {LogUtil.error("OrderTransactionProducer.sendMessage()","send order rocketmq error", e);}}@PreDestroypublic void stop() {if (orderTransactionMQProducer != null) {orderTransactionMQProducer.shutdown();}}}

3. 事务消息消费者

package com.common.instance.demo.config.rocketmq;import com.log.util.LogUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;/*** @description 订单消费者* @author TCM* @version 1.0* @date 2023/1/1 14:29**/
@Component
public class OrderConsumer implements MessageListenerConcurrently {@Resourceprivate OrderConsumerProperties orderConsumerProperties;private DefaultMQPushConsumer orderMQConsumer;@PostConstructpublic void start() {try {LogUtil.info("start rocketmq: order consumer");orderMQConsumer = new DefaultMQPushConsumer(orderConsumerProperties.getConsumerGroup());orderMQConsumer.setNamesrvAddr(orderConsumerProperties.getNameSrcAddr());orderMQConsumer.subscribe(orderConsumerProperties.getTopic(), orderConsumerProperties.getTag() == null ? "*":orderConsumerProperties.getTag());orderMQConsumer.setConsumeFromWhere(ConsumeFromWhere.valueOf(orderConsumerProperties.getConsumeFromWhere()));orderMQConsumer.registerMessageListener(this); // 注册监听器orderMQConsumer.start();} catch (MQClientException e) {LogUtil.error("OrderProducer.start()", "start rocketmq failed!", e);}}@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {int index = 0;try {for (; index < msgs.size(); index++) {// 完整消息MessageExt msg = msgs.get(index);// 消息内容String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);LogUtil.info("消费组消息内容:" + messageBody);}} catch (Exception e) {LogUtil.error("OrderConsumer.consumeMessage()", "consume order rocketmq error", e);} finally {if (index < msgs.size()) {// 消费应答context.setAckIndex(index + 1);}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}@PreDestroypublic void stop() {if (orderMQConsumer != null) {orderMQConsumer.shutdown();}}}

4. 业务代码

package com.common.instance.demo.service.impl;import com.alibaba.fastjson.JSON;
import com.common.instance.demo.config.rocketmq.OrderProducer;
import com.common.instance.demo.config.rocketmq.OrderTransactionProducer;
import com.common.instance.demo.dao.WcPendantTabDao;
import com.common.instance.demo.entity.WcPendantTab;
import com.common.instance.demo.service.WcPendantTabService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.List;
import java.util.UUID;/**** @author tcm* @since 2021-01-14 15:02:08*/
@Service
public class WcPendantTabServiceImpl implements WcPendantTabService {@Transactional@Overridepublic void testTransactionMessage(WcPendantTab tab) {// 保存tabwcPendantTabDao.insert(tab);// 发送事务消息orderTransactionProducer.sendTransactionMessage(tab);Integer.valueOf("abc");}}

代码执行结果:事务回滚,事务消息发送后回查事务状态,则回滚事务消息,消费者消费不到消息。

java.lang.NumberFormatException: For input string: "abc"at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)at java.lang.Integer.parseInt(Integer.java:580)at java.lang.Integer.valueOf(Integer.java:766)at com.common.instance.demo.service.impl.WcPendantTabServiceImpl.testTransactionMessage(WcPendantTabServiceImpl.java:68)at com.common.instance.demo.service.impl.WcPendantTabServiceImpl$$FastClassBySpringCGLIB$$9543cf63.invoke(<generated>)at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367)at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)at com.common.instance.demo.service.impl.WcPendantTabServiceImpl$$EnhancerBySpringCGLIB$$a7272112.testTransactionMessage(<generated>)at com.common.instance.demo.controller.WcPendantTabController.testTransactionMessage(WcPendantTabController.java:47)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at brave.servlet.TracingFilter.doFilter(TracingFilter.java:68)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at brave.servlet.TracingFilter.doFilter(TracingFilter.java:87)at org.springframework.cloud.sleuth.instrument.web.LazyTracingFilter.doFilter(TraceWebServletAutoConfiguration.java:139)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)at java.lang.Thread.run(Thread.java:748)2023-02-09 17:26:16.693 ERROR [,,,] 6316 --- [pool-1-thread-1] LOG_ERROR                                : 127.0.0.1||^orderTransaction rollBack, tabId: fde7c1d4cad049c89612afb6c2c29791

四、参考资料

RocketMQ 发送事务消息原理分析和代码实现_扎瓦江石的博客-CSDN博客

彻底看懂RocketMQ事务实现原理 - 知乎

RocketMQ5.0.0消息发送_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息存储<二>_消息存储流程_rocketmq 消息写入流程_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0事务消息相关推荐

  1. 通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪?

    文章转载自公众号  心源意码 , 作者 寻筝 "得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席." 由阿里自研的Roc ...

  2. mysql事务线程id_为何出现了trx_mysql_thread_id为0 的事务

    今天巡检时突然发现有很多锁等待超时的情况,原以为是一个简单的小事,一查,结果令人深思. 1.  问题现象 发现日志中出现了大量的 ERROR 1205 (HY000): Lock wait timeo ...

  3. RocketMQ5.0.0消息存储<四>_刷盘机制

    目录 一.刷盘概览 二.Broker刷盘机制 1. 同步刷盘 2. 异步刷盘 1):未开启堆外内存池 2):开启堆外内存池 三.参考资料 一.刷盘概览 RocketMQ存储与读写是基于JDK NIO的 ...

  4. 鸿蒙系统的通知栏怎么没有静音,微信7.0.0没有消息提示音怎么解决?微信通知栏不显示消息的解决...

    近日,iOS和Android微信7.0.0正式版先后发布,已经有不少小伙伴升级到了最新版本.不过,这几天经常有粉丝朋友反馈,升级到微信7.0.0之后,通知栏没有消息提示音怎么回事?对于这种问题,小编以 ...

  5. GetMessage和PeekMessage的区别及PostMessage(hWnd, WM_QUIT, 0, 0)消息

    关于这个两个函数的详细信息: BOOL GetMessage(   LPMSG lpMsg,              // 一个MSG的指针   HWND hWnd,               / ...

  6. PC微信HOOK-PC微信逆向-消息防撤回(版本3.0.0.47)

    微信HOOK已全部升级为3.0.0.47,今天来分享一下微信防撤回的代码. 微信防撤回的思路就是:当好友撤回消息时,先保存要执行的CALL,用于恢复撤回,然后让微信不执行撤回CALL. 具体源码如下( ...

  7. 基于MySQL 8.0 对事务的深度理解

    基于MySQL 8.0 对事务的深度理解 一.MySQL中事务隔离级别 事务的隔离级别有哪些? 隔离级别 脏读 不可重复读 幻读(虚读) 未提交读(Read uncommitted) 可能 可能 可能 ...

  8. 重磅消息:Redis 6.0.0 稳定版发布

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | https://www.sohu.com/a/ ...

  9. SpringMvc之整合DWR3.0.0推送消息

    下载地址 http://www.directwebremoting.org/dwr/ https://github.com/directwebremoting/dwr/releases 本文以DWR3 ...

最新文章

  1. R语言使用ggplot2包使用geom_boxplot函数绘制基础分组水平箱图(boxplot)实战
  2. 克隆虚拟机加入域遇到的相同SID问题解决。。。
  3. 004 .NetReactor 3.6.0.0之另类脱壳法
  4. FOR ALL ENTRIES IN内表排序、排重对性能的影响
  5. conda init 关闭和重启shell_TP5实战源码 通过shell建立PHP守护程序
  6. 【爬蟲】使用Jsoup解析文档
  7. ITK:对给定LabelMap的所有LabelObject应用形态学关闭操作
  8. Cool Kitten:新鲜出炉的视差滚动 响应式框架
  9. 【C++ Primer | 16】容器适配器全特化、偏特化
  10. 在Linux下开发多语言软件: Hello GetText!
  11. 服务器的虚拟化配置,怎么配置服务器的虚拟化环境
  12. eclipse 如何关联git_git的相关操作
  13. 挑战程序设计竞赛 — 知识总结
  14. 《Redis开发与运维》学习第八章
  15. 让机器看了几千万篇热门文章总结的17类热门标题方式模板
  16. EMC被唯冠科技诉侵权:商标纠纷延宕13年
  17. 【JAVA之NIO框架介绍】
  18. 树莓派 外接HDMI线显示屏分辨率设置
  19. 红米note3 S线刷MUI版本,解决手机卡顿
  20. 三人表决器实验报告总结_三人表决器实验报告..doc

热门文章

  1. 学生计算机屏幕坏了怎么办,电脑自己检查自己修,如果显示器坏掉我们该怎么办?...
  2. 如何从根本上防止服务器被攻击
  3. [leetcode]1438. 绝对差不超过限制的最长连续子数组
  4. 黑苹果 MacOS 10.15 Catalina 安装详细教程带工具资料
  5. PHP字符串变量的长度限制问题
  6. Emoji 的处理 - 使用正则表达式匹配所有 Emoji表情
  7. win10 桌面(Windows 资源管理器)卡死的根本解决办法
  8. 洛谷 P2597 灾难(支配树)
  9. Oracle 企业管理器DataBase Control使用说明
  10. 中国大学生计算机设计大赛英语,中国大学生计算机设计大赛