2019独角兽企业重金招聘Python工程师标准>>>

BASE Transaction

  • Best efforts delivery transaction (已经实现).
  • Try confirm cancel transaction (待定).

Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务。

最大努力送达型事务

在分布式数据库的场景下,相信对于该数据库的操作最终一定可以成功,所以通过最大努力反复尝试送达操作。

最大努力送达型事务的架构图

最大努力送达型事务的架构图

摘自sharding-jdbc使用指南☞事务支持

执行过程有以下几种情况:

  1. 执行成功--如图所示,执行结果事件->监听执行事件->执行成功->清理事务日志
  2. 执行失败,同步重试成功--如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行成功->清理事务日志
  3. 执行失败,同步重试失败,异步重试成功--如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行成功->清理事务日志
  4. 执行失败,同步重试失败,异步重试失败,事务日志保留----如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行失败->... ...

说明:不管执行结果如何,执行前事件都会记录事务日志;执行事件类型包括3种:BEFORE_EXECUTEEXECUTE_FAILUREEXECUTE_SUCCESS;另外,这里的"同步"不是绝对的同步执行,而是通过google-guava的EventBus发布事件后,在监听端判断是EXECUTE_FAILURE事件,最多重试syncMaxDeliveryTryTimes次;后面对BestEffortsDeliveryListener的源码分析有介绍;这里的"异步"通过外挂实现,在后面的文章10. sharding-jdbc源码之异步送达JOB会有分析;

适用场景

  • 根据主键删除数据。
  • 更新记录永久状态,如更新通知送达状态。

使用限制

  • 使用最大努力送达型柔性事务的SQL需要满足幂等性。
  • INSERT语句要求必须包含主键,且不能是自增主键。
  • UPDATE语句要求幂等,不能是UPDATE xxx SET x=x+1
  • DELETE语句无要求。

开发示例

// 1\. 配置SoftTransactionConfiguration
SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);
// 配置相关请看后面的备注
transactionConfig.setXXX();// 2\. 初始化SoftTransactionManager
SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);
transactionManager.init();// 3\. 获取BEDSoftTransaction
BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);// 4\. 开启事务
transaction.begin(connection);// 5\. 执行JDBC
/* code here
*/
*
// 6.关闭事务
transaction.end();

备注:SoftTransactionConfiguration支持的配置以及含义请参考sharding-jdbc使用指南☞事务支持,这段开发示例的代码也摘自这里;也可参考sharding-jdbc-transaction模块中com.dangdang.ddframe.rdb.transaction.soft.integrate.SoftTransactionTest如何使用柔性事务,但是这里的代码需要稍作修改,否则只是普通的执行逻辑,不是sharding-jdbc的执行逻辑

