追加补充:目前因为mysql更新的原因,使用之前的连接方式是无法批量插入的,会提示错误The used command is not allowed with this MySQL version,因此需要添加一个设置字符,更新之后的连接设置如下:

 string connectMysql = "server = " + MysqlIP.Text + "; database =" + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306;AllowLoadLocalInfile=true;";

因为项目需要,经常导入MongoDB数据到Mysql,故而做了一个小工具来进行数据迁移。

在这个过程中,因为效率问题,探索了很多方法,单条遍历拼接插入命令、使用事务进行处理(也是拼接插入命令)、使用MySqlBulkLoad批量导入数据到mysql。

下边分别描述一下三种处理方法:

a.从MongoDB中获取指定的collection

#region 从MongoDB中获取数据string connetMongoDB = "mongodb://" + MongoDBIP.Text;var Client = new MongoClient(connetMongoDB);_message.messageA += "MongDB 连接成功\n";var DataBase = Client.GetDatabase(MongoDBDataBaseName.Text);var DataCollection = DataBase.GetCollection<BsonDocument>(MongoDBTableName.Text);var documents = DataCollection.Find(new BsonDocument()).Limit(100).ToListAsync().Result;          #endregion

b.从collection中获取数据表的列信息

 foreach (BsonDocument bs in documents){var obj = bs.ToDictionary();if (tableColloumns.Count < obj.Keys.Count){int i = 0;foreach (var key in obj.Keys){string hearder = key.ToString();if (CheckHeader(tableColloumns, hearder)){tableColloumns.Insert(i, RemoveSpecialLetter(hearder));i++;}}}}
foreach (BsonDocument bs in documents){var obj = bs.ToDictionary();if (tableColloumns.Count < obj.Keys.Count){int i = 0;foreach (var key in obj.Keys){string hearder = key.ToString();if (CheckHeader(tableColloumns, hearder)){tableColloumns.Insert(i, RemoveSpecialLetter(hearder));i++;}}}}

