大概所有的程序员应该都接触过批量插入的场景,我也相信任何的程序员都能写出可正常运行的批量插入的代码。但怎样实现一个高效、快速插入的批量插入功能呢?

由于每个人的工作履历,工作年限的不同,在实现这样的一个需求时,可能技术选型各有不同,有直接生成insert语句的,有用EF的或者其他的orm框架的。其实不管是手写insert还是使用EF,最终交给数据库执行的还是insert语句。下面是EF批量插入的示例代码:

var list = new List<Student>();for (int i = 0; i < 100; i++){list.Add(new Student { CreateTime = DateTime.Now, Name = "zjjjjjj" });}await _context.Students.AddRangeAsync(list);await _context.SaveChangesAsync();

生成的脚本截图如下:

这种实现方式在数据量100以内时,耗时还算可以。但如果要批量导入的数据达到万级的时候,那耗时简直是灾难。我测试的数据如下(测试数据库为mysql,具体配置不详):

数据量 耗时(s)
10 0.028
1w 3.929
10w 31.280

10w的数据已经耗时超过了30s,我没有勇气测试100w数据的耗时,有兴趣的可以自行测试下。

下面就应该进入正题了,对于较大数据量(1000以上)场景下的批量插入,各个数据库应该都提供了相关的解决方案,由于工作所限,目前笔者仅接触过mysql和mssql。

mysql的实现方案是LOAD DATA命令,此命令接收一个csv文件,然后将文件上传到数据库服务器后,解析数据后插入。好在MySqlConnector提供了相关的封装,不用咱们去熟悉那么复杂的命令参数。

mssql实现的方案是使用SqlBulkCopy类,不过此类仅接收DataTable类型的数据,所以,在批量插入的时候,需要将数据源转换成DataTable。

综上所示,不管是mysql,还是mssql,均需要将数据源转换成指定的格式才可以使用批量导入的功能,所以这一块的主要核心就是转换数据源格式。mysql需要转换成csv,mssql需要转换成DataTable。下面就来一起看看具体的转换的方法。

以下代码是转换csv和DataTable相关方法:

namespace FL.DbBulk{public static class Extension{/// <summary>/// 获取实体影射的表名/// </summary>/// <param name="type"></param>/// <returns></returns>public static string GetMappingName(this System.Type type){var key = $"batch{type.FullName}";var tableName = CacheService.Get(key);if (string.IsNullOrEmpty(tableName)){var tableAttr = type.GetCustomAttribute<TableAttribute>();if (tableAttr != null){tableName = tableAttr.Name;}else{tableName = type.Name;}CacheService.Add(key, tableName);}return tableName;}public static List<EntityInfo> GetMappingProperties(this System.Type type){var key = $"ICH.King.DbBulk{type.Name}";var list = CacheService.Get<List<EntityInfo>>(key);if (list == null){list = new List<EntityInfo>();foreach (var propertyInfo in type.GetProperties()){if (!propertyInfo.PropertyType.IsValueType &&propertyInfo.PropertyType.Name != "Nullable`1" && propertyInfo.PropertyType != typeof(string)) continue;var temp = new EntityInfo();temp.PropertyInfo = propertyInfo;temp.FieldName = propertyInfo.Name;var attr = propertyInfo.GetCustomAttribute<ColumnAttribute>();if (attr != null){temp.FieldName = attr.Name;}temp.GetMethod = propertyInfo.CreateGetter();list.Add(temp);}CacheService.Add(key, list);}return list;}/// <summary>/// 创建cvs字符串/// </summary>/// <typeparam name="T"></typeparam>/// <param name="entities"></param>/// <param name="primaryKey"></param>/// <returns></returns>public static string CreateCsv<T>(this IEnumerable<T> entities, string primaryKey = ""){var sb = new StringBuilder();var properties = typeof(T).GetMappingProperties().ToArray();foreach (var entity in entities){for (int i = 0; i < properties.Length; i++){var ele = properties[i];if (i != 0) sb.Append(",");var value = ele.Get(entity);if (ele.PropertyInfo.PropertyType.Name == "Nullable`1"){if (ele.PropertyInfo.PropertyType.GenericTypeArguments[0] == typeof(DateTime)){if (value == null){sb.Append("NULL");}else{sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));}continue;}}if (ele.PropertyInfo.PropertyType == typeof(DateTime)){sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));continue;}//如果是主键&&string类型,且值不为空if (ele.FieldName == primaryKey && ele.PropertyInfo.PropertyType == typeof(string)){sb.Append(Guid.NewGuid().ToString());continue;}if (value == null){continue;}if (ele.PropertyInfo.PropertyType == typeof(string)){var vStr = value.ToString();if (vStr.Contains("\"")){vStr = vStr.Replace("\"", "\"\"");}if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n")){vStr = $"\"{vStr}\"";}sb.Append(vStr);}else sb.Append(value);}sb.Append(IsWin() ? "\r\n" : "\n");//sb.AppendLine();}return sb.ToString();}public static bool IsWin(){return RuntimeInformation.IsOSPlatform(OSPlatform.Windows);}public static string CreateCsv(this DataTable table){StringBuilder sb = new StringBuilder();DataColumn colum;foreach (DataRow row in table.Rows){for (int i = 0; i < table.Columns.Count; i++){colum = table.Columns[i];if (i != 0) sb.Append(",");if (colum.DataType == typeof(string)){var vStr = row[colum].ToString();if (vStr.Contains("\"")){vStr = vStr.Replace("\"", "\"\"");}if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n")){vStr = $"\"{vStr}\"";}sb.Append(vStr);}else sb.Append(row[colum]);}sb.Append(IsWin() ? "\r\n" : "\n");}return sb.ToString();}public static DataTable ToDataTable<T>(this IEnumerable<T> list, string primaryKey = ""){var type = typeof(T);//获取实体映射的表名var mappingName = type.GetMappingName();var dt = new DataTable(mappingName);//获取实体映射的属性列表var columns = type.GetMappingProperties();dt.Columns.AddRange(columns.Select(x => new DataColumn(x.FieldName)).ToArray());foreach (var data in list){var row = dt.NewRow();foreach (var entityInfo in columns){var value = entityInfo.Get(data);if (primaryKey == entityInfo.FieldName && entityInfo.PropertyInfo.PropertyType == typeof(string)){row[entityInfo.FieldName] = value ?? Guid.NewGuid().ToString();}else{row[entityInfo.FieldName] = value;}}dt.Rows.Add(row);}return dt;}}}

转换成DataTable方法相对简单,但这里我做了个优化下,当判断主键是string类型,且值为空时,会自动生成一个GUID,并给其赋值,这样做的目的是为了和EF原生的插入功能兼容。

生成Csv的相对比较麻烦,因为Csv是用逗号以及其他符号来区分每一行、每一列数据,但经常会存在要插入的数据包含了csv的特殊符号,这样情况下就需要做转义。另外,还有一个需要考虑的问题,linux和windows默认的换行符是有区别的,windows的换行符为\r\n,而linux默认的是\n,所以在生成csv时,需要根据不同的系统进行处理。

下面来看下具体怎么调用相关的插入方法,首先看下mysql的,主要代码如下所示:

private async Task InsertCsvAsync(string csv, string tableName, List<string> columns){var fileName = Path.GetTempFileName();await File.WriteAllTextAsync(fileName, csv);var conn = _context.Database.GetDbConnection() as MySqlConnection;var loader = new MySqlBulkLoader(conn){FileName = fileName,Local = true,LineTerminator = Extension.IsWin() ? "\r\n" : "\n",FieldTerminator = ",",TableName = tableName,FieldQuotationCharacter = '"',EscapeCharacter = '"',CharacterSet = "UTF8"};loader.Columns.AddRange(columns);await loader.LoadAsync();}

在上述的代码中,首先创建一个临时文件,然后将其他数据源转换的csv内容写入到文件中,获取数据库连接,再然后创建MySqlBulkLoader类的实例,将相关参数进行复制后,还需要配置字段列表,最后执行LoadAsync命令。

下面是mssql的批量插入的核心代码:

public async Task InsertAsync(DataTable table){if (table == null){throw new ArgumentNullException();}if (string.IsNullOrEmpty(table.TableName)){throw new ArgumentNullException("DataTable的TableName属性不能为空");}var conn = (SqlConnection)_context.Database.GetDbConnection();await conn.OpenAsync();using (var bulk = new SqlBulkCopy(conn)){bulk.DestinationTableName = table.TableName;foreach (DataColumn column in table.Columns){bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);}await bulk.WriteToServerAsync(table);}}

以上方法相对简单,在此不做更多解释。

至此,mysql和mssql批量的导入的方案已经介绍完毕,但可能就会有人说了,这跟EF好像也没什么关系呀。
其实如果你有仔细看的话,或许能发现,我在代码中使用了一个名为_context字段,此字段其实就是EF的DbContext的实例。但文章内容到此时也没有完全的和EF结合,下面就来介绍下如何更优雅的将此功能集成到EF中。

在.net core中,接入EF的时候其实已经指定了使用的数据库类型,实例代码如下:

services.AddDbContext<MyDbContext>(opt => opt.UseMySql("server=10.0.0.146;Database=demo;Uid=root;Pwd=123456;Port=3306;AllowLoadLocalInfile=true"))

既然以及指定了数据库类型,那么在调用批量插入的时候,应该就不需要让调用者判断是使用mysql的方法,还是mssql的方法。具体怎么设计呢?且耐心往下看。

首先分别定义接口ISqlBulk,IMysqlBulk,ISqlServerBulk代码如下:

namespace FL.DbBulk{public interface ISqlBulk{/// <summary>/// 批量导入数据/// </summary>/// <param name="table">数据源</param>void Insert(DataTable table);/// <summary>/// 批量导入数据/// </summary>/// <param name="table">数据源</param>Task InsertAsync(DataTable table);void Insert<T>(IEnumerable<T> enumerable) where T : class;Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class;}}

IMysqlBulk,ISqlServerBulk接口继承ISqlBulk,代码如下:

namespace FL.DbBulk{public interface IMysqlBulk : ISqlBulk{Task InsertAsync<T>(string csvPath, string tableName = "") where T : class;}}
namespace FL.DbBulk{public interface ISqlServerBulk:ISqlBulk{}}

然后创建ISqlBulk实现类:

namespace FL.DbBulk{public class SqlBulk : ISqlBulk{private ISqlBulk _bulk;public SqlBulk(DbContext context, IServiceProvider provider){if (context.Database.IsMySql()){_bulk = provider.GetService<IMysqlBulk>();}else if (context.Database.IsSqlServer()){_bulk = provider.GetService<ISqlServerBulk>();}}public void Insert(DataTable table){_bulk.Insert(table);}public async Task InsertAsync(DataTable table){await _bulk.InsertAsync(table);}public void Insert<T>(IEnumerable<T> enumerable) where T : class{_bulk.Insert(enumerable);}public async Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class{await _bulk.InsertAsync(enumerable);}}}

在SqlBulk的构造函数中,通过context.Database的扩展方法判断数据库的类型,然后再获取相应的接口的实例。再然后就是实现IMysqlBulk和ISqlServerBulk的实现类。上文已经把核心代码贴出,再此为了篇幅,就不贴完整代码了。

再然后,就是提供一个注入services的方法,代码如下:

namespace Microsoft.Extensions.DependencyInjection{public static class ServiceCollectionExtension{public static IServiceCollection AddBatchDB<T>(this IServiceCollection services) where  T:DbContext{services.TryAddScoped<IMysqlBulk, MysqlBulk>();services.TryAddScoped<ISqlServerBulk, SqlServerBulk>();services.TryAddScoped<ISqlBulk, SqlBulk>();services.AddScoped<DbContext, T>();return services;}}}

有了以上代码,我们就可以通过在Startup中很方便的启用批量插入的功能了。

最后,贴出两种插入方式对比的测试数据:

数据量 EF默认耗时(s) ISqlBulk耗时(s)
10 0.028 0.030
1w 3.929 1.581
10w 31.280 15.408

以上测试数据均是使用同一个mysql数据库,不同配置以及网络环境下,测试的数据会有差异,有兴趣的可以自己试试。

至此,本人内容已完毕。


最后,贴出git地址,如果思路或代码可以帮到你,欢迎点赞,点star
https://github.com/fuluteam/FL.DbBulk.git

EF批量插入太慢?那是你的姿势不对相关推荐