@Test
public void bedSoftTransactionTest() throws SQLException {SoftTransactionManager transactionManagerFactory = new SoftTransactionManager(getSoftTransactionConfiguration(getShardingDataSource()));// 初始化柔性事务管理器transactionManagerFactory.init();BEDSoftTransaction transactionManager = (BEDSoftTransaction) transactionManagerFactory.getTransaction(SoftTransactionType.BestEffortsDelivery);transactionManager.begin(getShardingDataSource().getConnection());// 执行INSERT SQL(DML类型),如果执行过程中异常,会在`BestEffortsDeliveryListener`中重试insert();transactionManager.end();
}private void insert() {String dbSchema = "insert into transaction_test(id, remark) values (2, ?)";try (// 将.getConnection("db_trans", SQLType.DML)移除,这样的话,得到的才是ShardingConnection Connection conn = getShardingDataSource().getConnection();PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {preparedStatement.setString(1, "JUST TEST IT .");preparedStatement.executeUpdate();} catch (final SQLException e) {e.printStackTrace();}
}

核心源码分析

通过3. sharding-jdbc源码之路由&执行中对ExecutorEngine的分析可知,sharding-jdbc在执行SQL前后,分别调用EventBusInstance.getInstance().post()提交了事件,那么调用EventBusInstance.getInstance().register()的地方,就是柔性事务处理的地方,通过查看源码的调用关系可知,只有SoftTransactionManager.init()调用了EventBusInstance.getInstance().register(),所以柔性事务实现的核心在SoftTransactionManager这里;

柔性事务管理器

柔性事务实现在SoftTransactionManager中,核心源码如下:

public final class SoftTransactionManager {// 柔性事务配置对象 @Getterprivate final SoftTransactionConfiguration transactionConfig;/*** Initialize B.A.S.E transaction manager.* @throws SQLException SQL exception*/public void init() throws SQLException {// 初始化注册最大努力送达型柔性事务监听器;EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {// 如果事务日志数据源类型是关系型数据库,则创建事务日志表transaction_logcreateTable();}// 内嵌的最大努力送达型异步JOB任务,依赖当当开源的elastic-jobif (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();}}// 从这里可知创建的事务日志表表名是transaction_log(所以需要保证每个库中用户没有自定义创建transaction_log表)private void createTable() throws SQLException {String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("+ "`id` VARCHAR(40) NOT NULL, "+ "`transaction_type` VARCHAR(30) NOT NULL, "+ "`data_source` VARCHAR(255) NOT NULL, "+ "`sql` TEXT NOT NULL, "+ "`parameters` TEXT NOT NULL, "+ "`creation_time` LONG NOT NULL, "+ "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "+ "PRIMARY KEY (`id`));";try (Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {preparedStatement.executeUpdate();}}

从这段源码可知,柔性事务的几个重点如下,接下来一一根据源码进行分析;

  • 事务日志存储器;
  • 最大努力送达型事务监听器;
  • 异步送达JOB任务;

1.事务日志存储器

柔性事务日志接口类为TransactionLogStorage.java,有两个实现类:

  1. RdbTransactionLogStorage:关系型数据库存储柔性事务日志;
  2. MemoryTransactionLogStorage:内存存储柔性事务日志;

1.1.1事务日志核心接口

TransactionLogStorage中几个重要接口在两个实现类中的实现:

  • void add(TransactionLog):Rdb实现就是把事务日志TransactionLog 插入到transaction_log表中,Memory实现就是把事务日志保存到ConcurrentHashMap中;
  • void remove(String id):Rdb实现就是从transaction_log表中删除事务日志,Memory实现从ConcurrentHashMap中删除事务日志;
  • void increaseAsyncDeliveryTryTimes(String id):异步增加送达重试次数,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb实现就是update transaction_log表中async_delivery_try_times字段加1;Memory实现就是TransactionLog中重新给asyncDeliveryTryTimes赋值new AtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet()
  • findEligibleTransactionLogs(): 查询需要处理的事务日志,条件是:①异步处理次数async_delivery_try_times小于参数最大处里次数maxDeliveryTryTimes,②transaction_type是BestEffortsDelivery,③系统当前时间与事务日志的创建时间差要超过参数maxDeliveryTryDelayMillis,每次最多查询参数size条;Rdb实现通过sql从transaction_log表中查询,Memory实现遍历ConcurrentHashMap匹配符合条件的TransactionLog;
  • boolean processData():Rdb实现执行TransactionLog中的sql,如果执行过程中抛出异常,那么调用increaseAsyncDeliveryTryTimes()增加送达重试次数并抛出异常,如果执行成功,删除事务日志,并返回true;Memory实现直接返回false(因为processData()的目的是执行TransactionLog中的sql,而Memory类型无法触及数据库,所以返回false)

1.1.2事务日志存储核心源码

RdbTransactionLogStorage.java实现源码:

public final class RdbTransactionLogStorage implements TransactionLogStorage {private final DataSource dataSource;@Overridepublic void add(final TransactionLog transactionLog) {// 保存事务日志到rdb中的sqlString sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";try (Connection conn = dataSource.getConnection();PreparedStatement preparedStatement = conn.prepareStatement(sql)) {... ...preparedStatement.executeUpdate();} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}}@Overridepublic void remove(final String id) {// 根据id删除事务日志的sqlString sql = "DELETE FROM `transaction_log` WHERE `id`=?;";try (Connection conn = dataSource.getConnection();PreparedStatement preparedStatement = conn.prepareStatement(sql)) {preparedStatement.setString(1, id);preparedStatement.executeUpdate();} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}}@Overridepublic List findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {List result = new ArrayList(size);// 执行该sql查询需要处理的事务日志,最多取size条;String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";try (Connection conn = dataSource.getConnection()) {try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {... ...preparedStatement.setLong(3, System.currentTimeMillis() - maxDeliveryTryDelayMillis);... ...}} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}return result;}@Overridepublic void increaseAsyncDeliveryTryTimes(final String id) {// 更新处理次数+1String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";try (... ...} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}}@Overridepublic boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {try (Connection conn = connection;// 执行TransactionLog中的sqlPreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));}preparedStatement.executeUpdate();} catch (final SQLException ex) {如果抛出异常,表示执行sql失败,那么把增加处理次数并把异常抛出去;increaseAsyncDeliveryTryTimes(transactionLog.getId());throw new TransactionCompensationException(ex);}// 如果没有抛出异常,表示执行sql成功,那么删除该事务日志;remove(transactionLog.getId());return true;}
}

1.1.3事务日志存储样例

id transction_type data_source sql parameters creation_time async_delivery_try_times
85c141c4-1b8f-4e54-9010-0cc661bb1864 BestEffortsDelivery db_trans insert into transaction_test(id, remark) values (3, ?) ["TEST BY AFEI."] 1517899200989 0

transaction_log中存储的事务日志样例:

id transction_type data_source sql parameters creation_time async_delivery_try_times
85c141c4-1b8f-4e54-9010-0cc661bb1864 BestEffortsDelivery db_trans insert into transaction_test(id, remark) values (3, ?) ["TEST BY AFEI."] 1517899200989 0

1.2最大努力送达型事务监听器

核心源码如下:

```
/**
* Best efforts delivery B.A.S.E transaction listener.
*
* @author zhangliang
*/
@Slf4j
public final class BestEffortsDeliveryListener {

@Subscribe
@AllowConcurrentEvents
// 从方法可知,只监听DML执行事件(DML即数据维护语言,包括INSERT, UPDATE, DELETE)
public void listen(final DMLExecutionEvent event) {// 判断是否需要继续,判断逻辑为:事务存在,并且是BestEffortsDelivery类型事务if (!isProcessContinuously()) {return;}// 从柔性事务管理器中得到柔性事务配置SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();// 得到配置的柔性事务存储器TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());// 这里肯定是最大努力送达型事务(如果不是BEDSoftTransaction,isProcessContinuously()就是false)BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();// 根据事件类型做不同处理switch (event.getEventExecutionType()) {case BEFORE_EXECUTE:// 如果执行前事件,那么先保存事务日志;//TODO for batch SQL need split to 2-level recordstransactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));return;case EXECUTE_SUCCESS: // 如果执行成功事件,那么删除事务日志;transactionLogStorage.remove(event.getId());return;case EXECUTE_FAILURE: boolean deliverySuccess = false;// 如果执行成功事件,最大努力送达型最多尝试3次(可配置,SoftTransactionConfiguration中的参数syncMaxDeliveryTryTimes);for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {// 如果在该Listener中执行成功,那么返回,不需要再尝试if (deliverySuccess) {return;}boolean isNewConnection = false;Connection conn = null;PreparedStatement preparedStatement = null;try {conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);// 通过执行"select 1"判断conn是否是有效的数据库连接;如果不是有效的数据库连接,释放掉并重新获取一个数据库连接;if (!isValidConnection(conn)) {bedSoftTransaction.getConnection().release(conn);conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);isNewConnection = true;}preparedStatement = conn.prepareStatement(event.getSql());//TODO for batch event need split to 2-level recordsfor (int parameterIndex = 0; parameterIndex  BestEffortsDeliveryListener源码总结:
  • 执行前,插入事务日志;
  • 执行成功,则删除事务日志;
  • 执行失败,则最大努力尝试syncMaxDeliveryTryTimes次;

1.3 异步送达JOB任务

  • 部署用于存储事务日志的数据库。
  • 部署用于异步作业使用的zookeeper。
  • 配置YAML文件,参照示例文件config.yaml。
  • 下载并解压文件sharding-jdbc-transaction-async-job-$VERSION.tar,通过start.sh脚本启动异步作业。

异步送达JOB任务基于elastic-job,所以需要部署zookeeper;

http://cmsblogs.com/?p=2542

转载于:https://my.oschina.net/xiaominmin/blog/1825144

【死磕Sharding-jdbc】—–最大努力型事务相关推荐

  1. shardingjdbc (九)-最大努力型事务

    一 序: Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务.目前支持的: Best efforts delivery transaction (已经实现). Try conf ...

  2. 我说分布式事务之最大努力通知型事务

    来源:http://t.cn/E4ejkSN 在之前的文章中,我们介绍了基于TCC模式的分布式事务解决方案 我说分布式事务之TCC . TCC适用于公司内部对一致性.实时性要求较高的业务场景,而本文我 ...

  3. 一条mysql语句是事务吗_没想到!我在简历上写了“精通MySQL”,阿里面试官跟我死磕后就给我发了高薪offer...

    事情是这样的 前段时间面试了阿里,大家也都清楚,如果你在简历上面写着你精通XX技术,那面试官就会跟你死磕到底. 我就是在自己的简历上写了精通MySQL,然后就开启了和阿里面试官的死磕之路,结果就是拿到 ...

  4. 在微信进行多人协作编辑Excel型文件,又何必和Excel死磕呢

    在微信进行在线多人编辑,不必和Excel死磕,用vika维格表也能达到一样的效果. 这个问题的本质其实不是必须用Excel这个工具来编辑,假如有一个支持多人编辑的工具能够满足我们需要的Excel的功能 ...

  5. 死磕java并发cas_死磕 java并发包之AtomicInteger源码分析

    问题 (1)什么是原子操作? (2)原子操作和数据库的ACID有啥关系? (3)AtomicInteger是怎么实现原子操作的? (4)AtomicInteger是有什么缺点? 简介 AtomicIn ...

  6. Spring boot + Sharding JDBC 分库分表 及 分布式事务处理

    Sharding JDBC 基础概念 Apache ShardingSphere 是一套开源的分布式数据库解决方案组成的生态圈,它由 JDBC.Proxy 和 Sidecar(规划中)这 3 款既能够 ...

  7. 三个月死磕Python是种什么样的体验?

    关注「实验楼」,每天分享一个项目教程 3个月的死磕Python后,参加「 楼+ Python实战 · 第4期 」的学员们感想如何?下面带来他们的真实评价. 作为实验楼的网红课程--「 楼+ Pytho ...

  8. UCloud,创业公司死磕公有云的悲壮

    "[报名]数据猿年度精彩活动推荐:访谈调研+企业盘点+榜奖峰会,与数据猿共筑2021 大数据产业创新服务媒体 --聚焦数据 · 改变商业 作为中国云计算领域第一家科创板上市公司,UCloud ...

  9. “倚天”一出,谁与争锋?阿里发布首颗云芯片倚天 710,死磕自研芯

    作者 | 贾凯强.伍杏玲 出品 | CSDN 10 月 19 日,2021 年云栖大会正式拉开帷幕.达摩院院长.阿里云智能事业部总裁张建锋表示,如今一个以云为核心的新型计算体系结构正在形成,该体系从三 ...

最新文章

  1. python优雅写法
  2. java public object_Java中Object类
  3. 公钥、私钥、数字证书的概念 (讲得很明吧,通俗易懂)
  4. 汉王考勤管理软件mysql数据库配置_汉王考勤管理软件使用说明书介绍.pdf
  5. Unity编辑器开发之中文名称转拼音
  6. 上周热点回顾(4.9-4.15)
  7. 今天教大家怎么用Unity制作简单的AR
  8. Xshell 连接服务器失败的解决方法
  9. 粗暴的rm rf,报错Argument list too long
  10. 计算机平时测试零分,计算机二级最全攻略 就快考试了不看等什么呢!
  11. elementui级联选择器Cascader不触发change事件
  12. 网易云易盾推出面向微信小程序的大数据反作弊产品
  13. 软考哪个证书最有用?
  14. pwn基本ROP——ret2libc
  15. 计算机保研夏令营准备流程建议
  16. 【图像处理通道分离去除印章】
  17. js ascii码使用攻略
  18. 设计菲涅尔透镜(python)
  19. 《图解机器学习-杉山将著》读书笔记---CH2
  20. 昆仑通态MCGS解摸屏与变频器通信程序

热门文章

  1. python适合零基础学习吗-零基础能学好Python吗?哪些人更适合学习?
  2. 零基础python入门课程-零基础 Python 入门
  3. python3下载教程-Python3完全零基础入门精讲 全套视频教程
  4. python上海培训哪里比较好-上海哪个python培训机构好
  5. Linux拷贝排除一个或多个目录的实现方法
  6. 关于ECS设计以及MVC分层设计和组件化设计的思考和总结(这个标题就问你长不长)
  7. 成都理工大学计算机报告,[2017年整理]成都理工大学通信工程计算机网络综合课程设计报告.doc...
  8. Github上的十大机器学习项目
  9. java设计模式:Builder模式
  10. Activity的用法(三):开启网页 (没有弹出浏览器,不清楚是什么原因)