引言:Mycat已经成为了一个强大的开源分布式数据库中间件产品。面对企业应用的海量数据事务处理,是目前最好的开源解决方案。但是如果想让多台机器中的数据保存一致,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。 
本文选自《分布式数据库架构及企业实践——基于Mycat中间件》。

  随着并发量、数据量越来越大及业务已经细化到不能再按照业务划分,我们不得不使用分布式数据库提高系统的性能。在分布式系统中,各个节点在物理上都是相对独立的,每个节点上的数据操作都可以满足 ACID。但是,各独立节点之间无法知道其他节点事务的执行情况,如果想让多台机器中的数据保存一致,就必须保证所有节点上的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。

XA 规范

  X/Open 组织(即现在的 Open Group)定义了分布式事务处理模型。X/Open DTP 模型(1994)包括应用程序(AP)、事务管理器(TM)、资源管理器(RM)、通信资源管理器(CRM)四部分。事务管理器(TM)是交易中间件,资源管理器(RM)是数据库,通信资源管理器(CRM)是消息中间件。通常把一个数据库内部的事务处理看作本地事务,而分布式事务处理的对象是全局事务。全局事务是指在分布式事务处理环境中,多个数据库可能需要共同完成一个工作,这个工作就是一个全局事务。在一个事务中可能更新几个不同的数据库,此时一个数据库对自己内部所做操作的提交不仅需要本身的操作成功,还需要全局事务相关的其他数据库的操作成功。如果任一数据库的任一操作失败,则参与此事务的所有数据库所做的所有操作都必须回滚。XA就是X/Open DTP 定义的交易中间件与数据库之间的接口规范(即接口函数),交易中间件用它来通知数据库事务的开始、结束、提交、回滚等,XA 接口函数由数据库厂商提供,根据这一思想衍生出二阶段提交协议和三阶段提交协议。

