为了更好理解分布式事务,首先提出一个问题:

假设数据库中有两个表ta,tb,我们要分别更改ta表中的ra记录和tb表中的rb记录,但要求ra和rb记录都修改成功,才认为此次操作时成功,或者需要失败回滚。针对这种情况处理方式很简单,只需要使用个事务就好了。

但假如ta和tb不在一个数据库中或者不在一个数据库实例上,此时数据库的事务这两个表也是无法同时管理的,针对这种情况要如何解决了?如何保证对ta和tb操作的一致性?

此时可以通过TCC来解决上述问题,TCC通过实现两阶段协议,将服务流程抽象为Try-Confirm-Cancel 三个操作:

第一阶段:try,主要用于对资源的预留

第二阶段:comfirm/cancel,comfirm用于对预留资源的使用,对业务进行提交,cancel是对预留资源的释放,对业务进行回滚操作

下面从三个方面介绍TCC

seata中TCC的源码实现

写好TCC实现的注意点

seata中TCC模式如何做到高可用的

1. seata中TCC的实现

seata主要由三个模块组成

TC (Transaction Coordinator) - 事务协调者维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

public interface TccActionOne {

@TwoPhaseBusinessAction(name = "DubboTccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")

public boolean prepare(BusinessActionContext actionContext, int a);

public boolean commit(BusinessActionContext actionContext);

public boolean rollback(BusinessActionContext actionContext);

}

public interface TccActionTwo {

@TwoPhaseBusinessAction(name = "DubboTccActionTwo" , commitMethod = "commit", rollbackMethod = "rollback")

public boolean prepare(BusinessActionContext actionContext, String b);

public boolean commit(BusinessActionContext actionContext);

public boolean rollback(BusinessActionContext actionContext);

}

同时在provider和consumer端都要引入具体的GlobalTransactionScanner,该类会对TM和RM进行初始化和注册:

具体调用如图:

image.jpeg

以上是简单的使用方式,有了上面基本使用流程的介绍后,现在开始分析下具体的代码实现,首先重点关注下以下的类和注解:

GlobalTransactionScanner,用于扫描是否开启了分布式事务,并对加了分布式事务注解的方法注入代理,如TwoPhaseBusinessAction和GlobalTransactional

注解TwoPhaseBusinessAction,表示该方法使用的TCC模式,并同时制定commit和cancel方法

注解GlobalTransactional,用于表示被修饰的方法会开启分布式事务来进行处理

GlobalTransactionScanner通过AbstractAutoProxyCreator类,来为被分布式相关注解修饰的方法添加动态代理,所以在服务启动时,会执行GlobalTransactionScanner类中相关方法,

主要涉及的方法有:

GlobalTransactionScanner#initClient,初始化TM和RM客户端

GlobalTransactionScanner#wrapIfNecessary,为添加了TwoPhaseBusinessAction和GlobalTransactional注解的方法添加代理,同时分别为修饰的方法注入TccActionInterceptor和GlobalTransactionalInterceptor代理类,同时会将本地服务作为RM客户端注册到TC服务端中

1.1 客户端初始化

所以GlobalTransactionScanner是Seata客户端的启动类,首先看下TM和RM客户端的初始化

TM和RM会分别初始化TmNettyRemotingClient和RmNettyRemotingClient,这个两个类的父类都是AbstractNettyRemotingClient,在该类的init方法中,会启动一个定时来检查TC的channel是否存活,同时会发送注册信息到TC中,最后会启动netty客户端

public void init() {

timerExecutor.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

//检测连接TC的channel是否存活,若不存在对应channel或者channel已关闭,则会重新连接到TC,同时发送注册信息到TC服务中

clientChannelManager.reconnect(getTransactionServiceGroup());

}

}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

if (NettyClientConfig.isEnableClientBatchSendRequest()) {

mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,

MAX_MERGE_SEND_THREAD,

KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<>(),

new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));

mergeSendExecutorService.submit(new MergedSendRunnable());

}

super.init();

clientBootstrap.start();

}

在NettyClientChannelManager#reconnect方法中,会通过获取所有注册到注册中心的TC服务地址,然后判断当前缓存NettyClientChannelManager#channels中是否存在对应地址且存活状态的channel,若不存在,则会为该TC地址创建channel,同时向改地址发送注册信息,TmNettyRemotingClient和RmNettyRemotingClient注册信息分别为RegisterTMRequest和RegisterRMRequest,主要方法步骤是在netty.NettyClientChannelManager#doConnect中创建channel,然后在NettyPoolableFactory#makeObject方法中发送对应的注册消息

