分布式事务介绍

所谓事务,就是一系列业务操作构成的独立的执行单元。比如用户购买商品下单的行为,需要执行创建订单,扣减商品库存的两个不同的数据库操作,这就是一个事务。事务最重要的特性就是要支持原子性,要么所有操作全部成功,要么全部失败。为什么要这样设计呢?如果一切顺利,当然什么问题都不会有。但天有不测风云,没有谁能保证系统一直不会出错,如果哪一天订单已经创建成功了,但在扣减对应商品库存时突然失败了,那就麻烦大了,订单数据和商品的库存数据可能就对不上了,有可能商品早都没货了,客户还能继续下单购买。

所以,事务在保持数据一致性的方面是非常重要的。在单服务系统中我们一般不需要为事务操心,数据库已经为我们考虑了一切,只要在操作开始前声明了事务,那么调用过程只要发生了错误事务会自动回滚到操作开始时的状态。

但在微服务系统中,不同的业务操作可能被分割到不同的模块,而不同业务模块都会配置一个独立的数据源,甚至订单服务和仓储服务可能都不在同一个数据库中,这样显然就不能只依靠本地数据库事务来解决问题。我们需要一种能够跨网络和应用的分布式事务机制,分布式事务的主要实现思路一般分为两种:

  • 刚性事务:类似于上面说的数据库的本地事务,遵循ACID原则,要求数据的强一致性;代表性的实现就是二阶段提交(XA),通过引入一个事务协调者,所有事务的参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。XA一度是分布式事务的事实标准,但实际实现上却存在很多问题,比如事务在等待提交过程中处于同步阻塞状态,导致资源长时间占用,整体性能很差。

  • 柔性事务:遵循BASE理论,最终一致性;与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。比如刚才我们说到的客户下单的例子,如果扣减库存的时候出错了,我们先不回滚系统,而是将扣减库存的调用请求存起来(比如存放到一个消息队列中),定时去重试。这样虽然订单数据和库存数据在某个时间段内是不一致的,但一旦库存服务恢复了,库存扣减请求就能够正常调用了,这样数据在某个时间点之后就会同步成一致的状态,这就是最终一致性。最终一致性提高了系统的可用性(客户下单可以不受仓储服务故障的影响),但也可能在数据不一致期间发生问题(比如超卖),这个需要考虑业务上是否能够接受。柔性事务的实现主要有重试和补偿两种机制,重试就是刚才描述方式,系统将出错的请求存储下来然后不断进行重试,直到成功;而补偿就是在出错之后,执行类似的一个undo操作,消除已提交的操作对数据的影响,比如扣减库存失败以后就直接把之前创建成功的订单再删除掉,TCC(Try/Confirm/Cancel)型事务就是采用这样的方式。

Seata 的相关概念

由于类似XA的刚性事务实现在复杂的分布式环境中存在大量的问题,所以目前主流的分布式事务实现方案都是走柔性事务路线的。比较流行的解决方案有阿里开源的seata,还有独立开发者发布的LCN框架,但LCN目前的维护更新遇到了困难(独立开发者的悲哀啊)。seata提供了几种不同的事务模式实现,包括:AT、TCC、SAGA 和 XA。在这里我们重点介绍一下AT模式,其它模式的说明请查看 seata的官网。AT模式也是走的补偿路线,但是不需要应用程序去关心调用失败后如何恢复数据,而是由框架本身负责去恢复收当前事务影响的所有数据,这非常类似于我们已经习惯了的本地事务,对开发者的负担也比较小。

seata可分为三个角色:TC,TM和RM:

  • TC - 事务协调者: 维护全局和分支事务的状态,驱动全局事务提交或回滚。业务无关,需要在服务端独立进行部署。

  • TM - 事务管理器:发起全局事务的开始,提交或回滚(发起后由TC协调其它的RM执行相应的操作),该角色会集成到应用程序当中。

  • RM - 资源管理器:全局事务的参与者,管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,该角色也是集成到应用程序当中的。

部署TC server

首先在 官网 下载seata server的二进制文件,其实就是一个spring boot的应用程序。在服务器上解压之后,进入到conf文件夹,我们需要重点关注registry.conf这个文件,它的作用是配置seata server(TC)注册到哪个注册中心,目前支持几乎所有主流的注册中心,TC会从配置中心中读取相应的配置,TM和RM实例也可以通过同一个注册中心找到TC部署的位置,从而实现TC的集群化部署。我们还是延用之前就部署好的consul来作为注册中心,只需要将registry.conf中的type修改为“consul”即可,并调整配置文件中consul的部署地址:

