在上一篇《java事务(二)——本地事务》中已经提到了事务的类型,并对本地事务做了说明。而分布式事务是跨越多个数据源来对数据来进行访问和更新,在JAVA中是使用JTA(Java Transaction API)来实现分布式的事务管理的。但是在本篇中并不会说明如何使用JTA,而是在不依赖其他框架以及jar包的情况下自己来实现分布式事务,作为对分布式事务的一个理解。

假设现在有两个数据库,可以是在一台机器上也可以是在不同机器上,现在要向其中一个数据库更新用户账户信息,另外一个数据库新增用户的消费信息。首先说明一下,分布式事务也是事务,在事务特性的那篇博客中就已经说明了事务的四个特性:原子性、一致性、隔离性和持久性,那么分布式事务也必然是符合这四个特性的,这就要求同时对两个数据库进行数据访问和更新的时候是作为一个单独的工作单元来进行处理,并且同时成功或者失败后进行回滚。但是在说明本地事务的时候已经提到了,本地事务是基于连接的,现在有两个数据库,分别保存数据,那么为了实现这个事务,必然会有两个数据库连接,这似乎是与事务基于连接的说法相悖。现在举个例子:之前回老家去了一趟医院,后来在办理出院手续的时候是这样的,办理出院时需要护士站的主任医生填写出院单,然后携带结账单到收费处缴纳费用并去药房取药,然后回护士站盖章,出院手续办理完毕。如果把不同地点的窗口看成是不同的连接,那么实现办理出院手续这个事务就必须保证在每个业务窗口上的事务都是成功的,最后出院手续才算真正完成。在最终盖章的时候,需要查看每个窗口给出的单子是否是已办理的,只有综合起来所有的单子才能判定出院手续是否成功。这主要就是为了说明分布式事务实现的关键其实是管理每个连接上的事务,用一个东西来判定每个连接上的事务执行情况,综合起来作为分布式事务执行成功与否的依据。这大概就是事务管理器要做的事情。虽然这个例子并不太恰当,很有挑毛病的地方,但是在不太钻牛角尖的情况下,还是可以用来说明要表达的东西的。

实现例子

我打开了两台虚拟机,分别命令为node1、node2,每台虚拟机上都安装了MySQL数据库,现在向node1上的数据库更新用户账户信息,向node2上的数据库新增用户消费信息。

在node1上创建账户表,建表语句如下:

CREATE TABLEACCOUNTS

(

IDINT NOT NULL AUTO_INCREMENT COMMENT '自增主键',

CUSTOMER_NOVARCHAR(25) NOT NULL COMMENT '客户号',

CUSTOMER_NAMEVARCHAR(25) NOT NULL COMMENT '客户名称',

CARD_IDVARCHAR(18) NOT NULL COMMENT '身份证号',

BANK_IDVARCHAR(25) NOT NULL COMMENT '开户行ID',

BALANCEDECIMAL NOT NULL COMMENT '账户余额',

CURRENCYVARCHAR(10) NOT NULL COMMENT '币种',PRIMARY KEY(ID)

)

COMMENT= '账户表' ;

然后向表中插入一条记录,如下图:

在node2上创建用户消费历史表,建表语句如下:

CREATE TABLEUSER_PURCHASE_HIS

(

IDINT NOT NULL AUTO_INCREMENT COMMENT '自增主键',

CUSTOMER_NOVARCHAR(25) NOT NULL COMMENT '客户号',

SERIAL_NOVARCHAR(32) NOT NULL COMMENT '交易流水号',

AMOUNTDECIMAL NOT NULL COMMENT '交易金额',

CURRENCYVARCHAR(10) NOT NULL COMMENT '币种',

REMARKVARCHAR(100) NOT NULL COMMENT '备注',PRIMARY KEY(ID)

)

COMMENT= '用户消费历史表';

下面实现一个简陋的例子,代码如下:

1、创建DBUtil类,用来获取和关闭连接

