【死磕Sharding-jdbc】—–最大努力型事务
2019独角兽企业重金招聘Python工程师标准>>>
BASE Transaction
- Best efforts delivery transaction (已经实现).
- Try confirm cancel transaction (待定).
Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务。
最大努力送达型事务
在分布式数据库的场景下,相信对于该数据库的操作最终一定可以成功,所以通过最大努力反复尝试送达操作。
最大努力送达型事务的架构图
最大努力送达型事务的架构图
摘自sharding-jdbc使用指南☞事务支持
执行过程有以下几种情况:
- 执行成功--如图所示,执行结果事件->监听执行事件->执行成功->清理事务日志
- 执行失败,同步重试成功--如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行成功->清理事务日志
- 执行失败,同步重试失败,异步重试成功--如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行成功->清理事务日志
- 执行失败,同步重试失败,异步重试失败,事务日志保留----如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行失败->... ...
说明:不管执行结果如何,执行前事件都会记录事务日志;执行事件类型包括3种:BEFORE_EXECUTE,EXECUTE_FAILURE和EXECUTE_SUCCESS;另外,这里的"同步"不是绝对的同步执行,而是通过google-guava的EventBus发布事件后,在监听端判断是EXECUTE_FAILURE事件,最多重试
syncMaxDeliveryTryTimes
次;后面对BestEffortsDeliveryListener
的源码分析有介绍;这里的"异步"通过外挂实现,在后面的文章10. sharding-jdbc源码之异步送达JOB会有分析;
适用场景
- 根据主键删除数据。
- 更新记录永久状态,如更新通知送达状态。
使用限制
- 使用最大努力送达型柔性事务的SQL需要满足幂等性。
- INSERT语句要求必须包含主键,且不能是自增主键。
- UPDATE语句要求幂等,不能是UPDATE xxx SET x=x+1
- DELETE语句无要求。
开发示例
// 1\. 配置SoftTransactionConfiguration
SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);
// 配置相关请看后面的备注
transactionConfig.setXXX();// 2\. 初始化SoftTransactionManager
SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);
transactionManager.init();// 3\. 获取BEDSoftTransaction
BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);// 4\. 开启事务
transaction.begin(connection);// 5\. 执行JDBC
/* code here
*/
*
// 6.关闭事务
transaction.end();
备注:SoftTransactionConfiguration支持的配置以及含义请参考sharding-jdbc使用指南☞事务支持,这段开发示例的代码也摘自这里;也可参考
sharding-jdbc-transaction
模块中com.dangdang.ddframe.rdb.transaction.soft.integrate.SoftTransactionTest
如何使用柔性事务,但是这里的代码需要稍作修改,否则只是普通的执行逻辑,不是sharding-jdbc的执行逻辑:
@Test
public void bedSoftTransactionTest() throws SQLException {SoftTransactionManager transactionManagerFactory = new SoftTransactionManager(getSoftTransactionConfiguration(getShardingDataSource()));// 初始化柔性事务管理器transactionManagerFactory.init();BEDSoftTransaction transactionManager = (BEDSoftTransaction) transactionManagerFactory.getTransaction(SoftTransactionType.BestEffortsDelivery);transactionManager.begin(getShardingDataSource().getConnection());// 执行INSERT SQL(DML类型),如果执行过程中异常,会在`BestEffortsDeliveryListener`中重试insert();transactionManager.end();
}private void insert() {String dbSchema = "insert into transaction_test(id, remark) values (2, ?)";try (// 将.getConnection("db_trans", SQLType.DML)移除,这样的话,得到的才是ShardingConnection Connection conn = getShardingDataSource().getConnection();PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {preparedStatement.setString(1, "JUST TEST IT .");preparedStatement.executeUpdate();} catch (final SQLException e) {e.printStackTrace();}
}
核心源码分析
通过3. sharding-jdbc源码之路由&执行中对ExecutorEngine的分析可知,sharding-jdbc在执行SQL前后,分别调用EventBusInstance.getInstance().post()
提交了事件,那么调用EventBusInstance.getInstance().register()
的地方,就是柔性事务处理的地方,通过查看源码的调用关系可知,只有SoftTransactionManager.init()
调用了EventBusInstance.getInstance().register()
,所以柔性事务实现的核心在SoftTransactionManager这里;
柔性事务管理器
柔性事务实现在SoftTransactionManager
中,核心源码如下:
public final class SoftTransactionManager {// 柔性事务配置对象 @Getterprivate final SoftTransactionConfiguration transactionConfig;/*** Initialize B.A.S.E transaction manager.* @throws SQLException SQL exception*/public void init() throws SQLException {// 初始化注册最大努力送达型柔性事务监听器;EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {// 如果事务日志数据源类型是关系型数据库,则创建事务日志表transaction_logcreateTable();}// 内嵌的最大努力送达型异步JOB任务,依赖当当开源的elastic-jobif (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();}}// 从这里可知创建的事务日志表表名是transaction_log(所以需要保证每个库中用户没有自定义创建transaction_log表)private void createTable() throws SQLException {String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("+ "`id` VARCHAR(40) NOT NULL, "+ "`transaction_type` VARCHAR(30) NOT NULL, "+ "`data_source` VARCHAR(255) NOT NULL, "+ "`sql` TEXT NOT NULL, "+ "`parameters` TEXT NOT NULL, "+ "`creation_time` LONG NOT NULL, "+ "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "+ "PRIMARY KEY (`id`));";try (Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {preparedStatement.executeUpdate();}}
从这段源码可知,柔性事务的几个重点如下,接下来一一根据源码进行分析;
- 事务日志存储器;
- 最大努力送达型事务监听器;
- 异步送达JOB任务;
1.事务日志存储器
柔性事务日志接口类为TransactionLogStorage.java
,有两个实现类:
- RdbTransactionLogStorage:关系型数据库存储柔性事务日志;
- MemoryTransactionLogStorage:内存存储柔性事务日志;
1.1.1事务日志核心接口
TransactionLogStorage中几个重要接口在两个实现类中的实现:
- void add(TransactionLog):Rdb实现就是把事务日志TransactionLog 插入到
transaction_log
表中,Memory实现就是把事务日志保存到ConcurrentHashMap
中; - void remove(String id):Rdb实现就是从
transaction_log
表中删除事务日志,Memory实现从ConcurrentHashMap
中删除事务日志; - void increaseAsyncDeliveryTryTimes(String id):异步增加送达重试次数,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb实现就是update
transaction_log
表中async_delivery_try_times
字段加1;Memory实现就是TransactionLog中重新给asyncDeliveryTryTimes赋值new AtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet()
; - findEligibleTransactionLogs(): 查询需要处理的事务日志,条件是:①
异步处理次数async_delivery_try_times小于参数最大处里次数maxDeliveryTryTimes
,②transaction_type是BestEffortsDelivery
,③系统当前时间与事务日志的创建时间差要超过参数maxDeliveryTryDelayMillis
,每次最多查询参数size条;Rdb实现通过sql从transaction_log表中查询,Memory实现遍历ConcurrentHashMap匹配符合条件的TransactionLog; - boolean processData():Rdb实现执行TransactionLog中的sql,如果执行过程中抛出异常,那么调用increaseAsyncDeliveryTryTimes()增加送达重试次数并抛出异常,如果执行成功,删除事务日志,并返回true;Memory实现直接返回false(因为processData()的目的是执行TransactionLog中的sql,而Memory类型无法触及数据库,所以返回false)
1.1.2事务日志存储核心源码
RdbTransactionLogStorage.java
实现源码:
public final class RdbTransactionLogStorage implements TransactionLogStorage {private final DataSource dataSource;@Overridepublic void add(final TransactionLog transactionLog) {// 保存事务日志到rdb中的sqlString sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";try (Connection conn = dataSource.getConnection();PreparedStatement preparedStatement = conn.prepareStatement(sql)) {... ...preparedStatement.executeUpdate();} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}}@Overridepublic void remove(final String id) {// 根据id删除事务日志的sqlString sql = "DELETE FROM `transaction_log` WHERE `id`=?;";try (Connection conn = dataSource.getConnection();PreparedStatement preparedStatement = conn.prepareStatement(sql)) {preparedStatement.setString(1, id);preparedStatement.executeUpdate();} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}}@Overridepublic List findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {List result = new ArrayList(size);// 执行该sql查询需要处理的事务日志,最多取size条;String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";try (Connection conn = dataSource.getConnection()) {try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {... ...preparedStatement.setLong(3, System.currentTimeMillis() - maxDeliveryTryDelayMillis);... ...}} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}return result;}@Overridepublic void increaseAsyncDeliveryTryTimes(final String id) {// 更新处理次数+1String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";try (... ...} catch (final SQLException ex) {throw new TransactionLogStorageException(ex);}}@Overridepublic boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {try (Connection conn = connection;// 执行TransactionLog中的sqlPreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));}preparedStatement.executeUpdate();} catch (final SQLException ex) {如果抛出异常,表示执行sql失败,那么把增加处理次数并把异常抛出去;increaseAsyncDeliveryTryTimes(transactionLog.getId());throw new TransactionCompensationException(ex);}// 如果没有抛出异常,表示执行sql成功,那么删除该事务日志;remove(transactionLog.getId());return true;}
}
1.1.3事务日志存储样例
id | transction_type | data_source | sql | parameters | creation_time | async_delivery_try_times |
---|---|---|---|---|---|---|
85c141c4-1b8f-4e54-9010-0cc661bb1864 | BestEffortsDelivery | db_trans | insert into transaction_test(id, remark) values (3, ?) | ["TEST BY AFEI."] | 1517899200989 | 0 |
transaction_log中存储的事务日志样例:
id | transction_type | data_source | sql | parameters | creation_time | async_delivery_try_times |
---|---|---|---|---|---|---|
85c141c4-1b8f-4e54-9010-0cc661bb1864 | BestEffortsDelivery | db_trans | insert into transaction_test(id, remark) values (3, ?) | ["TEST BY AFEI."] | 1517899200989 | 0 |
1.2最大努力送达型事务监听器
核心源码如下:
```
/**
* Best efforts delivery B.A.S.E transaction listener.
*
* @author zhangliang
*/
@Slf4j
public final class BestEffortsDeliveryListener {
@Subscribe
@AllowConcurrentEvents
// 从方法可知,只监听DML执行事件(DML即数据维护语言,包括INSERT, UPDATE, DELETE)
public void listen(final DMLExecutionEvent event) {// 判断是否需要继续,判断逻辑为:事务存在,并且是BestEffortsDelivery类型事务if (!isProcessContinuously()) {return;}// 从柔性事务管理器中得到柔性事务配置SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();// 得到配置的柔性事务存储器TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());// 这里肯定是最大努力送达型事务(如果不是BEDSoftTransaction,isProcessContinuously()就是false)BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();// 根据事件类型做不同处理switch (event.getEventExecutionType()) {case BEFORE_EXECUTE:// 如果执行前事件,那么先保存事务日志;//TODO for batch SQL need split to 2-level recordstransactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));return;case EXECUTE_SUCCESS: // 如果执行成功事件,那么删除事务日志;transactionLogStorage.remove(event.getId());return;case EXECUTE_FAILURE: boolean deliverySuccess = false;// 如果执行成功事件,最大努力送达型最多尝试3次(可配置,SoftTransactionConfiguration中的参数syncMaxDeliveryTryTimes);for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {// 如果在该Listener中执行成功,那么返回,不需要再尝试if (deliverySuccess) {return;}boolean isNewConnection = false;Connection conn = null;PreparedStatement preparedStatement = null;try {conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);// 通过执行"select 1"判断conn是否是有效的数据库连接;如果不是有效的数据库连接,释放掉并重新获取一个数据库连接;if (!isValidConnection(conn)) {bedSoftTransaction.getConnection().release(conn);conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);isNewConnection = true;}preparedStatement = conn.prepareStatement(event.getSql());//TODO for batch event need split to 2-level recordsfor (int parameterIndex = 0; parameterIndex BestEffortsDeliveryListener源码总结:
- 执行前,插入事务日志;
- 执行成功,则删除事务日志;
- 执行失败,则最大努力尝试
syncMaxDeliveryTryTimes
次;
1.3 异步送达JOB任务
- 部署用于存储事务日志的数据库。
- 部署用于异步作业使用的zookeeper。
- 配置YAML文件,参照示例文件config.yaml。
- 下载并解压文件sharding-jdbc-transaction-async-job-$VERSION.tar,通过start.sh脚本启动异步作业。
异步送达JOB任务基于elastic-job,所以需要部署zookeeper;
http://cmsblogs.com/?p=2542
转载于:https://my.oschina.net/xiaominmin/blog/1825144
【死磕Sharding-jdbc】—–最大努力型事务相关推荐
- shardingjdbc (九)-最大努力型事务
一 序: Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务.目前支持的: Best efforts delivery transaction (已经实现). Try conf ...
- 我说分布式事务之最大努力通知型事务
来源:http://t.cn/E4ejkSN 在之前的文章中,我们介绍了基于TCC模式的分布式事务解决方案 我说分布式事务之TCC . TCC适用于公司内部对一致性.实时性要求较高的业务场景,而本文我 ...
- 一条mysql语句是事务吗_没想到!我在简历上写了“精通MySQL”,阿里面试官跟我死磕后就给我发了高薪offer...
事情是这样的 前段时间面试了阿里,大家也都清楚,如果你在简历上面写着你精通XX技术,那面试官就会跟你死磕到底. 我就是在自己的简历上写了精通MySQL,然后就开启了和阿里面试官的死磕之路,结果就是拿到 ...
- 在微信进行多人协作编辑Excel型文件,又何必和Excel死磕呢
在微信进行在线多人编辑,不必和Excel死磕,用vika维格表也能达到一样的效果. 这个问题的本质其实不是必须用Excel这个工具来编辑,假如有一个支持多人编辑的工具能够满足我们需要的Excel的功能 ...
- 死磕java并发cas_死磕 java并发包之AtomicInteger源码分析
问题 (1)什么是原子操作? (2)原子操作和数据库的ACID有啥关系? (3)AtomicInteger是怎么实现原子操作的? (4)AtomicInteger是有什么缺点? 简介 AtomicIn ...
- Spring boot + Sharding JDBC 分库分表 及 分布式事务处理
Sharding JDBC 基础概念 Apache ShardingSphere 是一套开源的分布式数据库解决方案组成的生态圈,它由 JDBC.Proxy 和 Sidecar(规划中)这 3 款既能够 ...
- 三个月死磕Python是种什么样的体验?
关注「实验楼」,每天分享一个项目教程 3个月的死磕Python后,参加「 楼+ Python实战 · 第4期 」的学员们感想如何?下面带来他们的真实评价. 作为实验楼的网红课程--「 楼+ Pytho ...
- UCloud,创业公司死磕公有云的悲壮
"[报名]数据猿年度精彩活动推荐:访谈调研+企业盘点+榜奖峰会,与数据猿共筑2021 大数据产业创新服务媒体 --聚焦数据 · 改变商业 作为中国云计算领域第一家科创板上市公司,UCloud ...
- “倚天”一出,谁与争锋?阿里发布首颗云芯片倚天 710,死磕自研芯
作者 | 贾凯强.伍杏玲 出品 | CSDN 10 月 19 日,2021 年云栖大会正式拉开帷幕.达摩院院长.阿里云智能事业部总裁张建锋表示,如今一个以云为核心的新型计算体系结构正在形成,该体系从三 ...
最新文章
- python优雅写法
- java public object_Java中Object类
- 公钥、私钥、数字证书的概念 (讲得很明吧,通俗易懂)
- 汉王考勤管理软件mysql数据库配置_汉王考勤管理软件使用说明书介绍.pdf
- Unity编辑器开发之中文名称转拼音
- 上周热点回顾(4.9-4.15)
- 今天教大家怎么用Unity制作简单的AR
- Xshell 连接服务器失败的解决方法
- 粗暴的rm rf,报错Argument list too long
- 计算机平时测试零分,计算机二级最全攻略 就快考试了不看等什么呢!
- elementui级联选择器Cascader不触发change事件
- 网易云易盾推出面向微信小程序的大数据反作弊产品
- 软考哪个证书最有用?
- pwn基本ROP——ret2libc
- 计算机保研夏令营准备流程建议
- 【图像处理通道分离去除印章】
- js ascii码使用攻略
- 设计菲涅尔透镜(python)
- 《图解机器学习-杉山将著》读书笔记---CH2
- 昆仑通态MCGS解摸屏与变频器通信程序
热门文章
- python适合零基础学习吗-零基础能学好Python吗?哪些人更适合学习?
- 零基础python入门课程-零基础 Python 入门
- python3下载教程-Python3完全零基础入门精讲 全套视频教程
- python上海培训哪里比较好-上海哪个python培训机构好
- Linux拷贝排除一个或多个目录的实现方法
- 关于ECS设计以及MVC分层设计和组件化设计的思考和总结(这个标题就问你长不长)
- 成都理工大学计算机报告,[2017年整理]成都理工大学通信工程计算机网络综合课程设计报告.doc...
- Github上的十大机器学习项目
- java设计模式:Builder模式
- Activity的用法(三):开启网页 (没有弹出浏览器,不清楚是什么原因)