二阶段提交

  所谓的两个阶段是指准备阶段和提交阶段。 
  准备阶段指事务协调者(事务管理器)向每个参与者(资源管理器)发送准备消息,每个参与者要么直接返回失败消息(如权限验证失败),要么在本地执行事务,写本地的 redo 和undo日志但不提交,可以进一步将准备阶段分为以下三步。 
  (1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。 
  (2)参与者节点执行询问发起为止的所有事务操作,并将 undo 信息和 redo 信息写入日志。 
  (3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个“同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个“中止”消息。 
  提交阶段指如果协调者收到了参与者的失败消息或者超时,则直接向每个参与者发送回滚(Rollback)消息,否则发送提交(Commit)消息,参与者根据协调者的指令执行提交或者回滚操作,释放所有事务在处理过程中使用的锁资源。 
  二阶段提交所存在的缺点如下。 
  (1)同步阻塞问题,在执行过程中所有参与节点都是事务阻塞型的,当参与者占用公共资源时,其他第三方节点访问公共资源时不得不处于阻塞状态。 
  (2)单点故障,由于协调者的重要性,一旦协调者发生故障,则参与者会一直阻塞下去。 
  (3)数据不一致,在二阶段提交的第 2 个阶段中,当协调者向参与者发送 commit 请求之后发生了局部网络异常或者在发送 commit 请求的过程中协调者发生了故障,则会导致只有一部分参与者接收到了 commit 请求,而在这部分参与者在接收到 commit 请求之后就会执行commit操作,其他部分未接收到 commit 请求的机器则无法执行事务提交,于是整个分布式系统便出现了数据不一致的现象。 
  由于二阶段提交存在诸如同步阻塞、单点问题、数据不一致、宕机等缺陷,所以,研究者们在二阶段提交的基础上做了改进,提出了三阶段提交。

三阶段提交

  三阶段提交(Three-phase commit,3PC),也叫作三阶段提交协议(Three-phase commitprotocol),是二阶段提交(2PC)的改进版本。三阶段提交把二阶段提交的准备阶段再次一分为二,这样三阶段提交就有 CanCommit、PreCommit、DoCommit 三个阶段。 
  (1)CanCommit 阶段:三阶段提交的 CanCommit 阶段其实和二阶段提交的准备阶段很像,协调者向参与者发送 commit 请求,参与者如果可以提交就返回 Yes 响应,否则返回 No 响应。 
  (2)PreCommit 阶段:协调者根据参与者的反应情况来决定是否可以记录事务的 PreCommit操作。根据响应情况,有以下两种可能。

  • 假如协调者从所有参与者那里获得的反馈都是 Yes 响应,则执行事务。
  • 假如有任何一个参与者向协调者发送了 No 响应,或者等待超时之后协调者都没有接到参与者的响应,则执行事务的中断。

(3)DoCommit阶段:该阶段进行真正的事务提交,也可以分为执行提交、中断事务两种执行情况。

  执行提交的过程如下。

  • 协调者接收到参与者发送的ACK响应后,将从预提交状态进入提交状态,并向所有参与者发送doCommit请求。
  • 事务提交参与者接收到doCommit请求之后,执行正式的事务提交,并在完成事务提交之后释放所有的事务资源。
  • 事务提交完之后,向协调者发送ACK响应。
  • 协调者接收到所有参与者的ACK响应之后,完成事务。中断事务的过程如下。
  • 协调者向所有参与者发送abort请求。
  • 参与者接收到 abort 请求之后,利用其在第 2 个阶段记录的 undo 信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
  • 参与者完成事务回滚之后,向协调者发送 ACK 消息。
  • 协调者接收到参与者反馈的 ACK 消息之后,执行事务的中断。

Mycat 中分布式事务的实现

  Mycat在1.6版本以后已经完全支持 XA 分布式强事务类型了,先通过一个简单的示例来了解Mycat中XA的用法。 
  用户应用侧(AP)的使用流程如下: 
  (1)set autocommit=0 
  在应用层需要设置事务不能自动提交; 
  (2)set xa=on 
  在 SQL 中设置 XA 为开启状态; 
  (3)执行 SQL 
   insert into travelrecord(id,name) values(1,’N’),(6000000,’A’),(321,’D’),(13400000,’C’),(59,’E’); 
  (4)commit 或者 rollback 
  对事务进行提交(提交成功或者回滚异常)。 
  完整的流程图如图所示。 
 
  Mycat 内部实现侧的实现流程如下: 
  (1)set autocommit=0 
  将 MysqlConnection 中的 autocommit 设置为 false; 
  (2)set xa=on 
  在Mycat中开启 XA 事务管理器,用 MycatServer.getInstance().genXATXID()生成 XID,用XA START XID 命令进行 XA 事务开始标记,继续拼装 SQL 业务(Mycat 会将上面的 insert 数据分片到不同的节点上),拼装 XA END XID,XA PREPARE XID 最后进行 1pc 提交并记录日志到 tm.log 中,如果 1pc 阶段有异常,则直接回滚事务 XA ROLLBACK xid。 
  (3)在多节点 MySQL 中全部进行 2pc 提交(XA COMMIT),提交成功后,事务结束;如果有异常,则对事务进行重新提交或者回滚。 
  Mycat 中的 XA 分布式事务的异常处理流程如下: 
  (1)一阶段 commit 异常:如果 1pc 提交任意一个 mysql 节点无法提交或者异常,则全部节点的事务进行回滚,抛出异常给应用侧事务回滚。 
  (2)Mycat Crash Recovery 
  Mycat 崩溃以后,根据 tm.log 事务日志再进行重启恢复,mycat 启动后执行事务日志查找各个节点中已经 prepared 的 XA 事务,进行 commit 或者 rollback。

1. 相关类说明

  通过用户应用侧发送 set xa = on ; SQL 开启 Mycat 内部 XA 事务管理器的功能,事务管理器将对 MySQL 数据库进行 XA 方式的事务管理,具体事务管理功能的实现代码如下:

  • MySQLConnection:数据库连接。
  • NonBlockingSession:用户连接 Session。
  • MultiNodeCoordinator:协调者。
  • CommitNodeHandler:分片提交处理。
  • RollbackNodeHandler:分片回滚处理。

2. 代码解析

  XA 事务启动的源码如下:

public class MySQLConnection extends BackendAIOConnection {//设置开启事务private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) {if (autoCommit) {sb.append("SET autocommit=1;");} else {sb.append("SET autocommit=0;");}}public void execute(RouteResultsetNode rrn, ServerConnection sc,boolean autocommit) throws UnsupportedEncodingException {if(!modifiedSQLExecuted && rrn.isModifySQL()) {modifiedSQLExecuted = true;}//获取当前事务 IDString xaTXID = sc.getSession2().getXaTXID();synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit);}
……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 MySQLConnection.java源码
}

  用户应用侧设置手动提交以后,Mycat 会在当前连接中加入

  SET autocommit=0;

  将该语句加入到 StringBuffer 中,等待提交到数据库。 
  用户连接 Session 的源码如下:

public class NonBlockingSession implements Session {……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 NonBlockingSession.java 源码
}
SET XA = ON ;语句分析

  用户应用侧发送该语句到 Mycat 中,由 SQL 语句解析器解析后交由 SetHandle 进行处理c.getSession2().setXATXEnabled (true); 
  调用 NonBlockSession 中的 setXATXEnable d 方法设置 XA 开关启动,并生成 XID,代码如下:

public void setXATXEnabled(boolean xaTXEnabled) {LOGGER.info("XA Transaction enabled ,con " + this.getSource());if (xaTXEnabled && this.xaTXID == null) {xaTXID = genXATXID();}
}

  另外,NonBlockSession 会接收来自于用户应用侧的 commit, 调用 commit 方法进行处理事务提交的逻辑。 
  在 commit()方法中,首先会 check 节点个数,一个节点和多个节点分为不同的处理过程,这里只讲下多个节点的处理方法 checkDistriTransaxAndExecute(); 
  该方法会对多个节点的事务进行提交。 
  协调者的源码如下:

public class MultiNodeCoordinator implements ResponseHandler {……
……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 MultiNodeCoordinator.java 源码
}

  在 NonBlockSession 的 checkDistriTransaxAndExecute()方法中, NonBlockSession 会话类会调用专门进行多节点协同的 MultiNodeCoordinator 类进行具体的处理,在 MultiNodeCoordinator类中,executeBatchNodeCmd 方法加入 XA 1PC 提交的处理,代码片段如下:

for (RouteResultsetNode rrn : session.getTargetKeys()) {……if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){//recovery LogparticipantLogEntry[started] = newParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus());String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId};if (LOGGER.isDebugEnabled()) {LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current connection:"+conn.getHost()+":"+conn.getPort());}mysqlCon.execBatchCmd(cmds);}
……
}

  在 MultiNodeCoordinator 类的 okResponse 方法中,则进行 2pc 的事务提交

MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus()){case TxState.TX_STARTED_STATE:if (mysqlCon.batchCmdFinished()){String xaTxId = session.getXaTXID();String cmd = "XA COMMIT " + xaTxId;if (LOGGER.isDebugEnabled()) {LOGGER.debug("Start execute the cmd :"+cmd+",current host:"+mysqlCon.getHost()+":"+mysqlCon.getPort());}//recovery logCoordinatorLogEntry coordinatorLogEntry =inMemoryRepository.get(xaTxId);for(int i=0; i<coordinatorLogEntry.participants.length;i++){LOGGER.debug("[In MemoryCoordinatorLogEntry]"+coordinatorLogEntry.participants[i]);if(coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())){coordinatorLogEntry.participants[i].txState =TxState.TX_PREPARED_STATE;}}inMemoryRepository.put(session.getXaTXID(),coordinatorLogEntry);fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());//send commitmysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);mysqlCon.execCmd(cmd);}return;
……
}

  分片事务提交处理的源码如下:

public class CommitNodeHandler implements ResponseHandler {//结束 XApublic void commit(BackendConnection conn) {……
……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 CommitNodeHandler.java源码}//提交 XA@Overridepublic void okResponse(byte[] ok, BackendConnection conn) {……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 CommitNodeHandler.java 源码
}

  在 Mycat 中同样支持单节点 MySQL 数据库的 XA 事务处理,在 CommitNodeHandler 类中就是对单节点的 XA 二阶段处理,处理方式与 MultiNodeCoordinator 类同,通过 commit 方法进行 1pc 的提交,而通过 okResponse 的方法进行 2pc 阶段的事务提交。 
  分片事务回滚处理的源码如下:

public class RollbackNodeHandler extends MultiNodeHandler {……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 RollbackNodeHandler.java 源码
}

  在 RollbackNodeHandler 的 rollback 方法中加入了对 XA 事务的 rollback 处理,用户应用侧发起的 rollback 会在这个方法中进行处理。

for (final RouteResultsetNode node : session.getTargetKeys()) {……//support the XA rollbackMySQLConnection mysqlCon = (MySQLConnection) conn;if(session.getXaTXID()!=null) {String xaTxId = session.getXaTXID();mysqlCon.execCmd("XA END " + xaTxId + ";");mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");}else {conn.rollback();}
……
}

  同样,该方法会对所有的 MySQL 数据库节点发起 xa rollback 指令。 
  本文选自《分布式数据库架构及企业实践——基于Mycat中间件》,点此链接可在博文视点官网查看。 
                
  想及时获得更多精彩文章,可在微信中搜索“博文视点”或者扫描下方二维码并关注。 
                

Mycat 分布式事务的实现相关推荐

  1. MyCAT XA分布式事务

    为什么80%的码农都做不了架构师?>>>    1. 概述 数据库拆分后,业务上会碰到需要分布式事务的场景.MyCAT 基于 XA 实现分布式事务.国内目前另外一款很火的数据库中间件 ...

  2. 数据库中间件 MyCAT源码分析 —— XA分布式事务

    title: MyCAT 源码分析 -- XA分布式事务 date: 2017-07-15 tags: categories: MyCAT permalink: MyCAT/xa-distribute ...

  3. 日订单50万级分布式事务

    作者:伈情,喜玩Java.Python.Golang!热爱架构设计.SOA.微服务.高并发.分布式.性能优化.DevOps.大数据.消息队列等....!在互联网应用支撑系统&现金交易系统有些许 ...

  4. 分布式事务 TCC-Transaction 源码分析 —— 项目实战

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/TCC-Transaction/http-sample/ 「芋道 ...

  5. 聊聊微服务架构及分布式事务解决方案

    转载自   聊聊微服务架构及分布式事务解决方案 分布式事务场景如何设计系统架构及解决数据一致性问题,个人理解最终方案把握以下原则就可以了,那就是:大事务=小事务(原子事务)+异步(消息通知),解决分布 ...

  6. Mysql 扩展性设计之数据切分、那么数据切分后会带来哪些问题呢?比如分布式事务、数据的一致性、垂直切分和水平切分应用场景

    Mysql 扩展性设计之数据切分.那么数据切分后会带来哪些问题呢?比如分布式事务.数据的一致性.垂直切分和水平切分应用场景 前言.什么是数据切分 垂直(纵向)切分.水平(横向)切分.他们各自的特点 垂 ...

  7. 【分布式事务】内容较多CAP/BASE/2PC/3PC/TCC/Sega等等等等~,一次性捋清楚

    分布式事务 概述 什么是分布式事务 分布式事务就是指事务的资源分别位于分布式系统的不同节点之上的事务 分布式事务产生的原因 数据库分库分表 当业务数据量达到单库单表的极限时,就需要考虑分库分表,跨多个 ...

  8. SpringCloud微服务实战——搭建企业级开发框架(二十七):集成多数据源+Seata分布式事务+读写分离+分库分表

      读写分离:为了确保数据库产品的稳定性,很多数据库拥有双机热备功能.也就是,第一台数据库服务器,是对外提供增删改业务的生产服务器:第二台数据库服务器,主要进行读的操作.   目前有多种方式实现读写分 ...

  9. 分布式事务概述 (资料)

    2019独角兽企业重金招聘Python工程师标准>>> 什么是分布式系统 分布式系统是由一组通过网络进行通信.为了完成共同的任务而协调工作的计算机节点组成的系统.分布式系统的出现是为 ...

  10. 【总结】MyCat分布式数据库中间件

    1,数据库概述 在互联网时代,海量数据的存储与访问成为系统设计与使用的瓶颈问题,对于海量数据处理,按照使用场景,主要分为两种类型 联机事务处理(OLTP:On-line transaction pro ...

最新文章

  1. Java实战equals()与hashCode()
  2. 在Hadoop系统中运行WordCount案例失败解决方法
  3. 软件测试用python一般用来做什么-python软件测试
  4. MySQL的进阶实战篇
  5. 下载kolla_Kolla部署实验手册
  6. JMeter定制功能实现
  7. 求1 2 3 java_求1+2+3+...+n,Java代码实现
  8. 网络管理员在预先分配和识别作为_网络管理员必备流量分析工具,果断转发收藏!...
  9. 转发:关于数据权限设计的思考
  10. PGIS中java程序授权问题
  11. 完美卸载itunes
  12. 营业执照注册号是不是统一社会信用代码?
  13. 24岁我有了自己的公司
  14. 呼叫中心系统智能排队功能转接流程
  15. 404 jpeg图片_nginx中获取图片抛404错误
  16. 一款恋爱星座男女配对微信小程序源码
  17. IBM研究院院长:量子计算“大爆发”将在十年内到来
  18. 微信小程序开发工具npm用不了,报错“npm不是内部或外部命令,也不是可运行的程序”
  19. log4j 配置文件中设置相对路径
  20. android版本 51,51星变手机版

热门文章

  1. 自定义微信小程序导航(兼容各种手机)
  2. 网络操作系统第224页作业
  3. Part2-HttpClient官方教程-Chapter5-流利的API
  4. 北京理工计算机 上机复试2000年
  5. C++ enum类型的一个更好的用法
  6. E4/EAS/Eventing System 事件系统
  7. 从 Chrome 谈到 Adobe
  8. 认真去做,我会做得很棒!
  9. css相关笔记(一)
  10. 前端简单h5播放器的制作