packageperson.lb.example1;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.sql.Statement;public classDBUtil {static{try{//加载驱动类

Class.forName("com.mysql.jdbc.Driver");

}catch(ClassNotFoundException e) {

e.printStackTrace();

}

}//获取node1上的数据库连接

public staticConnection getNode1Connection() {

Connection conn= null;try{

conn=(Connection) DriverManager.getConnection("jdbc:mysql://192.168.0.108:3306/TEST","root","root");

}catch(SQLException e) {

e.printStackTrace();

}returnconn;

}//获取node2上的数据库连接

public staticConnection getNode2Connection() {

Connection conn= null;try{

conn=(Connection) DriverManager.getConnection("jdbc:mysql://192.168.0.109:3306/TEST","root","root");

}catch(SQLException e) {

e.printStackTrace();

}returnconn;

}//关闭连接

public static voidclose(ResultSet rs, Statement st, Connection conn) {try{if(rs != null) {

rs.close();

}if(st != null) {

st.close();

}if(conn != null) {

conn.close();

}

}catch(SQLException e) {//TODO Auto-generated catch block

e.printStackTrace();

}

}

}

2、创建XADemo类,用来测试事务

packageperson.lb.example1;importjava.sql.Connection;importjava.sql.SQLException;importjava.sql.Statement;public classXADemo {public static voidmain(String[] args) {//获取连接

Connection node1Conn =DBUtil.getNode1Connection();

Connection node2Conn=DBUtil.getNode2Connection();try{//设置连接为非自动提交

node1Conn.setAutoCommit(false);

node2Conn.setAutoCommit(false);//更新账户信息

updateAccountInfo(node1Conn);//增加用户消费信息

addUserPurchaseInfo(node2Conn);//提交

node1Conn.commit();

node2Conn.commit();

}catch(SQLException e) {

e.printStackTrace();//回滚

try{

node1Conn.rollback();

node2Conn.rollback();

}catch(SQLException e1) {

e1.printStackTrace();

}

}finally{//关闭连接

DBUtil.close(null, null, node1Conn);

DBUtil.close(null, null, node2Conn);

}

}/*** 更新账户信息

*@paramconn

*@throwsSQLException*/

private static void updateAccountInfo(Connection conn) throwsSQLException {

Statement st=conn.createStatement();

st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");

}/*** 增加用户消费信息

*@paramconn

*@throwsSQLException*/

private static void addUserPurchaseInfo(Connection conn) throwsSQLException {

Statement st=conn.createStatement();

st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "

+ " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 100, 'CNY', '买衣服')");

}

}

这是一个没有发生任何异常的例子,执行结果是nod1上ACCOUNTS 表中的BALANCE字段的值成功更新为9900,而node2上USER_PURCHASE_HIS表中新增了一条记录,两个连接上的事务都成功完成,事务目标实现。如果反向测试一下,更改Insert语句,把其中某一个要插入的值改为NULL,由于字段都是非空限制,所以会发生异常,这个连接上的事务会失败,那么跟它关联的node1上的事务也必须回滚,不对数据库进行任何更改。经测试,结果与预期目标一致。说明这个例子是符合事务特性的。

但是这个例子不管是从代码的可读性和可维护性上来说都是比较差的。在使用spring开发项目的时候,配置了事务管理器以后,在我们的业务逻辑中几乎是察觉不到事务控制的,而且也看不到事务控制的代码。那么究竟spring中是怎么实现的事务控制呢,这篇博客中不会详细说明,但是要提到两个东西,事务管理器和资源管理器,现在自己来实现一个简单的事务管理器和资源管理器来对事务进行控制。

代码示例如下:

1、创建AbstractDataSource 类

packageperson.lb.datasource;importjava.sql.Connection;importjava.sql.SQLException;public abstract classAbstractDataSource {//获取连接

public abstract Connection getConnection() throwsSQLException ;//关闭连接

public abstract void close() throwsSQLException;

}

2、创建Node1DataSource 类,用来连接node1上的数据库

packageperson.lb.datasource;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.SQLException;public class Node1DataSource extendsAbstractDataSource {//使用ThreadLocal类保存当前线程使用的Connection

protected static final ThreadLocal threadSession = new ThreadLocal();static{try{//加载驱动类

Class.forName("com.mysql.jdbc.Driver");

}catch(ClassNotFoundException e) {

e.printStackTrace();

}

}private final static Node1DataSource node1DataSource = newNode1DataSource();privateNode1DataSource() {}public staticNode1DataSource getInstance() {returnnode1DataSource;

}/*** 获取连接*/@Overridepublic Connection getConnection() throwsSQLException {

Connection conn= null;if(threadSession.get() == null) {

conn=(Connection) DriverManager.getConnection("jdbc:mysql://192.168.0.108:3306/TEST","root","root");

threadSession.set(conn);

}else{

conn=threadSession.get();

}returnconn;

}/*** 关闭并移除连接*/@Overridepublic void close() throwsSQLException {

Connection conn=threadSession.get();if(conn != null) {

conn.close();

threadSession.remove();

}

}

}

