背景:物联网平台背景,传感器采集频率干到了1000Hz,分了100多张表出来,还是把mysql干炸了。当前单表数据量在1000来w,从kafka上拉数据异步批量插入,每次插入数据量1500条,测试的时候还没问题,结果上线没多久,kafka服务器直接挂了,赶忙看日志,kafka服务器堆积了几十G的数据,再去看生产环境日志,发现到最后单次批量插入用时固定在10多秒,甚至20多秒,kafka直接把消费端踢出了消费组…从而kafka消息一直没有消费,总重导致kafka数据堆积挂掉了…

在这样的情况下:采取的处理方案无非就分库分表,减少单表数据量,降低数据库压力;提高批量插入效率,提高消费者消费速度。
本文主要把精力放在如何提高批量插入效率上。

使用的mybatisplus的批量插入方法:saveBatch(),之前就看到过网上都在说在jdbc的url路径上加上
rewriteBatchedStatements=true 参数mysql底层才能开启真正的批量插入模式。

保证5.1.13以上版本的驱动,才能实现高性能的批量插入。 MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。 另外这个选项对INSERT/UPDATE/DELETE都有效。

可是我之前已经添加了,而且数据表目前是没有建立索引的,即使是在1000来w的数据量下进行1500条的批量插入也不可能消耗20来秒吧,于是矛盾转移到saveBatch方法,使用版本:V3.4.3.4
查看源码:

   public boolean saveBatch(Collection<T> entityList, int batchSize) {String sqlStatement = this.getSqlStatement(SqlMethod.INSERT_ONE);return this.executeBatch(entityList, batchSize, (sqlSession, entity) -> {sqlSession.insert(sqlStatement, entity);});}
protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {return SqlHelper.executeBatch(this.entityClass, this.log, list, batchSize, consumer);}
    public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {Assert.isFalse(batchSize < 1, "batchSize must not be less than one", new Object[0]);return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, (sqlSession) -> {int size = list.size();int i = 1;for(Iterator var6 = list.iterator(); var6.hasNext(); ++i) {E element = var6.next();consumer.accept(sqlSession, element);if (i % batchSize == 0 || i == size) {sqlSession.flushStatements();}}});}

最终来到了executeBatch()方法,可以看到这很明显是在一条一条循环插入,通过sqlSession.flushStatements()将一个个单条插入的insert语句分批次进行提交,而且是同一个sqlSession,这相比遍历集合循环insert来说有一定的性能提升,但是这并不是sql层面真正的批量插入。

通过查阅相关文档后,发现mybatisPlus提供了sql注入器,我们可以自定义方法来满足业务的实际开发需求。
sql注入器官网
sql注入器官方示例
在mybtisPlus的核心包下提供的默认可注入方法有这些:

在扩展包下,mybatisPlus还为我们提供了可扩展的可注入方法:

AlwaysUpdateSomeColumnById: 根据Id更新每一个字段,全量更新不忽略null字段,解决mybatis-plus中updateById默认会自动忽略实体中null值字段不去更新的问题;
InsertBatchSomeColumn: 真实批量插入,通过单SQL的insert语句实现批量插入;
Upsert: 更新or插入,根据唯一约束判断是执行更新还是删除,相当于提供insert on duplicate key update支持。