而在GlobalTransactionScanner#wrapIfNecessary方法中,会为TwoPhaseBusinessAction和GlobalTransactional修饰的方法添加代理实现

@Override

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {

try {

synchronized (PROXYED_SET) {

if (PROXYED_SET.contains(beanName)) {

return bean;

}

interceptor = null;

//check TCC proxy

if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {

//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC

interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));

ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,

(ConfigurationChangeListener)interceptor);

} else {

Class> serviceInterface = SpringProxyUtils.findTargetClass(bean);

Class>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

if (!existsAnnotation(new Class[]{serviceInterface})

&& !existsAnnotation(interfacesIfJdk)) {

return bean;

}

if (interceptor == null) {

if (globalTransactionalInterceptor == null) {

globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

ConfigurationCache.addConfigListener(

ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,

(ConfigurationChangeListener)globalTransactionalInterceptor);

}

interceptor = globalTransactionalInterceptor;

}

}

LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());

if (!AopUtils.isAopProxy(bean)) {

bean = super.wrapIfNecessary(bean, beanName, cacheKey);

} else {

AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);

Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));

for (Advisor avr : advisor) {

advised.addAdvisor(0, avr);

}

}

PROXYED_SET.add(beanName);

return bean;

}

} catch (Exception exx) {

throw new RuntimeException(exx);

}

}

方法中会为TwoPhaseBusinessAction注解修饰的方法生成TccActionInterceptor代理,为GlobalTransactional生成GlobalTransactionalInterceptor代理。

但在TCCBeanParserUtils#isTccAutoProxy方法中若存在TwoPhaseBusinessAction注解,会通过RmNettyRemotingClient#registerResource发送注册信息,具体方法在DefaultRemotingParser#parserRemotingServiceInfo中,个人觉得这个步骤可以去掉了有点冗余

1.2 服务端初始化

TC服务端启动类io.seata.server.Server#main,该方法中会初始化DefaultCoordinator类,这个类是所有消息的处理类,DefaultCoordinator主要属性如下

//各种定时任务,用来重试

private ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryRollbacking", 1));

private ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryCommitting", 1));

private ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("AsyncCommitting", 1));

private ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("TxTimeoutCheck", 1));

private ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("UndoLogDelete", 1));

private RemotingServer remotingServer; //消息通信服务端

private DefaultCore core; //主要的事务处理

针对消息的处理流程,具体方法在NettyRemotingServer#registerProcessor:

private void registerProcessor() {

// 1. registry on request message processor

ServerOnRequestProcessor onRequestProcessor =

new ServerOnRequestProcessor(this, getHandler());

//处理事务提交回滚等消息

super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

// 2. registry on response message processor,对分支事务提交和回滚响应结果的处理

ServerOnResponseProcessor onResponseProcessor =

new ServerOnResponseProcessor(getHandler(), getFutures());

super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);

super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

// 3. registry rm message processor,处理RM客户端的注册消息

RegRmProcessor regRmProcessor = new RegRmProcessor(this);

super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

// 4. registry tm message processor,处理TM客户端的注册消息

RegTmProcessor regTmProcessor = new RegTmProcessor(this);

super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

// 5. registry heartbeat message processor,处理客户端的心跳消息

ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);

super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);

}

从上图可以看出注册消息是在RegRmProcessor和RegTmProcessor中进行处理。

1.3 TCC消息处理分析

首先还是看下一开始的示例代码:

@GlobalTransactional

public String doTransactionCommit(){

//第一个TCC 事务参与者

boolean result = tccActionOne.prepare(null, 1);

if(!result){

throw new RuntimeException("TccActionOne failed.");

}

List list = new ArrayList();

list.add("c1");

list.add("c2");

result = tccActionTwo.prepare(null, "two", list);

if(!result){

throw new RuntimeException("TccActionTwo failed.");

}

return RootContext.getXID();

}

在调用doTransactionCommit方法时,会进入到代理类GlobalTransactionalInterceptor中,最终会执行到TransactionalTemplate#execute方法,该方法的主要逻辑如下:

beginTransaction(txInfo, tx);//开始事务

rs = business.execute(); //执行业务代码,即执行doTransactionCommit方法

commitTransaction(tx); //提交事务

