前段时间帮同事处理了一个把 CSV 数据导入到 MySQL 的需求。两个很大的 CSV 文件, 分别有 3GB、2100 万条记录和 7GB、3500 万条记录。对于这个量级的数据,用简单的单进程/单线程导入 会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:

批量插入而不是逐条插入

为了加快插入速度,先不要建索引

生产者和消费者模型,主进程读文件,多个 worker 进程执行插入

注意控制 worker 的数量,避免对 MySQL 造成太大的压力

注意处理脏数据导致的异常

原始数据是 GBK 编码,所以还要注意转换成 UTF-8

用 click 封装命令行工具

具体的代码实现如下:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import codecs

import csv

import logging

import multiprocessing

import os

import warnings

import click

import MySQLdb

import sqlalchemy

warnings.filterwarnings('ignore', category=MySQLdb.Warning)

# 批量插入的记录数量

BATCH = 5000

DB_URI = 'mysql://root@localhost:3306/example?charset=utf8'

engine = sqlalchemy.create_engine(DB_URI)

def get_table_cols(table):

sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table)

res = engine.execute(sql)

return res.keys()

def insert_many(table, cols, rows, cursor):

sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(

table=table,

cols=', '.join(cols),

marks=', '.join(['%s'] * len(cols)))

cursor.execute(sql, *rows)

logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table)

def insert_worker(table, cols, queue):

rows = []

# 每个子进程创建自己的 engine 对象

cursor = sqlalchemy.create_engine(DB_URI)

while True:

row = queue.get()

if row is None:

if rows:

insert_many(table, cols, rows, cursor)

break

rows.append(row)

if len(rows) == BATCH:

insert_many(table, cols, rows, cursor)

rows = []

def insert_parallel(table, reader, w=10):

cols = get_table_cols(table)

# 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据

# 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存

queue = multiprocessing.Queue(maxsize=w*BATCH*2)

workers = []

for i in range(w):

p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue))

p.start()

workers.append(p)

logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid)

dirty_data_file = './{}_dirty_rows.csv'.format(table)

xf = open(dirty_data_file, 'w')

writer = csv.writer(xf, delimiter=reader.dialect.delimiter)

for line in reader:

# 记录并跳过脏数据: 键值数量不一致

if len(line) != len(cols):

writer.writerow(line)

continue

# 把 None 值替换为 'NULL'

clean_line = [None if x == 'NULL' else x for x in line]

# 往队列里写数据

queue.put(tuple(clean_line))

if reader.line_num % 500000 == 0:

logging.info('put %s tasks into queue.', reader.line_num)

xf.close()

# 给每个 worker 发送任务结束的信号

logging.info('send close signal to worker processes')

for i in range(w):

queue.put(None)

for p in workers:

p.join()

def convert_file_to_utf8(f, rv_file=None):

if not rv_file:

name, ext = os.path.splitext(f)

if isinstance(name, unicode):

name = name.encode('utf8')

rv_file = '{}_utf8{}'.format(name, ext)

logging.info('start to process file %s', f)

with open(f) as infd:

with open(rv_file, 'w') as outfd:

lines = []

loop = 0

chunck = 200000

first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n'

lines.append(first_line)

for line in infd:

clean_line = line.decode('gb18030').encode('utf8')

clean_line = clean_line.rstrip() + '\n'

lines.append(clean_line)

if len(lines) == chunck:

outfd.writelines(lines)

lines = []

loop += 1

logging.info('processed %s lines.', loop * chunck)

outfd.writelines(lines)

logging.info('processed %s lines.', loop * chunck + len(lines))

@click.group()

def cli():

