多线程中的事务回滚,你真的用对了吗?
点击关注公众号,实用技术文章及时了解
来源:blog.csdn.net/weixin_43225491/article/
details/117705686
背景介绍
1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。
2,在spring中可以使用@Transactional
注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
3,下面用一个简单示例演示多线程事务。
公用的类和方法
/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n; int number=source.size()/n; int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;
}
/** 线程池配置* @version V1.0*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService = newThreadPool();}}}return executorService;}private static ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
/** 获取sqlSession* @author 86182* @version V1.0*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}
}
示例事务不成功操作
/*** 测试多线程事务.* @param employeeDOList*/
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list = lists.get(i);threadArray[i] = new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}
数据库中存在一条数据:
//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {@Resourceprivate EmployeeBO employeeBO;/*** 测试多线程事务.* @throws InterruptedException*/@Testpublic void MoreThreadTest2() throws InterruptedException {int size = 10;List<EmployeeDO> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){EmployeeDO employeeDO = new EmployeeDO();employeeDO.setEmployeeName("lol"+i);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(i+"XX");employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}}
}
测试结果:
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional
注解没有生效。
使用sqlSession
控制手动提交事务
@ResourceSqlContext sqlContext;/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList = new ArrayList<>();//拆分listList<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}return employeeMapper.saveBatch(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}
// sql
<insert id="saveBatch" parameterType="List">INSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)values<foreach collection="list" item="item" index="index" separator=",">(#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status})</foreach></insert>
数据库中一条数据:
测试结果:抛出异常,
删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。
成功操作示例:
@Resource
SqlContext sqlContext;
/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList = new ArrayList<>();List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);for (int i =0;i<lists.size();i++){List<EmployeeDO> list = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatch(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}
}
测试结果:
数据库中数据:
删除的删除了,添加的添加成功了,测试成功。
推荐
主流Java进阶技术(学习资料分享)
Java面试题宝典
加入Spring技术开发社区
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!
多线程中的事务回滚,你真的用对了吗?相关推荐
- Spring中的事务回滚 网上比较不错的文章
1 浅谈Spring中的事务回滚 https://www.cnblogs.com/zeng1994/p/8257763.html 2 spring 事务回滚 https://www.cnblogs.c ...
- Spring声明式事务管理中的事务回滚
一:使用 本文在spring + spring mvc + mybatis中使用 第一步配置xml:注意xml最前面tx名称空间一定要配置 <beans xmlns="http://w ...
- java中的事务回滚_Spring中的事务回滚机制
问题:在Java项目汇中,添加@Transactional注解,报错之后,事务回滚未生效,数据仍插入数据库中.经查看报错位置位于新增成功之后.空指针异常. 一.特性 先了解一下@Transaction ...
- 多线程如何实现事务回滚?一招帮你解决
特别说明CountDownLatch **CountDownLatch是一个类springboot自带的类,可以直接用,**变量AtomicBoolean 也是可以直接使用 CountDownLatc ...
- MySQL中的事务回滚机制
在 MySQL 中,恢复机制是通过回滚日志(undo log)实现的,所有事务进行的修改都会先记录到这个回滚日志中,然后在对数据库中的对应行进行写入. 当事务已经被提交之后,就无法再次回滚了. 回滚日 ...
- Spring中@Transactional事务回滚(含实例详细讲解,附源码)
一.使用场景举例 在了解@Transactional怎么用之前我们必须要先知道@Transactional有什么用.下面举个栗子:比如一个部门里面有很多成员,这两者分别保存在部门表和成员表里面,在删除 ...
- Mysql存储过程中的事务回滚
create procedure test(in a int)BEGINDECLARE t_error INTEGER DEFAULT 0;DECLARE CONTINUE HANDLER FOR S ...
- 一套超好用的“Excel导入导出+多线程处理导入数据+多线程事务回滚”的模板方法
一.模板流程: 二.功能演示: 1.Excel数据: 数据说明:第一条数据完整,可以成功导入:第二条数据无姓名,业务逻辑姓名不允许为空,会导出到错误Excel中:第三条数据无姓名无类型,业务逻辑姓名类 ...
- 实际开发中,有时没有异常发生,但是执行结果不是我们期望的情况,需要手动让事务回滚
需求:开支单保存 原来的代码: 修改后的代码: Spring控制事务下手动回滚事务的方法: 在实际开发中,有时并没有异常发生,但是由于事务结果未满足具体业务需求,所以我们不得不手动回滚事务! 有如下两 ...
最新文章
- linux 自动备份脚本
- leetcode-C语言代码练习
- 性能测试(01)-jmeter元件-线程组、调试取样器
- 【论文阅读】A Gentle Introduction to Graph Neural Networks [图神经网络入门](4)
- android studio初始化设置,Android studio 初始设置
- 【飞控理论】【惯性导航基础】什么是欧拉角?为什么会有欧拉角?欧拉角在航空领域的运用?
- 魅族智能识屏怎么用才算是高科技?
- 验证空间变形:电子在测地线的圆形轨道上辐射行为
- 设置Chrome为兼容模式
- Android 学习博客
- Centos7.5 BCM4322无线网卡驱动安装踩坑记录
- 你应该会喜欢的5个自定义 Hook
- 微信小程序搜索,搜索历史,清除搜索历史,以及点击搜索历史实现搜索功能
- C语言程序设计博客作业07
- Windows PC连接苹果LG UltraFine 4K显示器教程
- Python基础之告警定义与告警抑制
- 微软WHQL认证有哪些步骤?驱动程序签名及发布
- 大连理工大学概率与统计上机作业
- 什么是自己的商业模式?
- 解决VBS使用记事本编译,执行时,中文汉字乱码的问题
热门文章
- 苹果A13打得过麒麟990吗?看完秒懂
- 华为1999元起的智能眼镜,能通话能播放音乐,预售就抢疯了!
- 贾跃亭又成功拿到6亿融资!九城与法拉第未来签约...
- 苹果2019新款iPhone售价惊曝:咬牙仍坚持高价位?
- 不写一行代码,基于Jmeter打造性能测试数据平台
- 最好的休息,不是睡觉
- 微软rt做打印服务器,转换打印监视器以便与群集打印服务器配合使用
- 协议圣经 五 rtsp client
- 关于socket组播和ssdp(一)[修改1.2]
- ffmpeg 2.6.3在Linux下的编译