目录

  • 前置
  • InsertBatchSuccessServiceImpl.java
  • InsertBatchErrorServiceImpl.java
  • 效果图

前置

在一些特殊的场景下, 我们需要一些特定的操作.
比如我有一个接口, 做如下操作, 需要保持事物的一致性, 即: 全部成功则提交, 一个异常则全部回滚:
1.insert订单、(耗时1秒)
2.insert订单商品、(耗时1秒)
3.insert子订单、(耗时1秒)
4.insert操作记录、(耗时1秒)
在这波insert操作下来, 就需要花费4秒钟, 那么我们是否可以采用异步的方式进行保存, 将时间保持在1秒钟, 并保持事物一致性
故有了下面的方法

项目地址: https://gitee.com/xmaxm/test-code/tree/master/chaim-mybatis-plus

大致思路

要做异步操作, 就得做多线程, 但是事物是和线程是绑定在一起的,
同时我们知道, commit和rollback是和DML语句一起使用的, 也就是我们能知道这条SQL是成功还是失败
通过上面, 我们就可以进行线程等待, 在所有的DML语句执行之后, 统一进行commit还是rollback, 执行快的等待执行慢的, 当同时OK就进行统一操作
就可采用下面所列的方式, 当然还有很多别的方式也可以进行

强调:

多测, 做线程循环测试跑. 比如AB, JMeter, apifox, 下面列举的InsertBatchErrorServiceImpl在普通测试过程中不会出现问题, 但是当次数过多就会出现无法唤醒的情况. 一定要测试, 有些场景需要多跑几遍才能够进行重现

代码部分

InsertBatchSuccessServiceImpl:
该实现采用的是CountDownLatch, countDown()递减锁的数量, await()等待直到当前计数器数量为0, 释放所有等待线程

InsertBatchErrorServiceImpl:
该实现采用的 LockSupport.park()悬停、LockSupport.unpark(thread)唤醒. 但在实际使用过程中发现会出现无法唤醒的情况, 我发布了问题(可供参考 ), 但是目前还没有得到解决, 不得不暂时放弃


InsertBatchSuccessServiceImpl.java