3、创建Node2DataSource类,用来连接node2机器上的数据库

packageperson.lb.datasource;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.SQLException;public class Node2DataSource extendsAbstractDataSource {//使用ThreadLocal类保存当前线程使用的Connection

protected static final ThreadLocal threadSession = new ThreadLocal();static{try{//加载驱动类

Class.forName("com.mysql.jdbc.Driver");

}catch(ClassNotFoundException e) {

e.printStackTrace();

}

}private static final Node2DataSource node2DataSource = newNode2DataSource();privateNode2DataSource() {};public staticNode2DataSource getInstance() {returnnode2DataSource;

}/*** 获取连接*/@Overridepublic Connection getConnection() throwsSQLException {

Connection conn= null;if(threadSession.get() == null) {

conn=(Connection) DriverManager.getConnection("jdbc:mysql://192.168.0.109:3306/TEST","root","root");

threadSession.set(conn);

}else{

conn=threadSession.get();

}returnconn;

}/*** 关闭并移除连接*/@Overridepublic void close() throwsSQLException {

Connection conn=threadSession.get();if(conn != null) {

conn.close();

threadSession.remove();

}

}

}

4、创建Node1Dao类,在node1的数据库中更新账户信息

packageperson.lb.dao;importjava.sql.Connection;importjava.sql.SQLException;importjava.sql.Statement;importperson.lb.datasource.Node1DataSource;public classNode1Dao {private Node1DataSource dataSource =Node1DataSource.getInstance();/*** 更新账户信息

*@throwsSQLException*/

public void updateAccountInfo() throwsSQLException {

Connection conn=dataSource.getConnection();

Statement st=conn.createStatement();

st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");

}

}

5、创建Node2Dao,在node2机器上增加用户消费信息

packageperson.lb.dao;importjava.sql.Connection;importjava.sql.SQLException;importjava.sql.Statement;importperson.lb.datasource.Node2DataSource;public classNode2Dao {private Node2DataSource dataSource =Node2DataSource.getInstance();/*** 增加用户消费信息

*@throwsSQLException*/

public void addUserPurchaseInfo() throwsSQLException {

Connection conn=dataSource.getConnection();

Statement st=conn.createStatement();

st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "

+ " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', null, 'CNY', '买衣服')");

}

}

6、创建NodeService类,把两个操作作为一个事务来执行

packageperson.lb.service;importjava.sql.SQLException;importperson.lb.dao.Node1Dao;importperson.lb.dao.Node2Dao;importperson.lb.transaction.TransactionManager;public classNodeService {public voidexecute() {//启动事务

TransactionManager.begin();

Node1Dao node1Dao= newNode1Dao();

Node2Dao node2Dao= newNode2Dao();try{

node1Dao.updateAccountInfo();

node2Dao.addUserPurchaseInfo();//提交事务

TransactionManager.commit();

}catch(SQLException e) {

e.printStackTrace();

}

}

}

7、最后是测试类TestTx

packageperson.lb.test;importperson.lb.service.NodeService;public classTestTx {public static voidmain(String[] args) {

NodeService nodeService= newNodeService();

nodeService.execute();

}

}

经测试,与第一个例子效果一致,但是从代码上来说要比第一个例子的可读性和可维护性高。不过这个例子并不能说明分布式事务中的事务管理器和资源管理器的真正原理,也不是一个可使用的代码,毕竟存在缺陷,而且dao层需要抛出异常才能实现事务的回滚。我想,作为一个理解分布式事务的作用的例子是够了。