logging.basicConfig(level=logging.INFO,

format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')

@cli.command('gbk_to_utf8')

@click.argument('f')

def convert_gbk_to_utf8(f):

convert_file_to_utf8(f)

@cli.command('load')

@click.option('-t', '--table', required=True, help='表名')

@click.option('-i', '--filename', required=True, help='输入文件')

@click.option('-w', '--workers', default=10, help='worker 数量,默认 10')

def load_fac_day_pro_nos_sal_table(table, filename, workers):

with open(filename) as fd:

fd.readline() # skip header

reader = csv.reader(fd)

insert_parallel(table, reader, w=workers)

if __name__ == '__main__':

cli()

以上就是本文给大家分享的全部没人了,希望大家能够喜欢

希望与广大网友互动??

点此进行留言吧!

python多进程写入mysql_Python实现 多进程导入CSV数据到 MySQL相关推荐

  1. 利用JAVA程序批量导入csv数据到MySQL数据库

    正在学习利用R进行统计学相关知识的实验,实验数据计划采用北京市环境监测数据,此数据可以在这个网址"https://quotsoft.net/air/"中下载,目前可提供2013年1 ...

  2. python csv库,Python 中导入csv数据的三种方法

    Python 中导入csv数据的三种方法,具体内容如下所示: 1.通过标准的Python库导入CSV文件: Python提供了一个标准的类库CSV文件.这个类库中的reader()函数用来导入CSV文 ...

  3. mysql linux导入csv主键,MySQL导入csv文件内容到Table及数据库的自增主键设置

    写在前面 目的是测试将csv文件内容导入到表中, 同时记录一下自增主键的设置. 测试采用MySQL8.0. 新建表customer_info如下, 未设置主键. 修改上表, 添加主键id, 并设置为自 ...

  4. python导入excel数据到mysql

    python导入excel数据到mysql 使用多线程,目前大概一分钟写入1w条 环境介绍 windows10-x64 python3.6.5-x64 Excel2016 MySql5.7.18 需要 ...

  5. python怎么导入txt数据库_Python导入txt数据到mysql的方法

    本文实例讲述了Python导入txt数据到mysql的方法.分享给大家供大家参考.具体分析如下: 从TXT文本转换数据到MYSQL数据库,接触一段时间python了 第一次写东西 用的是Python2 ...

  6. datagrip导入csv数据配合ajax+mysql+Flask实验

    1.先让datagrip连接mysql数据库ajax_data 2.先连上数据库 2.导入csv数据,勾上First row is_header 重命名为company_info #--------- ...

  7. Neo4j 导入CSV数据

    Neo4j 导入CSV数据 要求 必须有一个或多个 CSV 文件来表示将在图中创建的节点和关系. 必须有一个已启动的现有 Neo4j DBMS. Neo4j 中存储为属性的数据类型 String:字符 ...

  8. MySQL导入csv数据

    MySQL导入csv数据 前言 一.导入步骤 1.MySQL workbench 操作语句 2.用CMD命令窗口导入数据 3.用cmd时犯的一个错误 总结 前言 下载了一个数据组,数据格式是CSV,再 ...

  9. txt 导入 mysql python_Python导入txt数据到mysql的方法

    本文实例讲述了Python导入txt数据到mysql的方法.分享给大家供大家参考.具体分析如下: 从TXT文本转换数据到MYSQL数据库,接触一段时间python了 第一次写东西 用的是Python2 ...

最新文章

  1. 计算机图形学曲线生成原理,计算机图形学_曲线及生成.ppt
  2. 【组合数学】生成函数 ( 换元性质 | 求导性质 | 积分性质 )
  3. Llama-impala on yarn的中间协调服务
  4. Jquery实现 全选反选
  5. Effective C++学习第二天
  6. 深度 | API 设计最佳实践的思考
  7. 今日头条收购锤子?ofo 半月退 24 万户押金;斗鱼索赔主播 1.5 亿元 | 极客头条...
  8. ubuntu下的jdk进行升级_Ubuntu下JDK升级1.7
  9. 一个针对.net的好的建模工具 powerdesign 11
  10. colspan会影响内部单元格宽度失效_封装胶残留致MEMS振动传感器失效分析
  11. 无线电监测软件java_大牛干货:软件无线电的设计和测试
  12. VS之sonar插件安装
  13. node创建ETH地址及导出私钥
  14. Spring Boot Actuator自定义健康检查
  15. 微信公众号答题功能搭建
  16. C++11 使用智能指针封装 pimpl idom
  17. 豆瓣上征婚交友的小姐姐们
  18. 计算机科学 在职双证,计算机专业在职研究生有双证的吗?
  19. Ubuntu18.04 安装 ROS Melodic(同时解决 rosdep update 问题,亲测有效)
  20. day1——SpringBoot介绍

热门文章

  1. VS2013正在等待所需操作完成
  2. ubuntu16.04 计算视觉算法相关软件安装 亲测可用
  3. 阿里云能耗宝发布,助力中小企业绿色升级,参与碳中和万亿市场
  4. 小程序下一破局点?钉钉小程序卡片,应用与平台的深度集成
  5. 如何基于MaxCompute快速打通数据仓库和数据湖的湖仓一体实践
  6. MaxCompute 实现增量数据推送(全量比对增量逻辑)
  7. Flutter高内聚组件怎么做?阿里闲鱼打造开源高效方案!
  8. PTS + ARMS打造性能和应用诊断利器
  9. 实战:阿里巴巴 DevOps 转型后的运维平台建设
  10. 34 年了,“杀”不死的 Perl!