package com.chaim.mybatis.service.impl;import com.chaim.mybatis.converter.SysUserConverter;
import com.chaim.mybatis.dto.SysUserDTO;
import com.chaim.mybatis.entitys.SysUser;
import com.chaim.mybatis.mappers.SysUserMapper;
import com.chaim.mybatis.service.InsertBatchService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;/*** @author Chaim* @date 2022/12/24 16:06*/
@Service
@RequiredArgsConstructor
@Slf4j
public class InsertBatchSuccessServiceImpl implements InsertBatchService {private final SysUserMapper sysUserMapper;private final SysUserConverter sysUserConverter;private final DataSourceTransactionManager dataSourceTransactionManager;@Overridepublic Object insertBatch(SysUserDTO.InsertSysUserDTO insertSysUserDTO) {// 定义开启的线程数final int i = 3;CountDownLatch latch = new CountDownLatch(i);// 事务定义DefaultTransactionDefinition definition = new DefaultTransactionDefinition();// 所有开启事务的线程中, 是否存在异常AtomicBoolean isException = new AtomicBoolean(Boolean.FALSE);List<CompletableFuture<Void>> list = new ArrayList<>();list.add(this.save(insertSysUserDTO, latch, definition, isException));list.add(this.save1(insertSysUserDTO, latch, definition, isException));list.add(this.save2(insertSysUserDTO, latch, definition, isException));// 调用Future的阻塞接口, 等待全部future实例异步执行结束CompletableFuture.allOf(list.toArray(new CompletableFuture[i])).join();return "SUCCESS";}private CompletableFuture<Void> save(SysUserDTO.InsertSysUserDTO insertSysUserDTO, CountDownLatch latch, DefaultTransactionDefinition definition, AtomicBoolean isException) {return CompletableFuture.runAsync(() -> {// 获得事务状态TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);try {sysUser.setPassword("123456");sysUser.setSalt(1234);sysUserMapper.insert(sysUser);this.threadBlocking(latch, status, isException);} catch (Exception exception) {log.error("方法: [save] 异常: {}", exception.getMessage());this.errorRollback(latch, status, isException);}});}private CompletableFuture<Void> save1(SysUserDTO.InsertSysUserDTO insertSysUserDTO, CountDownLatch latch, DefaultTransactionDefinition definition, AtomicBoolean isException) {return CompletableFuture.runAsync(() -> {TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);try {sysUser.setPassword("123456");sysUser.setSalt(9876);sysUser.setUsername(sysUser.getUsername().concat(": 子线程1"));sysUserMapper.insert(sysUser);this.threadBlocking(latch, status, isException);} catch (Exception exception) {log.error("方法: [save1] 异常: {}", exception.getMessage());this.errorRollback(latch, status, isException);}});}private CompletableFuture<Void> save2(SysUserDTO.InsertSysUserDTO insertSysUserDTO, CountDownLatch latch, DefaultTransactionDefinition definition, AtomicBoolean isException) {return CompletableFuture.runAsync(() -> {TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);try {sysUser.setPassword("123456");sysUser.setSalt(9876);sysUser.setUsername(sysUser.getUsername().concat(": 子线程2"));sysUserMapper.insert(sysUser);this.threadBlocking(latch, status, isException);} catch (Exception exception) {log.error("方法: [save2] 异常: {}", exception.getMessage());this.errorRollback(latch, status, isException);}});}/*** 进行线程阻塞操作** @param latch* @param status* @param isException*/private void threadBlocking(CountDownLatch latch, TransactionStatus status, AtomicBoolean isException) throws InterruptedException {log.info("计数器递减");latch.countDown();log.info("开始悬停, 剩余计数数量: {}", latch.getCount());latch.await();// 以下步骤抛出异常进入catch, 做countDown操作, 不会影响. 走到这一步已经没有阻塞了if (isException.get()) {log.info("开始回滚");dataSourceTransactionManager.rollback(status);} else {log.info("开始提交");dataSourceTransactionManager.commit(status);}}/*** 程序异常, 进行回滚, 线程唤醒** @param latch* @param status* @param isException*/private void errorRollback(CountDownLatch latch, TransactionStatus status, AtomicBoolean isException) {// 设定线程中存在异常信息isException.set(Boolean.TRUE);latch.countDown();log.info("开始回滚, 程序异常, 计数器递减, 剩余数量: {}", latch.getCount());// 本线程回滚dataSourceTransactionManager.rollback(status);}
}

InsertBatchErrorServiceImpl.java

关于该方式存在的问题, 我已经在问答区提出疑问, 可供参考

package com.chaim.mybatis.service.impl;import com.chaim.mybatis.converter.SysUserConverter;
import com.chaim.mybatis.dto.SysUserDTO;
import com.chaim.mybatis.entitys.SysUser;
import com.chaim.mybatis.mappers.SysUserMapper;
import com.chaim.mybatis.service.InsertBatchService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;/*** @author Chaim* @date 2022/12/24 22:06*/
@Service
@RequiredArgsConstructor
@Slf4j
public class InsertBatchErrorServiceImpl implements InsertBatchService {private final SysUserMapper sysUserMapper;private final SysUserConverter sysUserConverter;private final DataSourceTransactionManager dataSourceTransactionManager;@Overridepublic Object insertBatch(SysUserDTO.InsertSysUserDTO insertSysUserDTO) {// 定义开启的线程数AtomicInteger totalThreadCount = new AtomicInteger(3);// 事务定义DefaultTransactionDefinition definition = new DefaultTransactionDefinition();// 所有开启事务的线程中, 是否存在异常AtomicBoolean isException = new AtomicBoolean(Boolean.FALSE);// 定义没有执行结束的线程集合List<Thread> unFinishedThread = Collections.synchronizedList(new ArrayList<>());List<CompletableFuture<Void>> list = new ArrayList<>();list.add(this.save(insertSysUserDTO, unFinishedThread, totalThreadCount, definition, isException));list.add(this.save1(insertSysUserDTO, unFinishedThread, totalThreadCount, definition, isException));list.add(this.save2(insertSysUserDTO, unFinishedThread, totalThreadCount, definition, isException));// 调用Future的阻塞接口, 等待全部future实例异步执行结束CompletableFuture.allOf(list.toArray(new CompletableFuture[totalThreadCount.get()])).join();return "SUCCESS";}private CompletableFuture<Void> save(SysUserDTO.InsertSysUserDTO insertSysUserDTO, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, DefaultTransactionDefinition definition, AtomicBoolean isException) {return CompletableFuture.runAsync(() -> {// 获得事务状态TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);try {sysUser.setPassword("123456");sysUser.setSalt(1234);sysUserMapper.insert(sysUser);this.threadBlocking(Thread.currentThread(), unFinishedThread, totalThreadCount, status, isException);} catch (Exception exception) {log.error("方法: [save] 异常: {}", exception.getMessage());this.errorRollback(unFinishedThread, totalThreadCount, status, isException);}});}private CompletableFuture<Void> save1(SysUserDTO.InsertSysUserDTO insertSysUserDTO, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, DefaultTransactionDefinition definition, AtomicBoolean isException) {return CompletableFuture.runAsync(() -> {TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);try {sysUser.setPassword("123456");sysUser.setSalt(9876);sysUser.setUsername(sysUser.getUsername().concat(": 子线程1"));sysUserMapper.insert(sysUser);this.threadBlocking(Thread.currentThread(), unFinishedThread, totalThreadCount, status, isException);} catch (Exception exception) {log.error("方法: [save1] 异常: {}", exception.getMessage());this.errorRollback(unFinishedThread, totalThreadCount, status, isException);}});}private CompletableFuture<Void> save2(SysUserDTO.InsertSysUserDTO insertSysUserDTO, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, DefaultTransactionDefinition definition, AtomicBoolean isException) {return CompletableFuture.runAsync(() -> {TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);try {sysUser.setPassword("123456");sysUser.setSalt(9876);sysUser.setUsername(sysUser.getUsername().concat(": 子线程2"));sysUserMapper.insert(sysUser);this.threadBlocking(Thread.currentThread(), unFinishedThread, totalThreadCount, status, isException);} catch (Exception exception) {log.error("方法: [save2] 异常: {}", exception.getMessage());this.errorRollback(unFinishedThread, totalThreadCount, status, isException);}});}/*** 进行线程阻塞操作** @param thread* @param unFinishedThread* @param totalThreadCount* @param status* @param isException*/private void threadBlocking(Thread thread, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, TransactionStatus status, AtomicBoolean isException) {// 添加到没有执行结束的线程集合unFinishedThread.add(thread);// 每个线程都在悬停前开启唤醒检查this.notifyAllThread(unFinishedThread, totalThreadCount, false);if (isException.get()) {log.info("已存在异步任务发生回滚, 当前线程: {}", thread.getName());dataSourceTransactionManager.rollback(status);} else {log.info("线程: {}, 开始悬停", thread.getName());LockSupport.park();if (isException.get()) {log.info("线程: {}, 开始回滚", thread.getName());dataSourceTransactionManager.rollback(status);} else {log.info("线程: {}, 开始提交", thread.getName());dataSourceTransactionManager.commit(status);}}}/*** 程序异常, 进行回滚, 线程唤醒** @param unFinishedThread* @param totalThreadCount* @param status* @param isException*/private void errorRollback(List<Thread> unFinishedThread, AtomicInteger totalThreadCount, TransactionStatus status, AtomicBoolean isException) {// 设定线程中存在异常信息isException.set(Boolean.TRUE);// 本线程回滚dataSourceTransactionManager.rollback(status);// 发生异常, 全部线程进行唤醒this.notifyAllThread(unFinishedThread, totalThreadCount, true);log.info("异常回滚, 开始全部线程唤醒, 当前线程数量: {}", unFinishedThread.size());}/*** 唤醒全部悬停的线程** @param unFinishedThread 手动悬停的线程* @param totalThreadCount 全部开启的线程数* @param isForce          是否强行操作集合中全部线程*/private void notifyAllThread(List<Thread> unFinishedThread, AtomicInteger totalThreadCount, boolean isForce) {if (isForce || unFinishedThread.size() == totalThreadCount.get()) {for (Thread thread : unFinishedThread) {LockSupport.unpark(thread);log.info("线程: [{}]被唤醒", thread.getName());}}}
}

效果图

多线程模式下保证事物的一致性相关推荐