  1. 阿里云证书 | 套路太深,还是我打开姿势不对?

    文章所有链接,请点击文章左下角"阅读原文"查阅. 一.阿里云证书资源包初体验 2021 年 1 月 13 左右,收到阿里云提示个人 SSL 证书即将过期,需要续费.于是登陆阿里云, ...

  2. oracle insert汉字出错,Oracle数据库之Oracle批量插入数据SQL语句太长出错:无效的主机/绑定变量名...

    本文主要向大家介绍了Oracle数据库之Oracle批量插入数据SQL语句太长出错:无效的主机/绑定变量名,通过具体的内容向大家展现,希望对大家学习Oracle数据库有所帮助. Oracle数据库,用 ...

  3. ef oracle 批量更新慢_详解Oracle中多表关联批量插入、批量更新与批量删除

    概述 今天主要介绍一下Oracle数据库中多表关联批量插入.多表关联批量更新和多表关联批量删除.下面用实验来理解下~ 一.创建必须的表和序列语句 --创建部门表 dept:CREATE TABLE d ...

  4. Mybatis-Plus批量插入数据太慢,使用rewriteBatchedStatements属性优化,堪称速度与激情!

    rewriteBatchedStatements神秘属性 前言 一.rewriteBatchedStatements参数 二.批量添加员工信息 1.普通saveBatch批量插入 2.设置rewrit ...