在开始事务beginTransaction方法中,会向TC服务发送GlobalBeginRequest消息,来获取事务xid,该消息最终会在服务端DefaultCore#begin方法中得到处理:

通过雪花算法产生一个随机数作为transactionId.根据transactionId生成xid,具体规则是ipAddress + ":" + port + ,":" + transactionId,ipAddress为本机ip,port为当前服务的端口

将全局事务记录写入global_table表中,同时返回xid,表中xid为主键,transactionId为索引

执行业务代码,业务代码会调用远端服务,如tccActionOne.prepare方法,由于该方法被TwoPhaseBusinessAction修饰,会执行代理类TccActionInterceptor,在TccActionInterceptor类中的invoke方法主要逻辑如下:

String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext); //注册分支事务,同时获取分支事务id

.....

ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute()); #执行实际的分支事务的业务代码,如tccActionOne.prepare方法

其中在doTccActionLogStore方法中客户端通过发送BranchRegisterRequest消息,其中clientId和lockKeys都是null,resourceId为TwoPhaseBusinessAction中name名称该消息最终会在服务端AbstractCore#branchRegister:

通过随机算法生成branchId

生成一个分支记录,将记录插入到branch_table表中,其中branchId为主键

doTransactionCommit所有业务逻辑执行成功后,GlobalTransactionalInterceptor代理类会执行到commitTransaction方法,进行全局事务的提交,客户端会通过DefaultTransactionManager#commit方法发送GlobalCommitRequest事务提交消息,此时服务端接收到该消息后,会通过DefaultCore#doGlobalCommit方法进行全局事务的提交,该方法的主要逻辑如下:

从存储中获取全局事务xid下所有分支事务记录,为每个分支事务调用AbstractCore#branchCommitSend方法,发送BranchCommitRequest消息到对应的分支事务客户端,来进行分支事务的commit,客户端接收到消息后,会执行TwoPhaseBusinessAction注解中填写的commit方法来完成分支事务的提交

当有任何一个分支服务调用失败时,如tccActionOne.prepare调用失败,会回滚全局事务,然后TC服务端会回滚所有的分支事务

2. 写好TCC实现的注意点

写好一个完备的TCC的实现是有一定的要求,需要解决空回滚,幂等操作和悬挂问题。

空回滚

即全局事务回滚时,有可能分支事务try接口由于网络问题并没有被触发或者还在处于try阶段,此时TC已经触发了分支事务的cancel,此时需要分支事务服务需要返回成功,不然会有重试,即分支事务要支持空回滚

幂登性

由于网络抖动问题,分支事务中的try方法可能会被执行多次,所以要保证资源不会被重复消耗,解决办法可以通过为每一个请求维护一个唯一id,如分支事务id,来过滤重复的请求

悬挂问题

当全局事务回滚时,由于分支事务try方法执行了较长时间,导致分支事务执行cancel方法后,try方法才执行成功,这样导致被try锁定的资源得不到释放,解决办法是将每个分支事务的请求记录下来,所以当执行try方法后,发现已经存在cancel的执行记录后,则回滚当前的try操作

3. seata中TCC模式如何做到高可用的

要做到高可用,要做到服务的无状态,为了做到这点seata做了如下工作:

存储,TC中事务数据的存储避免使用本地存储,可以使用mysql等

服务发现与注册,

从上文实现分析中,我们可以看出TC服务会将本服务的ip注册到注册中心,如zk,etcd等,TM和RM客户端会拉取所有TC服务端的地址,同时将客户端服务的ip注册到所有TC服务中,这样保证了每个TC服务都有所有客户端的链接信息