  1. c++多线程模式下的socket编程(线程池实现)

    socket 编程可以说是一个基本的技术掌握,而多个客户端向服务端发送请求又是一个非常常见的场景,因此多线程模式下的socket编程则显得尤为常见与重要. 本文主要利用线程池的技术,来实现多线程的模式 ...

  2. 达梦DM8单进程多线程架构模式下各线程详解

    达梦数据库进程管理方式类似于Mysql,属于单进程多线程模式.数据库服务进程包含:DmServer(主服务进程)和DmAPService(备份服务进程).线程主要包括:监听线程.IO线程.工作线程.调 ...

  3. Linux 多线程 ”一写多读” 模式下的无锁设计

    缘起 双buffer "无锁" 设计 指针的切换 ptr 竞争条件的解决 指针访问丢失 延伸 结语 缘起 在linux多线程环境下对同一变量进行读写时,经常会遇到读写的原子性问题, ...

  4. 一文讲透微服务下如何保证事务的一致性

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"加群"获取公众号专属群聊入口 随着业务的快速发展.业务复杂度越来越高,传统单体应用逐渐 ...

  5. 咖啡汪笔记 —— 微服务架构下如何保证事务的一致性(InfoQ公开课)

    Hello, 大家好! 我是不作死就不会死,智商不在线,但颜值超有品的拆家队大队长 --咖啡汪 一只不是在戏精,就是在戏精路上的极品二哈 前几天在 InfoQ 公开课上看到了自己感兴趣的东西,所以便简 ...