java mysql 分布式事务_java事务(三)——自己实现分布式事务相关推荐

  1. 分布式事务专题(三):分布式事务解决方案之2PC(两阶段提交)

    目录: 基础概念 分布式事务理论 分布式事务解决方案之2pc(本章) 分布式事务解决方案之TCC 分布式事务解决方案之可靠消息最终一致性 分布式事务解决方案之最大努力通知 分布式事务综合案例分析 3. ...

  2. java中mvc事务_java核心技术第五篇之事务和MVC模式

    第一部分:事务 1.事务的简介: 1.1 在一组操作中(比如增加操作,修改操作),只有增加和修改操作都成功之后,这两个操作才能真正的成功. ,如果这两个操作中,有一个失败了,这两个操作都失败了. 1. ...

  3. java+mysql性能优化_Java培训实战教程之mysql优化

    Java培训实战教程之mysql优化 更新时间:2015年12月29日13时30分 来源:传智播客Java培训学院 浏览次数: 1.   mysql引擎 1.1.  引擎类型 MySQL常用的存储引擎 ...

  4. java redis的同步_java同步系列之redis分布式锁进化史

    标题: 死磕 java同步系列之redis分布式锁进化史 - 彤哥读源码 - 博客园 转帖原地址: https://www.cnblogs.com/tong-yuan/p/11621361.html ...

  5. java zookeeper 使用场景_java架构之路-(分布式zookeeper)zookeeper真实使用场景

    上几次博客,我说了一下Zookeeper的简单使用和API的使用,我们接下来看一下他的真实场景. 一.分布式集群管理✨✨✨ 我们现在有这样一个需求,请先抛开Zookeeper是集群还是单机的概念,下面 ...

  6. java源代码实例倒计时_Java倒计时三种实现方式代码实例

    写完js倒计时,突然想用java实现倒计时,写了三种实现方式 一:设置时长的倒计时: 二:设置时间戳的倒计时: 三:使用java.util.Timer类实现的时间戳倒计时 代码如下: package ...

  7. java mysql重连_java mysql

    关于 java mysql的搜索结果 问题 连接mysql错误,Druid-ConnectionPool-Create-1641320886 16:52:01.163 [Druid-Connectio ...

  8. 分布式改造剧集三:Ehcache分布式改造

    第三集:分布式Ehcache缓存改造 前言 ​ 好久没有写博客了,大有半途而废的趋势.忙不是借口,这个好习惯还是要继续坚持.前面我承诺的第一期的DIY分布式,是时候上终篇了---DIY分布式缓存. 探 ...

  9. java mysql查询试题_java 面试题三十二 mysql查询面试题

    题一: 新建学生-课程数据库的三个表: 学生表:Student(Sno,Sname,Ssex,Sage,Sdept) Sno为主码; 课程表:Course(Cno,Cname,Cpno,Credeit ...

  10. java mysql 行锁_Java如何实现对Mysql数据库的行锁?

    行锁 mysql实现行级锁的两大前提就是,innodb引擎并且开启事务.由于MySQL/InnoDB的加锁分析,一般日常中使用方式为: select .... from table where ... ...

最新文章

  1. SetAutoResizeMode
  2. 测试购买到的LMV358 DIP-8封装的芯片特性
  3. OpenCV 色彩空间的改变
  4. 1、CSS样式及其基本语法
  5. listview嵌套gridview
  6. 计算机基础及ms应用在线,全国一级计算机基础及MS Office应用课件 (2).pdf
  7. 威海二职工业机器人专业_工业机器人专业就业前景-山东省好的中专学校
  8. LeetCode 1743. 从相邻元素对还原数组(拓扑排序)
  9. php里面的MySql
  10. Python代码优化之in关键字
  11. 合伙和合作的区别是什么?
  12. c语言 期末,c语言期末 求助
  13. 凤凰系统基于android x x86,凤凰系统(Phoenix OS)x86版1.0 beta官方版
  14. C#获取登录验证码图片
  15. Mac系统中键盘失灵后的解决小技巧
  16. python输入一个正整数_Python题目解答:输入一个整数N,输出N的所有最小因子
  17. 如何将Excel中的表格直接转成LaTex格式?
  18. 关于EMC DAE、DPE、SPE、SPS的解释
  19. linux 关闭zombie进程
  20. 看b站学习Android studio的第一天

热门文章

  1. 微信小程序封装api请求步骤
  2. 赋能电力能源数字化转型 新华三打算怎么做?
  3. 从用户连续活跃的最大天数说起
  4. 内网渗透|红日安全团队靶场渗透笔记|Mimikatz|MSF跳板
  5. P/Invoke能够在asp.net 中使用哦
  6. 百度抢先翻开春节红包大战B面
  7. Dev-C++软件安装教程
  8. 东方联盟发现SolarWinds黑客使用的3种新恶意软件
  9. 前端常用的 59 个工具类【持续更新】
  10. 董事长职责范围是什么