# 注册中心配置,用于TC,TM,RM的相互服务发现
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "consul"consul {cluster = "seata"serverAddr = "192.168.1.220:8500"}
}
# 配置中心配置,用于读取TC的相关配置
config {# file、nacos 、apollo、zk、consul、etcd3type = "file"file {name = "file.conf"}
}

file.conf中是TC的一些持久化配置选项,seata支持文件和数据库两种持久化方式,文件模式比数据库更快,但不支持并发操作,如果要部署TC集群就必须要使用数据库进行持久化。这里我们为了简化,就直接使用默认的file模式。file.conf也可以初始化到配置中心,通过配置中心来统一读取,这对于集群部署是有帮助的,初始化的方式可参考 这里 。修改完配置后,直接运行seata-server.sh启动TC server:

$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1

启动成功后我们可以在consul的控制台看到TC注册的服务:

集成TM和RM

seata的AT模式会在服务调用失败后,自动恢复受影响的数据,其原理就是在启动事务之后,会自动分析当前事务中执行的SQL对数据的影响,把受影响的数据直接存储到本地数据库中,如果事务回滚了,就通过存储的数据备份对原始数据进行恢复( AT模式介绍 ),所以我们需要在每个RM所在的业务数据库中初始化seata的undo_log表。我们之前的spring-cloud-demo中也还没有建立相应的业务数据库,为了测试分布式事务,我们需要为order-service和storage-service创建两个业务库,并初始化相关的业务表格,整个初始化脚本如下:


-- 创建order-service数据库
CREATE DATABASE `cloud-demo-order`;CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`t_order` (`id` int(11) NOT NULL AUTO_INCREMENT,`customer_code` varchar(45) DEFAULT NULL COMMENT '客户编码',`good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',`good_quantity` int(11) DEFAULT NULL COMMENT '购买数量',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`undo_log`
(`id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',`branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',`xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',`context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',`log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',`log_created`   DATETIME     NOT NULL COMMENT 'create datetime',`log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';-- 创建storage-service数据库
CREATE DATABASE `cloud-demo-storage`;CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`t_inventory` (`id` int(11) NOT NULL AUTO_INCREMENT,`good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',`good_quantity` int(11) DEFAULT NULL COMMENT '库存总量',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`undo_log`
(`id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',`branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',`xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',`context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',`log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',`log_created`   DATETIME     NOT NULL COMMENT 'create datetime',`log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

初始化脚本后,在order-serivce模块和storage-service模快引入seata的依赖包和相关配置:

  <!--在spring cloud中自动配置seata--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2.2.1.RELEASE</version></dependency>

需要注意的是maven库中还有一个包名称是spring-cloud-alibaba-seata,我看了一下两者的源码是一致的,任意引用哪一个包都可以,不知道为什么会有两个不同名称。在模块的application.yml中加入如下内容(以order-service模块的配置为例):

# Seata 配置项,对应 SeataProperties 类
seata:application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}tx-service-group: ${spring.application.name}-group # 该应用所属事务组编号,用于寻找TC集群的映射# Seata 服务配置项,对应 ServiceProperties 类service:#事务分组与TC集群(seata)集群的映射关系,order-service-group对应tx-service-group参数,值为一个虚拟的TC集群名称#默认值就是default,如果无需分组可以不用设置vgroup-mapping:order-service-group: default# Seata注册中心配置项registry:type: consul # 注册中心类型,默认为 fileconsul:cluster: seata #对应seata集群在consul中注册的服务名server-addr: 192.168.1.220:8500

需要注意的是 registry.consul.cluster 中指定的名称需要与TC server的同名配置项中指定的名称一致(默认的值是default),否则会找不到TC server,这个在官方文档和很多教程中都没有说明。 更多配置内容可以查看 配置说明。

另外为了方便对数据库进行操作,order-serivce模块和storage-service模快都引入了mybatis-plus框架的相关依赖和配置,这里就不做详细介绍了,具体可以去查看源码。一切准备好以后,将原有的orderService.createNewOrder方法做如下改造:

    @GlobalTransactionalpublic Integer createNewOrder(OrderDTO orderDTO) {Order newOrder = new Order();newOrder.setCustomerCode(orderDTO.getCustomerCode());newOrder.setGoodCode(orderDTO.getGoodCode());newOrder.setGoodQuantity(orderDTO.getQuantity());//向本地数据库插入订单信息this.save(newOrder);InventoryChangeDTO req = new InventoryChangeDTO();req.setGoodCode(orderDTO.getGoodCode());req.setQuantity(orderDTO.getQuantity());//调用远程仓储服务变更库存Integer remainQuantity = storageService.updateInventoryOfGood(req);return remainQuantity;}

其实除了写入数据库的相关代码,这里最重要的变化就是加入了**@GlobalTransactional注解,这个注解标记了当前方法会开启一个全局事务。在本例中order-service模块既是 TM (我的理解是声明@GlobalTransactional**的地方就算是一个TM)又是 RM,而storage-service模块则是另一个 RM

通过PostMan请求创建订单的接口地址:http://localhost:9001/api/order/create-order,一切顺利的话就能在数据库中看到新增的订单数据和库存数据。

从相关的日志中可以看出全局事务的开始和提交:

2020-05-06 15:28:21.311  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342569]
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342569] commit status: Committed
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] c.g.d.s.o.controller.Controller          : 剩余数量:-20
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] i.s.core.rpc.netty.RmMessageListener     : onMessage:xid=192.168.1.220:8091:2010342569,branchId=2010342570,branchType=AT,resourceId=jdbc:mysql://192.168.1.212:3306/cloud-demo-order,applicationData=null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch committing: 192.168.1.220:8091:2010342569 2010342570 jdbc:mysql://192.168.1.212:3306/cloud-demo-order null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