  6. 【Linux 内核 内存管理】RCU 机制 ④ ( RCU 模式下更新链表项 list_replace_rcu 函数 | 链表操作时使用 smp_wmb() 函数保证代码执行顺序 )

    文章目录 一.RCU 模式下更新链表项 list_replace_rcu 函数 二.链表操作时使用 smp_wmb() 函数保证代码执行顺序 一.RCU 模式下更新链表项 list_replace_r ...

  7. 前后端分离开发模式下后端质量的保证 —— 单元测试

    概述 在今天, 前后端分离已经是首选的一个开发模式.这对于后端团队来说其实是一个好消息,减轻任务并且更专注.在测试方面,就更加依赖于单元测试对于API以及后端业务逻辑的较验.当然单元测试并非在前后端分 ...

  8. Linux TCP server系列(6)-select模式下的多线程server

    目标: 修改上一篇的select模式下的server,让它使用多线程来处理客户端请求(多进程的模式已经在上篇中加了注释). 思路: (1)服务器 我们已经在之前的客户端模型多个并发用户的过程中使用过多 ...

  9. ultraedit 运行的是试用模式_单元测试 —— 前后端分离开发模式下后端质量的保证...

    概述 在今天, 前后端分离已经是首选的一个开发模式.这对于后端团队来说其实是一个好消息,减轻任务并且更专注.在测试方面,就更加依赖于单元测试对于API以及后端业务逻辑的较验.当然单元测试并非在前后端分 ...

最新文章

  1. css解决div子元素margin溢出的问题
  2. AMDKFD 合并入 Linux 3.19 内核
  3. SAP HR模块的基础数据表和增强配置
  4. 銷售訂單 (Sales Order): 資料表及更新記錄
  5. html5/css3响应式布局介绍及设计流程
  6. 如何分析SAP CRM UI label显示成technical name的问题
  7. 蓝桥杯 ADV-103 算法提高 逆序排列
  8. VC Ping IP的类
  9. 一个JAVA小虾米初入江湖
  10. 删除链表的节点(JS)
  11. 1005打印任务取消不了 hp_Windows10+HP M176N奇怪的发送打印命令不执行任务就消失的问题...
  12. MagicalCoder可视化开发平台:轻松搭建业务系统,为企业创造更多价值
  13. python中-是什么意思
  14. 华夏银行签约金融壹账通 借助金融科技转型升级
  15. Project2016创建WBS并且进行相关设置
  16. 电子词典(tcp多进程模型)
  17. 【干货】阿里资深无线技术专家孙兵谈闲鱼社区技术架构演进
  18. 《那些年啊,那些事——一个程序员的奋斗史》——91
  19. 解决方案:某些网页Firefox不能记住密码
  20. mysql中in的用法详解

热门文章

  1. i908/i908E手机如何拨打固定电话分机号
  2. opencv 图像金字塔及图像重建、融合
  3. python dis模块解析
  4. 关于单级PID及串级PID
  5. win7 黑屏之感叹
  6. arm编译安装php启动,在 ARM 设备上手工编译 LNMP 编译到心态爆炸
  7. 小米盒子运行linux,小米盒子刷机成砖的解救措施攻略详解
  8. 2019最新Java实战开发今日头条资讯网站
  9. 手机dpi修改工具_【原创教程】修改分辨率和dpi 让安卓手机不再卡顿
  10. 图像锐化(增强)和边缘检测