可以发现mybatisPlus已经提供好了InsertBatchSomeColumn的方法,我们只需要把这个方法添加进我们的sql注入器即可。

    public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) {KeyGenerator keyGenerator = NoKeyGenerator.INSTANCE;SqlMethod sqlMethod = SqlMethod.INSERT_ONE;List<TableFieldInfo> fieldList = tableInfo.getFieldList();String insertSqlColumn = tableInfo.getKeyInsertSqlColumn(true, false) + this.filterTableFieldInfo(fieldList, this.predicate, TableFieldInfo::getInsertSqlColumn, "");//------------------------------------拼接批量插入语句----------------------------------------String columnScript = "(" + insertSqlColumn.substring(0, insertSqlColumn.length() - 1) + ")";String insertSqlProperty = tableInfo.getKeyInsertSqlProperty(true, "et.", false) + this.filterTableFieldInfo(fieldList, this.predicate, (i) -> {return i.getInsertSqlProperty("et.");}, "");insertSqlProperty = "(" + insertSqlProperty.substring(0, insertSqlProperty.length() - 1) + ")";String valuesScript = SqlScriptUtils.convertForeach(insertSqlProperty, "list", (String)null, "et", ",");//------------------------------------------------------------------------------------------String keyProperty = null;String keyColumn = null;if (tableInfo.havePK()) {if (tableInfo.getIdType() == IdType.AUTO) {keyGenerator = Jdbc3KeyGenerator.INSTANCE;keyProperty = tableInfo.getKeyProperty();keyColumn = tableInfo.getKeyColumn();} else if (null != tableInfo.getKeySequence()) {keyGenerator = TableInfoHelper.genKeyGenerator(this.getMethod(sqlMethod), tableInfo, this.builderAssistant);keyProperty = tableInfo.getKeyProperty();keyColumn = tableInfo.getKeyColumn();}}String sql = String.format(sqlMethod.getSql(), tableInfo.getTableName(), columnScript, valuesScript);SqlSource sqlSource = this.languageDriver.createSqlSource(this.configuration, sql, modelClass);return this.addInsertMappedStatement(mapperClass, modelClass, this.getMethod(sqlMethod), sqlSource, (KeyGenerator)keyGenerator, keyProperty, keyColumn);}

接下来就通过SQL注入器实现真正的批量插入

默认的sql注入器

public class DefaultSqlInjector extends AbstractSqlInjector {public DefaultSqlInjector() {}public List<AbstractMethod> getMethodList(Class<?> mapperClass, TableInfo tableInfo) {if (tableInfo.havePK()) {return (List)Stream.of(new Insert(), new Delete(), new DeleteByMap(), new DeleteById(), new DeleteBatchByIds(), new Update(), new UpdateById(), new SelectById(), new SelectBatchByIds(), new SelectByMap(), new SelectCount(), new SelectMaps(), new SelectMapsPage(), new SelectObjs(), new SelectList(), new SelectPage()).collect(Collectors.toList());} else {this.logger.warn(String.format("%s ,Not found @TableId annotation, Cannot use Mybatis-Plus 'xxById' Method.", tableInfo.getEntityType()));return (List)Stream.of(new Insert(), new Delete(), new DeleteByMap(), new Update(), new SelectByMap(), new SelectCount(), new SelectMaps(), new SelectMapsPage(), new SelectObjs(), new SelectList(), new SelectPage()).collect(Collectors.toList());}}
}

继承DefaultSqlInjector自定义sql注入器

/*** @author zhmsky* @date 2022/8/15 15:13*/
public class MySqlInjector extends DefaultSqlInjector {@Overridepublic List<AbstractMethod> getMethodList(Class<?> mapperClass) {List<AbstractMethod> methodList = super.getMethodList(mapperClass);//更新时自动填充的字段,不用插入值methodList.add(new InsertBatchSomeColumn(i -> i.getFieldFill() != FieldFill.UPDATE));return methodList;}
}

将自定义的sql注入器注入到Mybatis容器中

/*** @author zhmsky* @date 2022/8/15 15:15*/
@Configuration
public class MybatisPlusConfig {@Beanpublic MySqlInjector sqlInjector() {return new MySqlInjector();}
}

继承 BaseMapper 添加自定义方法

/*** @author zhmsky* @date 2022/8/15 15:17*/
public interface CommonMapper<T> extends BaseMapper<T> {/*** 真正的批量插入* @param entityList* @return*/int insertBatchSomeColumn(List<T> entityList);
}

对应的mapper层接口继承上面自定义的mapper

 * @author zhmsky* @since 2021-12-01*/
@Mapper
public interface UserMapper extends CommonMapper<User> {}

最后直接调用UserMapper的insertBatchSomeColumn()方法即可实现真正的批量插入。

    @Testvoid contextLoads() {for (int i = 0; i < 5; i++) {User user = new User();user.setAge(10);user.setUsername("zhmsky");user.setEmail("21575559@qq.com");userList.add(user);}long l = System.currentTimeMillis();userMapper.insertBatchSomeColumn(userList);long l1 = System.currentTimeMillis();System.out.println("-------------------:"+(l1-l));userList.clear();}

查看日志输出信息,观察执行的sql语句,

发现这才是真正意义上的sql层面的批量插入。
但是,到这里并没有结束,mybatisPlus官方提供的insertBatchSomeColumn方法不支持分批插入,也就是有多少直接全部一次性插入,这就可能会导致最后的sql拼接语句特别长,超出了mysql的限制,于是我们还要实现一个类似于saveBatch的分批的批量插入方法。

添加分批插入

模仿原来的saveBatch方法:

 * @author zhmsky* @since 2021-12-01*/
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {@Override@Transactional(rollbackFor = {Exception.class})public boolean saveBatch(Collection<User> entityList, int batchSize) {try {int size = entityList.size();int idxLimit = Math.min(batchSize, size);int i = 1;//保存单批提交的数据集合List<User> oneBatchList = new ArrayList<>();for (Iterator<User> var7 = entityList.iterator(); var7.hasNext(); ++i) {User element = var7.next();oneBatchList.add(element);if (i == idxLimit) {baseMapper.insertBatchSomeColumn(oneBatchList);//每次提交后需要清空集合数据oneBatchList.clear();idxLimit = Math.min(idxLimit + batchSize, size);}}} catch (Exception e) {log.error("saveBatch fail", e);return false;}return true;}
}

测试:

    @Testvoid contextLoads() {for (int i = 0; i < 20; i++) {User user = new User();user.setAge(10);user.setUsername("zhmsky");user.setEmail("21575559@qq.com");userList.add(user);}long l = System.currentTimeMillis();userService.saveBatch(userList,10);long l1 = System.currentTimeMillis();System.out.println("-------------------:"+(l1-l));userList.clear();}

输出结果:

分批插入已满足,到此收工结束了。

接下来最重要的测试下性能


当前数据表的数据量在100w多条,在此基础上分别拿原始的saveBatch(假的批量插入)和 insertBatchSomeColumn(真正的批量插入)进行性能对比----(jdbc均开启rewriteBatchedStatements):

原来的假的批量插入:

  @Testvoid insert(){for (int i = 0; i < 50000; i++) {User user = new User();user.setAge(10);user.setUsername("zhmsky");user.setEmail("21575559@qq.com");userList.add(user);}long l = System.currentTimeMillis();userService.saveBatch(userList,1000);long l1 = System.currentTimeMillis();System.out.println("原来的saveBatch方法耗时:"+(l1-l));}


自定义的insertBatchSomeColumn:

    @Testvoid contextLoads() {for (int i = 0; i < 50000; i++) {User user = new User();user.setAge(10);user.setUsername("zhmsky");user.setEmail("21575559@qq.com");userList.add(user);}long l = System.currentTimeMillis();userService.saveBatch(userList,1000);long l1 = System.currentTimeMillis();System.out.println("自定义的insertBatchSomeColumn方法耗时:"+(l1-l));userList.clear();}


分批插入5w条数据,自定义的真正意义上的批量插入耗时减少了3秒左右,用insertBatchSomeColum分批插入1500条数据耗时650毫秒,这速度已经挺快了

mybatisPlus批量插入性能优化相关推荐

  1. mysql批量插入性能优化:executeBatch如何通过rewriteBatchedStatements参数逆袭

    文章目录 前言 一.实战演示 1.单元测试 2.不添加rewriteBatchedStatements参数 3.添加rewriteBatchedStatements参数 4.采用InsertBatch ...

  2. c mysql批量插入优化_MySQL实现批量插入以优化性能的教程

    这篇文章主要介绍了MySQL实现批量插入以优化性能的教程,文中给出了运行时间来表示性能优化后的对比,需要的朋友可以参考下 对于一些数据量较大的系统,数据库面临的问题除了查询效率低下,还有就是数据入库时 ...

  3. mysql如何优化性能优化_如何优化性能?MySQL实现批量插入以优化性能的实例详解...

    这篇文章主要介绍了MySQL实现批量插入以优化性能的教程,文中给出了运行时间来表示性能优化后的对比,需要的朋友可以参考下 对于一些数据量较大的系统,数据库面临的问题除了查询效率低下,还有就是数据入库时 ...

  4. mybatis-plus批量插入数据

    mybatis-plus批量插入数据 saveBatch 这样会非常慢? InsertBatchSomeColumn(批量插入 仅适用于mysql,一次插入多条数据) ruoyi中对BaseMappe ...

  5. 使用mybatis-plus批量插入遇到的两个问题记录

    最近应用系统适配时,使用mybatis plus遇到的两个问题记录. 环境说明 Mybatisplus:3.1.1 DM数据库:DM V8 03134283890-20220518-160920-10 ...

  6. mybatis-plus 批量插入效率低的问题【重写sql注入 SqlInjector】

    背景 由于项目中需要大批量将数据插入数据库,直接使用mybatis-plus中的批量插入方法,结果发现效率奇低无比,线上批量插入一千条数据居然花销八九秒的时间.而我们的目标是想要单次插入一万条数据,这 ...

  7. DB2批量插入性能对比

    DB2批量插入性能对比 import ibm_db import random import time first_names = '赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏' \ ...

  8. mybatis以及mybatisplus批量插入问题

    1. 思路分析: 批量插入是我们日常开放经常会使用到的场景,一般情况下我们也会有两种方案进行实施,如下所示. 方案一 就是用 for 循环循环插入: 优点:JDBC 中的 PreparedStatem ...

  9. mybatisPlus批量插入优化,性能快的飞起

    文章目录 1.背景 2.方案 2.1 多线程分页查询 . 生产者消费者模型.多线程sql注入器批量插入 2.2 游标查询sql注入器批量插入 2.3 多线程分页查询 . 生产者消费者模型.多线程往ES ...

最新文章

  1. 昨日我是谁,今日谁是我
  2. 统一沟通-技巧-13-Lync-Polycom RMX 1500-配置
  3. 4.22、Bootstrap V4自学之路-----内容---轮播
  4. 签名算法sha256withrsa,RSA数字证书公钥私钥生成,base64转码和文件日志
  5. 【洛谷1361】 小M的作物(最小割)
  6. 苏州科技大学计算机学院 李双娴,苏州大学计算机科学与技术学院第十二次研究生代表大会顺利召开...
  7. 【canvas系列】canvas实现“ 简单的Amaziograph效果”--画对称图【强迫症福利】
  8. 【SQL*PLUS】Copy Command
  9. 【Animation】 使用handler和Runnable实现某一个控件的抖动效果
  10. 红橙Darren视频笔记 类加载机制(API28) 自己写个热修复 查看源码网站
  11. ndk-build生成.so
  12. Android studio 配置file encoding 无效,中文乱码解决办法
  13. android11系统原生铃声,原生系统的凤毛麟角 索尼Xperia 1 II推送Android 11体验
  14. 手把手教你做线性回归分析,实用且通俗易懂!
  15. matlab画colormap
  16. 向爷爷介绍计算机,他折腾爷爷的计算机
  17. 妖怪,看法宝-看反射的“照妖镜”如何让类原形毕露
  18. iPhone4 SIM失败?无效SIM?有效解决
  19. face.evoLVe 人脸比对测试
  20. 软件工程是不是教不怎么会的写程序的人开发软件?

热门文章

  1. mysql求女生人数_mySql数据库基础
  2. 修改设置显示为平板模式
  3. Android 逆向新方式:FakerAndroid
  4. python的岗位多吗-为什么python的校招岗位这么少,校招python好找工作么?
  5. 三八妇女节女神节知识答题女生知识大PK活动
  6. python object 类
  7. WIN10-22H2专业版_电脑维修人员专用装机系统镜像【04.20更新】
  8. 计算机配色与人工配色原则,计算机调色与人工调色如何选择?
  9. R语言线性回归模型拟合诊断异常值分析家庭燃气消耗量和卡路里实例带自测题
  10. 干部人事档案综合管理系统软件