我们将 storageService.updateInventoryOfGood 方法稍稍修改一下,故意引发一个异常来测试全局事务的回滚:

public Integer changeInventory(InventoryChangeDTO req) {Inventory inventory = this.getOne(Wrappers.<Inventory>lambdaQuery().eq(Inventory::getGoodCode, req.getGoodCode()));if (inventory == null) {inventory = new Inventory();inventory.setGoodQuantity(0);inventory.setGoodCode(req.getGoodCode());}inventory.setGoodQuantity(inventory.getGoodQuantity() - req.getQuantity());this.saveOrUpdate(inventory);//引发异常回滚Object exceptionCause = null;exceptionCause.toString();return inventory.getGoodQuantity();}

还需要注意一点,测试全局事务回滚时需要将我们直接配置的hystrix fallback关闭,否则应用程序调用远程接口失败后会触发fallback机制,从而让seata认为远程调用是成功的,就不会触发回滚:

//@FeignClient(name = "storage-service", fallback = StorageServiceFallback.class)
//测试全局事务回滚时需要注释掉fallback,否则接口会返回默认的值导致事务无法回滚
@FeignClient(name = "storage-service")
public interface StorageService {@PostMapping("/api/storage/change-inventory")Integer updateInventoryOfGood(InventoryChangeDTO inventoryChangeDTO);}

然后我们再次请求创建订单的服务,从日志上可以看出,事务已成功触发回滚操作:

2020-05-06 15:50:57.809  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342574]
2020-05-06 15:50:59.157  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.n.u.concurrent.ShutdownEnabledTimer    : Shutdown hook installed for: NFLoadBalancer-PingTimer-storage-service
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.netflix.loadbalancer.BaseLoadBalancer  : Client: storage-service instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2020-05-06 15:50:59.290  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : Using serverListUpdater PollingServerListUpdater
2020-05-06 15:50:59.325  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.329  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : DynamicServerListLoadBalancer for client storage-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[192.168.1.252:9002],Load balancer stats=Zone stats: {unknown=[Zone:unknown;   Instance count:1;   Active connections count: 0;    Circuit breaker tripped count: 0;   Active connections per server: 0.0;]
},Server stats: [[Server:192.168.1.252:9002;    Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
]}ServerList:ConsulServerList{serviceId='storage-service', tag=null}
2020-05-06 15:51:00.119  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342574] rollback status: Rollbacked

本文的相关代码可以查看这里 spring-cloud-demo

