分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好。今天我们来唠唠如何实现RocketMQ的事务消息!

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

RocketMQ事务流程概要

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程
整体流程为:

  • 正常事务发送与提交阶段
  1. 生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)
  2. 服务端响应消息写入结果,半消息发送成功
  3. 开始执行本地事务
  4. 根据本地事务的执行状态执行Commit或者Rollback操作
  • 事务信息的补偿流程
  1. 如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求
  2. 生产者收到确认回查请求后,检查本地事务的执行状态
  3. 根据检查后的结果执行Commit或者Rollback操作
    补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

  1. 事务消息在一阶段对用户不可见
    事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成 RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。
  2. 如何处理第二阶段的失败消息?在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。
  3. 消息状态
    事务消息有三种状态:
  • TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息
  • TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

代码实现

首先假设我们有这样一个需求:

用户请求订单微服务 order-service 接口删除订单(退货),删除订单后需要发送消息给用户服务 account-service,用户微服务收到消息后会给用户账户增加余额。

这个需求跟钱相关,肯定要保证消息的事务性,接下来我们根据上面的原理实现整个流程。

基础配置

生产者order-servcie和account-service都要引入RocketMQ相关依赖,增加RocketMQ的相关配置

  • 引入组件
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

  • 添加配置
# within rocketmq
rocketmq:name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876producer:group: cloud-group

发送半消息

order-service在执行删除订单操作时发送一条半消息给MQServer,发送半消息主要是使用 rocketMQTemplate.sendMessageInTransaction() 方法,发送事务消息。

@Override
public void delete(String orderNo) {Order order = orderMapper.selectByNo(orderNo);//如果订单存在且状态为有效,进行业务处理if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {String transactionId = UUID.randomUUID().toString();//如果可以删除订单则发送消息给rocketmq,让用户中心消费消息rocketMQTemplate.sendMessageInTransaction("add-amount",MessageBuilder.withPayload(UserAddMoneyDTO.builder().userCode(order.getAccountCode()).amount(order.getAmount()).build()).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("order_id",order.getId()).build(),order);}
}

首先先校验一下订单状态,然后发送消息给MQServer,这个逻辑大家都看得懂,主要是关注 sendMessageInTransaction() 方法,源码如下:

public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {try {if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");} else {org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);return this.producer.sendMessageInTransaction(rocketMsg, arg);}} catch (MQClientException var5) {throw RocketMQUtil.convert(var5);}
}

该方法有三个参数:

  • destination:目的地(主题),这里发送给 add-amount 这个主题
  • message:发送给消费者的消息体,需要使用 MessageBuilder.withPayload() 来构建消息
  • arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认消息究竟是Commit还是Rollback。如果在告诉MQServer本地执行状态的时候出异常了还需要让MQServer能够回查到,怎么实现这一些列操作呢?

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:

第一个方法 executeLocalTransaction 为执行本地事务;第二个方法 checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。有了这个接口类我们的执行逻辑清楚了,但是还有个问题:本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?

我们可以在执行本地事务的时候同时生成一个事务日志,让本地事务与日志事务在同一个方法中,同时添加 @Transactional 注解,保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示没执行成功,需要Rollback

思路既然理顺了,咱们就开撸。

  • 首先创建一个日志表

很简单的三个字段,主要是这个事务id,需要根据这个事务id回查事务,还记得我们在发送半消息时生成的事务id吗,就是干这个用的!

  • 在生产者编写方法实现 RocketMQLocalTransactionListener
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {private final OrderService orderService;private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;/*** 执行本地事务*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {log.info("执行本地事务");MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer orderId = Integer.valueOf((String)headers.get("order_id"));log.info("transactionId is {}, orderId is {}",transactionId,orderId);try{//执行本地事务,并记录日志orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);//执行成功,可以提交事务return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事务的检查,检查本地事务是否成功*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("检查本地事务,事务ID:{}",transactionId);//根据事务id从日志表检索QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();queryWrapper.eq("transaction_id",transactionId);RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);if(null != rocketmqTransactionLog){return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}
}

  • 执行本地事务的方法
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){//将订单状态置位无效orderMapper.changeStatus(id,status);//插入事务表rocketMqTransactionLogMapper.insert(RocketmqTransactionLog.builder().transactionId(transactionId).log("执行删除订单操作").build());
}

这一块的代码逻辑都是在生产端,即Order-Server,大家不要搞错了

消费消息

Rollback的消息MQServer会给我们处理,我们只要关注Commit状态时消费端可以正常消费即可。在 account-service监听消息,如果收到消息则给用户账户增加余额。

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {private final AccountMapper accountMapper;/*** 收到消息的业务逻辑*/@Overridepublic void onMessage(UserAddMoneyDTO userAddMoneyDTO) {log.info("received message: {}",userAddMoneyDTO);accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());log.info("add money success");}
}

测试

订单表有这样一条记录,用户为jianzh5,amount为200

用户表的记录,执行完成后jianzh5的账户应该变成250

  • 调用删除订单接口,删除订单
  • 发送半消息
  • 执行本地事务,并生成事务日志
  • 模拟异常情况
    在发送Commit消息的时候我们用命令杀掉进程 taskkill /pid 19748 -t -f,模拟异常!
  • 重新启动order-service,查看是否会执行回查动作

MQServer进行回查,检查事务日志,判断是否可以提交事务

  • 消费者消费事务消息,保证事务的一致性

小结