  5. Entity Framework 批量插入

    为什么80%的码农都做不了架构师?>>>    奋斗的小鸟--dogxuefeng Entity Framework 批量插入很慢 Entity Framework 批量插入很慢吗? ...

  6. 大数据写入到Oracle数据库(批量插入数据)

    开发中经常遇到批量插入数据的需求,为了提高开发效率大多会使用ORM架构,个别之处 才会手写SQL,我们使用C#.NET Core5.0开发,所以优先选择了微软的EF. 但是EF原生没有批量操作功能,需 ...

  7. Java豆瓣电影爬虫——减少与数据库交互实现批量插入

    节前一个误操作把mysql中record表和movie表都清空了,显然我是没有做什么mysql备份的.所以,索性我把所有的表数据都清空的,一夜回到解放前-- 项目地址:https://github.c ...

  8. c mysql批量插入优化_MySQL实现批量插入以优化性能的教程

    这篇文章主要介绍了MySQL实现批量插入以优化性能的教程,文中给出了运行时间来表示性能优化后的对比,需要的朋友可以参考下 对于一些数据量较大的系统,数据库面临的问题除了查询效率低下,还有就是数据入库时 ...

  9. laravel批量插入报错:1292: Incorrect datetime value: '0000-00-00 00:00:00' for column 'TERM_DATE'

    一.背景 在使用laravel的批量插入的时候,数据库报错,如标题所示.这就有点奇怪了,因为之前也是同样的表结构,但是并没有报错. 1.报错信息 2.关于laravel的批量插入可参考该博客 二.解决 ...

最新文章

  1. 迷茫与飞跃:9月开始,明确了研究方向,功力提升明显,成绩比较显著
  2. condition框架设计与实现
  3. Vue+Openlayers+HIKVSION实现点击摄像头进行预览
  4. linux dns语法检测工具,DNS解析检查工具之nslookup
  5. java 多行 n_Java实现向Word添加多行图片水印
  6. asp.net treeView绑定
  7. mingw w64 v8.0.0_MinGW+OpenGL
  8. 你需要知道的基础算法知识——STL和基础数据结构(四)
  9. win10玩cf不能全屏_游戏莫名卡顿三招搞定!Win10游戏优化教程
  10. python中iskeydown什么函数_isKeyDown不能在Java中工作
  11. Exceptions, Catch, and Throw(Chapter 10 of Programming Ruby)
  12. Qt学习之路(37): Qt容器类之关联存储容器
  13. Android自动播放下一曲,环信Android自动播放下一条语音
  14. android确定kernel使用的config文件
  15. Django基础(29): select_related和prefetch_related的用法与区别
  16. 没有钱到底要不要创业?
  17. RGB, YUV及相关标准
  18. 高速充电手机电池问世 充满电只需10秒
  19. PN532半加密、无漏洞卡解密
  20. 【监听器篇】4.统计当前在线的用户人数

热门文章

  1. bzoj3122 [Sdoi2013]随机数生成器(bsgs+扩欧+数列)
  2. URAL 1682 Crazy Professor (并查集)
  3. Ubuntu16.04换源
  4. [Javascript] Avoid Creating floats if they are not needed
  5. 设计模式(10)-----模板方法模式
  6. 扒开系统调用的三层皮(下)
  7. Struts学习笔记_i18n
  8. C# 使用int.TryParse,Convert.ToInt32,(int)将浮点类型转换整数时的区别
  9. 在应用程序中实现对NandFlash的操作
  10. 大学生计算机课程考试试题,大学生计算机基础课程考试系统研究与实现