Spring Cloud笔记(8)使用Seata管理分布式事务相关推荐

  1. Spring Boot之基于Dubbo和Seata的分布式事务解决方案

    转载自 Spring Boot之基于Dubbo和Seata的分布式事务解决方案 1. 分布式事务初探 一般来说,目前市面上的数据库都支持本地事务,也就是在你的应用程序中,在一个数据库连接下的操作,可以 ...

  2. Spring Cloud Alibaba系列四:集成 seata 实现分布式事务

    文章目录 Spring Cloud Alibaba系列四:集成 seata 实现分布式事务 前言 Seata 是什么? Seata 术语 安装 seata 1.创建 seata 数据库,并添加对应的表 ...

  3. Spring Cloud 整合 seata 实现分布式事务极简入门

    Spring Cloud 整合 seata 实现分布式事务极简入门 seata Spring Cloud 整合 seata 实现分布式事务极简入门 1. 概述 2. 部署nacos 3. 部署seat ...

  4. Spring Cloud构建微服务架构:分布式服务跟踪(整合zipkin)【Dalston版】

    通过上一篇<分布式服务跟踪(整合logstash)>,我们虽然已经能够利用ELK平台提供的收集.存储.搜索等强大功能,对跟踪信息的管理和使用已经变得非常便利.但是,在ELK平台中的数据分析 ...

  5. Spring Cloud构建微服务架构:分布式配置中心【Dalston版】

    Spring Cloud Config是Spring Cloud团队创建的一个全新项目,用来为分布式系统中的基础设施和微服务应用提供集中化的外部配置支持,它分为服务端与客户端两个部分.其中服务端也称为 ...

  6. Spring Cloud构建微服务架构:分布式服务跟踪(整合logstash)【Dalston版】

    通过之前的<入门示例>,我们已经为两个由SpringCloud构建的微服务项目 trace-1和 trace-2引入了Spring Cloud Sleuth的基础模块 spring-clo ...

  7. Spring Cloud构建微服务架构:分布式服务跟踪(跟踪原理)

    通过上一篇<分布式服务跟踪(入门)>的例子,我们已经通过Spring Cloud Sleuth往微服务应用中添加了实现分布式跟踪具备的基本要素.下面通过本文来详细说说实现分布式服务跟踪的一 ...

  8. 基于Nacos配置中心实现Spring Cloud Gateway的动态路由管理

    前面我们了解过了Sentinel 网关流量控制之Spring Cloud Gateway实战,今天带给大家是基于Nacos配置中心实现Spring Cloud Gateway的动态路由管理. 1.为什 ...

  9. springCloud整合seata实现分布式事务

    seata简介 Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务.Seata 将为用户提供了 AT.TCC.SAGA 和 XA 事务模式,为用户打造一站式的分布式 ...

最新文章

  1. freemarker if判断
  2. python算法集合_python – 一个集合联合查找算法
  3. android中websockt断开链接,接吻SDK - 的WebSocket在断开的Android
  4. ZooKeeper伪分布式集群安装及使用
  5. Android中的主题Theme
  6. 004_ZooKeeper客户端基础命令
  7. 函数学习-bool()
  8. C++学习之路—继承与派生(四)拓展与总结
  9. 感受来自AI的幸福:可可豆、巧克力与人工智能
  10. Juniper防火墙新手教程8:Juniper防火墙配置的导入及导出
  11. VirtualBox中,WIN虚拟机与WIN共享文件夹
  12. python运维是什么_python运维方面一般用来做什么
  13. 易语言卷帘菜单与json_易语言卷帘式菜单加背景图片源码
  14. 科研人员论文投稿邮箱选择的问题
  15. 车辆vin信息(含发动机号)
  16. c语言项目答辩演讲稿,关于竞选社团负责人的演讲稿
  17. autoCAD恐吓式销售_恐吓式软文的例子?恐吓式软文营销经典案例分享
  18. run npm fund for details
  19. console的基础使用
  20. 书籍_《未来世界的幸存者》阮一峰--2/5

热门文章

  1. 网址转二维码,如何批量将网址转为二维码
  2. Cordova项目中使用BUI框架,以及常见问题解答
  3. 关闭惠普笔记本Flow后台程序
  4. ASO优化之应用关键词搜索量
  5. 架构系列——架构师必备基础:单体、分布式、集群与冗余的区别
  6. 禁用/重新启用笔记本电脑自带键盘
  7. 培智认识计算机教案,培智学校普通教室设备配备要求.doc
  8. vant表单点击提交没有触发验证_vant(ZanUi)结合async-validator实现表单验证的方法
  9. Python元组赋值顺序问题
  10. MATLAB人脸检测算法