使用RocketMQ实现事务消息的过程还是很复杂的,需要好好理解开头的那张图,只有理解了事务消息的交互过程才能编写相应的代码!

如果本文对你有帮助,别忘记给我个三连:点赞,转发,评论。咱们下期见!

收藏 等于白嫖,点赞 才是真情!

这里为大家准备了一份小小的礼物,关注公众号 JAVA日知录,输入如下代码,即可获得百度网盘地址,无套路领取!
001:《程序员必读书籍》
002:《从无到有搭建中小型互联网公司后台服务架构与运维架构》
003:《互联网企业高并发解决方案》
004:《互联网架构教学视频》
006:《SpringBoot实现点餐系统》
007:《SpringSecurity实战视频》
008:《Hadoop实战教学视频》
009:《腾讯2019Techo开发者大会PPT》

010: 微信交流群

http://weixin.qq.com/r/_ElxaTTECGl3rXAK9xzq (二维码自动识别)

两个service事务统一_RocketMQ进阶 - 事务消息相关推荐

  1. for循环延时_RocketMQ进阶-延时消息

    前言 在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消:还有在线商城完成订单后48小时不评价 ,自动5星好评.像这类在某事件触发后一段时间内执行的需求任务 ...

  2. 两个service事务统一_拜托,别再让我优化大事务了,我的头都裂了

    前言 最近有个网友问了我一个问题:系统中大事务问题要如何处理? 正好前段时间我在公司处理过这个问题,我们当时由于项目初期时间比较紧张,为了快速完成业务功能,忽略了系统部分性能问题.项目顺利上线后,专门 ...

  3. 就同一个Service类中,一个事务方法调用另外一个有事务的方法

    目录 一.Spring 事务机制 二.Spring事务传播行为 三.场景总结 1.在同一个类中,一个方法调用另外一个有注解(比如@Async,@Transational)的方法,注解是不会生效的 2. ...

  4. Day124.分布式事务:Seata、2PC两段式、代码补偿TCC、本地消息表、MQ事物消息

    目录 一.相关概念回顾 二.分布式事务 三.分布式事务解决方案 1.基于XA协议的两段式提交(2PC)  - 强一致性 2.代码补偿事务(TCC) - 最终一致性 3.本地消息表(异步确保)- 最终一 ...

  5. MySQL --- 19♪ 进阶15 TCL事务控制语言--建立结束事务/设置断点--默认隔离级别--脏读/幻读/不可重复读

    #TCL事物控制语言 : /*   Transaction control language : 事物控制语言   事务:     一个或者一组sql语句组成一个执行单元,这个执行单元要么全部执行,要 ...

  6. java事务 Dao层_spring事务到底用于service层还是dao层

    Spring事务为业务逻辑进行事务管理,保证业务逻辑上数据的原子性. 事务得根据项目性质来细分:事务可以设置到三个层面(dao层.service层和web层). 第一:web层事务,这一般是针对那些安 ...

  7. 编程式事务与声明式事务

    编程式事务 1.加入jar包 com.springsource.net.sf.cglib-2.2.0.jar com.springsource.org.aopalliance-1.0.0.jar co ...

  8. Spring事务管理-》Spring事务管理(annotation)

    5.6 使用@Transactional 除了使用XML类型的事务管理,同时Spring也提供了Annotation类型的事务管理.如下所示: 一:Spring事务管理 =============== ...

  9. 分布式事务讲解 - TX-LCN分布式事务框架(含LCN、TCC、TXC三种模式)

    分布式事务讲解 - TX-LCN分布式事务框架(含LCN.TCC.TXC三种模式) 分布式事务系列博客: TX-LCN框架原理 LCN 原理及主要特点 代码实现 实现场景 创建数据库及表(三个数据库, ...

最新文章

  1. Python学习总结(一)
  2. php 如何判断是否搜索出结果,搜索PHP - 如何调出搜索框没有结果
  3. 016_CSS选择器列表
  4. 关于计算机和人物的英语短文,人脑和电脑英语作文
  5. android 重装sdk或者系统的时模拟器出现can open ****
  6. 当我们在谈论cpu指令乱序的时候,究竟在谈论什么?
  7. 开发函数计算的正确姿势 —— Fun validate 语法校验排错指南
  8. 10-300-020-简介-架构-简介
  9. 利用Snapshot快速跨Region迁移服务器
  10. java setviewport_Java ImageView.setViewport方法代码示例
  11. Spring StringUtils类中几个有用的字符串处理方法
  12. TimesTen Classic 18c 卸载 (uninstall)全过程
  13. AspNetPager常用属性及用法 / URLRewrite伪静态与AspNetPager分页控件的结合
  14. FastDNS中修改IP地址
  15. iphone导出视频 无法连接到设备_iPhone 使用技巧:及时关注手机储存容量
  16. js 字符串截取 slice 的小bug 以及处理方式
  17. 让Element-ui的Container布局容器高度百分百显示
  18. APP,实现多国语言动态切换
  19. python crop
  20. Learning to Rank系列之概述

热门文章

  1. bash shell 学习记录
  2. Retrofit+RxJava
  3. 一致吗 驱动_外国不过春节?AMD驱动再更新,解决BUG,游戏不闪退,重启不黑屏...
  4. PKUSC2019游记
  5. lodash _.size
  6. 为什么前端开发这么不稳定?
  7. 谈谈Silverlight 2中的视觉状态管理 Part1
  8. 使用Dockerfile创建一个tomcat镜像
  9. css文本框样式收集
  10. windows 7 PowerShell 笔记