seata 如何开启tcc事物_分布式事务Seata-TCC源码分析相关推荐

  1. @Transactional的用法详解及Transactional事务无效的源码分析

    数据库事务正确执行的四要素 1.原子性 事务是不可分割的最小的工作单元,事务内的操作要么全做,要么全不做,不能只做一部分. 2.一致性 事务执行前数据库的数据按照逻辑处于正确的状态,事务执行后数据库的 ...

  2. 3 v4 中心节点固定_死磕以太坊源码分析之p2p节点发现

    死磕以太坊源码分析之p2p节点发现 在阅读节点发现源码之前必须要理解kadmilia算法,可以参考:KAD算法详解. 节点发现概述 节点发现,使本地节点得知其他节点的信息,进而加入到p2p网络中. 以 ...

  3. mybatis第十话 - mybaits整个事务流程的源码分析

    1.故事前因 在分析mybatis源码时一直带的疑问,一直以为事务是在SqlSessionTemplate#SqlSessionInterceptor#invoke完成的,直到断点才发现并不简单! 在 ...

  4. java tcc事务 例子_分布式事务之TCC事务模型

    正文 我们先套一个业务场景进去,如下图所示 那页面点了支付按钮,调用支付服务,那我们后台要实现下面三个步骤 [1] 订单服务-修改订单状态 [2] 账户服务-扣减金钱 [3] 库存服务-扣减库存 达到 ...

  5. 蚂蚁金服分布式事务框架DTX源码学习

    文章目录 一.前言 二.DTX简介 三.角色 四.服务发起者与参与者DTX客户端启动流程 1.项目启动,创建dtx动态代理 2.初始化DtxClient客户端的init()方法 五.服务发起以及参与流 ...

  6. redis watchdog_Redis分布式事务框架Redisson源码解析(一)

    代码片段一. public static void main(String[] args) throws Exception { Config config = new Config(); confi ...

  7. spring事务管理 TransactionProxyFactoryBean源码分析

    J2EE,当然离不开事务,事务又当然少不了Spring声明式事务.spring声明式事务,很多码农门,应该和笔者一样,停留在使用上,及仅仅了解点原理.如:Spring事务管理原理"代理+AO ...

  8. Redis核心原理与实践--事务实践与源码分析

    在Winform开发领域开发过十多年的项目中,见证着形形色色的架构和官方技术的应用,从最早类似Winform模式的WebForm技术,到接着的JQuery+界面组件,再到Asp.net Core的技术 ...

  9. java直接内存为什么快_直接内存与 JVM 源码分析

    直接内存(堆外内存) 直接内存有一种叫法,堆外内存. 直接内存(堆外内存)指的是 Java 应用程序通过直接方式从操作系统中申请的内存.这个差别与之前的堆.栈.方法区,那些内存都是经过了虚拟化.所以严 ...

最新文章

  1. python html解析查找字符串_用python的BeautifulSoup分析html
  2. opengl 创建context_OpenGL学习笔记1-创建窗口,绘制三角形
  3. C++知识点13——友元,类的声明
  4. Docker官方Centos镜像下安装Elasticsearch【详细步骤】
  5. Python:名片管理系统
  6. Shell 控制并发
  7. 办公自动化-使用python-docx生成文档-0223
  8. L1-041 寻找250-PAT团体程序设计天梯赛GPLT
  9. 辨异 —— Java 中的抽象类和接口
  10. iOS多线程技术—多线程简单介绍
  11. php 编译 sass,如何在Symfony 3中使用纯PHP编译SASS(scss)
  12. Android 8(1),腾讯字节爱奇艺网易华为实习面试汇总
  13. OSChina 周五乱弹 —— 美团外卖程序崩溃的真相
  14. 【AWVS】python调AWVS接口 新建扫描并导出扫描报告 [自定义扫描报告](三)
  15. Sorry, you have been blocked !vultr 又被 openai 屏蔽了,只能换个 vps 了
  16. Mac如何给压缩文件加密
  17. 区块链隐私保护文献 An Efficient NIZK Scheme for Privacy-Preserving Transactions over Account-Model Blockchain
  18. 长安大学计算机控制技术期末试题,长安大学汽车理论期末试卷及答案
  19. 好气色“吃”出来 7条守则缔造美肌—多喝水、喝对水
  20. 函数的四种特性——1、有界性2、单调性3、奇偶性4、周期性

热门文章

  1. 吴恩达作业9:卷积神经网络实现手势数字的识别(基于tensorflow)
  2. CrawlSpider 详解
  3. Java8 Stream详解~遍历/匹配(foreach/find/match)
  4. C++学习之路 | PTA乙级—— 1082 射击比赛 (20 分)(精简)
  5. C++学习之路 | PTA(天梯赛)—— L2-007 家庭房产 (25分)(带注释)(并查集)(精简)
  6. mysql 5.7安装完密码是多少_关于mysql5.7.18的安装并修改初始密码的图文教程
  7. 外设驱动库开发笔记27:ESP8266无线通讯驱动
  8. iOS中bundle的使用
  9. boolean 默认_MySQL数据类型测试:BOOLEAN、TINYINT测试数据总结(第八节)
  10. 单分支 两路分支和多分支的if结构_JavaScript学习笔记(二)-- 分支结构