c.创建Mysql数据库

 private void CreatMysqlDB(){string connectMysql = "server = " + MysqlIP.Text + "; database =" + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";MySqlConnection conn = new MySqlConnection(connectMysql);conn.Open();string creatMysqlDB = "CREATE DATABASE " + MysqlDatabase.Text + ";";MySqlCommand cmd = new MySqlCommand(creatMysqlDB, conn);_message.messageA += "连接Mysql成功···\n";try{cmd.ExecuteNonQuery();_message.messageA += "创建Mysql数据库成功···\n";}catch (Exception e){MessageBox.Show(e.Message);}}

d.创建mysql数据库表

using (var Conn = new MySqlConnection(connectMysql)){Conn.Open();_message.messageA += "连接Mysql成功···\n";#region 创建数据库表string createStatement = "CREATE TABLE " + MysqlTable.Text + " (";int i = 0;var IsRegex = new Regex("^Is[A-Z]");foreach (string colloumn in tableColloumns){//增加数据库表列的属性设置if (i == 0){createStatement += colloumn + " VarChar(255) not null primary key,";i++;}else if (IsRegex.Match(colloumn).Success){createStatement += colloumn + " tinyint(4),";i++;}else if (colloumn == "Html"){createStatement += colloumn + " longtext,";i++;}else if (colloumn == "Content"){createStatement += colloumn + " longtext,";i++;}else if (colloumn == "ContentHtml"){createStatement += colloumn + " longtext,";i++;}else{createStatement += " " + colloumn + " text,";i++;}}createStatement = createStatement.Remove(createStatement.Length - 1);createStatement += ")   ENGINE=MyISAM DEFAULT CHARSET=utf8";//表设置using (MySqlCommand cmd = new MySqlCommand(createStatement, Conn)){try{cmd.ExecuteNonQuery();_message.messageA += "创建Mysql表成功···\n";}catch (Exception ex){MessageBox.Show(ex.Message);}}#endregion
}

e.开始写入数据

方法1.遍历collection,拼接插入命令(单条执行)

 public class ItemClass{public List<string> values = new List<string>();}public class ItemClass{public List<string> values = new List<string>();}
 public List<string> ChangeToStrlist(Dictionary<string, object>.KeyCollection list){List<string> listStr = new List<string>();foreach (var item in list)listStr.Add(RemoveSpecialLetter(item.ToString()));return listStr;}public static List<string> ChangeToStrlist(Dictionary<string, object>.ValueCollection list){List<string> listStr = new List<string>();foreach (var item in list)listStr.Add(item.ToString());return listStr;}public List<string> ChangeToStrlist(Dictionary<string, object>.KeyCollection list){List<string> listStr = new List<string>();foreach (var item in list)listStr.Add(RemoveSpecialLetter(item.ToString()));return listStr;}public static List<string> ChangeToStrlist(Dictionary<string, object>.ValueCollection list){List<string> listStr = new List<string>();foreach (var item in list)listStr.Add(item.ToString());return listStr;}
#region 单条处理数据string strsql = GetCommand();int count = 0;await DataCollection.Find(new BsonDocument()).ForEachAsync(bs =>{using (var Conn2 = new MySqlConnection(connectMysql)){Conn2.Open();MySqlCommand cmd = new MySqlCommand();cmd.Connection = Conn2;cmd.CommandText = strsql;cmd.Parameters.Clear();ItemClass item = new ItemClass();var obj = bs.ToDictionary();var keys = ChangeToStrlist(obj.Keys);var values = ChangeToStrlist(obj.Values);int temp = 0;foreach (var colloumn in tableColloumns){if (colloumn == keys[temp]){item.values.Add(values[temp]);temp++;}else{item.values.Add(string.Empty);}}temp = 0;foreach (string colloumn in tableColloumns){cmd.Parameters.AddWithValue(colloumn, item.values[temp]);temp++;}try{cmd.ExecuteNonQuery();ncount++;}catch (Exception EX){MessageBox.Show(EX.Message);}if (ncount% 10000 == 0){_message.messageA += "已写入" + ncount.ToString() + "条数据\n";}}});_message.messageA += "已写入" + ncount.ToString() + "条数据\n";_message.messageA += "向Mysql表写入数据完成\n";
#endregion

方法2:使用事务处理

 #region 使用事务处理,一次处理500条数据(有一个问题,当mongdb数据量很大的时候,会内存崩溃,这个问题可以采用分段获取的方式解决,请自行解决)int count = 500;List<ItemClass> items = new List<ItemClass>();foreach (BsonDocument bs in DataCollection.Find(new BsonDocument()).ToListAsync().Result){if (count < 500){var obj = bs.ToDictionary();ItemClass item = new ItemClass();var keys = ChangeToStrlist(obj.Keys);var values = ChangeToStrlist(obj.Values);int temp = 0;foreach (var colloumn in tableColloumns){if (colloumn == keys[temp]){item.values.Add(values[temp]);temp++;}else{item.values.Add(string.Empty);}}items.Add(item);count++;}else{ExecuteSqlTran(items);ncount += count;_message.messageA += "已写入" + ncount.ToString() + "条数据\n";items.Clear();count = 0;}}ExecuteSqlTran(items);ncount += count;items.Clear();_message.messageA += "已写入" + ncount.ToString() + "条数据\n";_message.messageA += "向Mysql表写入数据完成\n";#endregion
 public void ExecuteSqlTran(List<ItemClass> items){string connectMysql2 = "server = " + MysqlIP.Text + "; database =" + MysqlDatabase.Text + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";using (MySqlConnection conn = new MySqlConnection(connectMysql2)){conn.Open();MySqlCommand cmd = new MySqlCommand();cmd.Connection = conn;MySqlTransaction tx = conn.BeginTransaction();cmd.Transaction = tx;string strsql = GetCommand();try{for (int n = 0; n < items.Count; n++){cmd.CommandText = strsql;cmd.Parameters.Clear();var item = items[n];int i = 0;foreach (string colloumn in tableColloumns){cmd.Parameters.AddWithValue(colloumn, item.values[i]);i++;}cmd.ExecuteNonQuery();if (n > 0 && (n % 500 == 0 || n == items.Count - 1)){tx.Commit();counts += items.Count;Console.WriteLine("写入" + counts + "条数据成功");tx = conn.BeginTransaction();}}}catch (System.Data.SqlClient.SqlException E){Console.WriteLine("写入数据失败,回滚");Console.ReadLine();tx.Rollback();MessageBox.Show(E.Message);//throw new Exception(E.Message);}}}public void ExecuteSqlTran(List<ItemClass> items){string connectMysql2 = "server = " + MysqlIP.Text + "; database =" + MysqlDatabase.Text + "; user id = " + MysqlUserName.Text + "; password = " + MysqlPassword.Text + ";pooling=true;CharSet=utf8;port=3306";using (MySqlConnection conn = new MySqlConnection(connectMysql2)){conn.Open();MySqlCommand cmd = new MySqlCommand();cmd.Connection = conn;MySqlTransaction tx = conn.BeginTransaction();cmd.Transaction = tx;string strsql = GetCommand();try{for (int n = 0; n < items.Count; n++){cmd.CommandText = strsql;cmd.Parameters.Clear();var item = items[n];int i = 0;foreach (string colloumn in tableColloumns){cmd.Parameters.AddWithValue(colloumn, item.values[i]);i++;}cmd.ExecuteNonQuery();if (n > 0 && (n % 500 == 0 || n == items.Count - 1)){tx.Commit();counts += items.Count;Console.WriteLine("写入" + counts + "条数据成功");tx = conn.BeginTransaction();}}}catch (System.Data.SqlClient.SqlException E){Console.WriteLine("写入数据失败,回滚");Console.ReadLine();tx.Rollback();MessageBox.Show(E.Message);//throw new Exception(E.Message);}}}
        private string GetCommand(){// string cmdstr = "update WIRELESS_PERSON_T set PersonName=@PersonName, PersonSex='" + person.getPersonSex() + "', YID=@YID, caseinfoid='" + person.getCaseinfoid() + "', Kind='" + person.getKind()+ "', caseremark=@Caseremark, ArrivalKind='" + person.getArrivalKind() + "' where PersonId=" + person.getPersonId();string command = @"insert into `" + MongoDBTableName.Text + "` (";for (int k = 0; k < tableColloumns.Count; k++){if (k == tableColloumns.Count - 1)command += "`" + tableColloumns[k] + "`)";elsecommand += "`" + tableColloumns[k] + "`,";}command += "values( ";for (int j = 0; j < tableColloumns.Count; j++){if (j == tableColloumns.Count - 1)command += "?" + tableColloumns[j] + ")";elsecommand += "?" + tableColloumns[j] + ",";}return command;}

3.使用MysqlBulkLoad进行数据迁移

 #region 批处理处理数据var Filter = Builders<BsonDocument>.Filter;var Cursor = DataCollection.Find(Filter.Empty).ToCursor();var FileName = "temp.txt";using (StreamWriter Writer = new StreamWriter(FileName, false, Encoding.UTF8)){while (await Cursor.MoveNextAsync()){foreach (var Document in Cursor.Current){Writer.Write("<%RS%>");foreach (var col in tableColloumns){if (IsRegex.Match(col).Success){Writer.Write($"{Document.GetValue(col, false).AsBoolean:1?0}<%COL%>");}else if (col == "_id"){Writer.Write($"{Document.GetValue(col).AsObjectId.ToString()}<%COL%>");}else{Writer.Write($"{Document.GetValue(col, string.Empty).AsString}<%COL%>");}}Writer.Write("<%RE%>");}}}var Bulk = new MySqlBulkLoader(Conn);Bulk.TableName = MysqlTable.Text;Bulk.ConflictOption = MySqlBulkLoaderConflictOption.Ignore;Bulk.Local = true;Bulk.Timeout =10* 60 * 1000;Bulk.CharacterSet = "utf8mb4";Bulk.LinePrefix = "<%RS%>";Bulk.LineTerminator = "<%RE%>";Bulk.FieldTerminator = "<%COL%>";Bulk.EscapeCharacter = '\b';Bulk.FileName = FileName;ncount = Bulk.Load();
#endregion_message.messageA += "已写入" + ncount.ToString() + "条数据\n";_message.messageA += "向Mysql表写入数据完成\n";

以上就是三种解决办法,是从最笨的方法慢慢提高效率,所以推荐使用第三种方法。

C# 从MongoDB导入数据到mysql相关推荐

  1. Linux的load导入语句,LOAD DATA INFILE语句导入数据进入MySQL的一些注意事项

    可以用以下语句导入CSV或其他格式数据进入MySQL数据库, LOAD DATA LOCAL INFILE "C:\\wamp\\apps\\litigationinfo.csv" ...

  2. php上传查询excel到mysql_PHP上传Excel文件导入数据到MySQL数据库示例

    PHP上传Excel文件导入数据到MySQL数据库示例2020-06-20 00:34:11 最近在做Excel文件导入数据到数据库.网站如果想支持批量插入数据,可以制作一个上传Excel文件,导入里 ...

  3. 随笔编号-09 批量导入数据(Mysql)报MySQL server has gone away 问题的解决方法

    问题场景: 使用*.sql 脚本,批量导入数据到mysql实例中,使用DOS 界面导入的,期间,到最后一步 source D:\aaa.sql  回车后,系统提示 MySQL server has g ...

  4. 从csv文件中导入数据到MySQL数据库

    从csv文件中导入数据到MySQL数据库 转: 一.Workbench客户端导入(8.0.11基本导不全且速度奇慢) 1.点击如下图标创建数据库(非必要) 2.在表类上右键导入 二.SQL语句导入(可 ...

  5. 3.2.3 Sqoop 数据迁移工具, 导入数据import, MySQL到HDFS/Hive, 导出数据export,增量数据导入, Sqoop job,常用命令及参数

    目录 数据迁移工具 -- Sqoop 第一部分 Sqoop概述 第二部分 安装配置 第三部分 应用案例 第 1 节 导入数据import MySQL 到 HDFS MySQL 到 Hive 第 2 节 ...

  6. Linux下通过txt文件导入数据到MySQL数据库

    1.修改配置文件 在 /etc/my.conf 中添加 local_infile=1 2.重启MySQL >service mysqld restart 3.登录数据库 登录时添加参数 --lo ...

  7. java mysql 文本导入数据语句_Java利用MYSQL LOAD DATA LOCAL INFILE实现大批量导入数据到MySQL...

    Mysql load data的使用 数据库中,最常见的写入数据方式是通过SQL INSERT来写入,另外就是通过备份文件恢复数据库,这种备份文件在MySQL中是SQL脚本,实际上执行的还是在批量IN ...

  8. php 上传excel到mysql_PHP上传Excel文件导入数据到MySQL数据库示例

    最近在做Excel文件导入数据到数据库.网站如果想支持批量插入数据,可以制作一个上传Excel文件,导入里面的数据内容到MySQL数据库的小程序. 要用到的工具: ThinkPHP:轻量级国产PHP开 ...

  9. sqoop从hive导入数据到mysql时出现主键冲突

    今天在将一个hive数仓表导出到mysql数据库时出现进度条一直维持在95%一段时间后提示失败的情况,搞了好久才解决.使用的环境是HUE中的Oozie的workflow任何调用sqoop命令,该死的o ...

  10. mongodb导入数据

    一粘贴复制 var persons = [{name:"jim",age:25,email:"75431457@qq.com",c:89,m:96,e:87,c ...

最新文章

  1. linux 内核 call,在Linux Kernel內新增一个System Call(转)
  2. Target host is not specified错误
  3. ★ 省时省力又省钱--快来看win7家庭版升级旗舰版 ★
  4. Oracle Performance Active Session History ASH图
  5. 机器学习模型解释性工具SHAP
  6. hibernate 延迟加载的错误 failed to lazily initialize a collection of role
  7. mysql 主从 keepalived_MySQL之双向主从加keepalived高可用
  8. JDK 7,jdk1.7 安装及配置
  9. 【飞秋】Asp.net MVC2 model验证 看似美好,实则让人失望。
  10. 橘子游戏平台_apex英雄_游戏快速下载_雷神加速器全网最快
  11. php 实现数组数据查询,thinkphp数据查询和遍历数组实例_PHP
  12. 一个完整的html文件包含哪些标签,HTML基础有哪些单标签
  13. ArrayList 练习
  14. layoutIfNeeded 就这样把我害惨
  15. Flex 与.net 进行通信可以通过Fluorine(fluorinefx),WebORB For .net,Socket
  16. 使用ffmpeg对视频、音频进行分离
  17. 山东理工大学2021年下半年实验室安全考试
  18. 富士通Fujitsu DPK320 打印机驱动
  19. 毕业设计,微信小程序-购物小程序
  20. 电商平台大数据API大全

热门文章

  1. 【滤波器】基于matlab GUI低通+带通+高通FIR与IIR滤波器设计【含Matlab源码 360期】
  2. NLP对放射科医生的评价
  3. 赵本山 政治敏锐_每天5分钟保持敏锐的7种方法
  4. python数据框添加一列无列名_Pandas只使用列名创建空数据框
  5. app 怎么实现后端对前端的通知功能_app测试流程
  6. 2021/4/2听宫老师演讲有感。
  7. 计算机显示桌面的按钮,如何找回Windows“显示桌面”按钮 -电脑资料
  8. 计算机应用基础统考模拟练习系统,网教计算机应用基础统考综合模拟练习题(一)...
  9. node mysql sequlize_初步使用Sequelize模块 - Node实战
  10. 我的 Java 血泪史