python多进程写入mysql_Python实现 多进程导入CSV数据到 MySQL
前段时间帮同事处理了一个把 CSV 数据导入到 MySQL 的需求。两个很大的 CSV 文件, 分别有 3GB、2100 万条记录和 7GB、3500 万条记录。对于这个量级的数据,用简单的单进程/单线程导入 会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:
批量插入而不是逐条插入
为了加快插入速度,先不要建索引
生产者和消费者模型,主进程读文件,多个 worker 进程执行插入
注意控制 worker 的数量,避免对 MySQL 造成太大的压力
注意处理脏数据导致的异常
原始数据是 GBK 编码,所以还要注意转换成 UTF-8
用 click 封装命令行工具
具体的代码实现如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import codecs
import csv
import logging
import multiprocessing
import os
import warnings
import click
import MySQLdb
import sqlalchemy
warnings.filterwarnings('ignore', category=MySQLdb.Warning)
# 批量插入的记录数量
BATCH = 5000
DB_URI = 'mysql://root@localhost:3306/example?charset=utf8'
engine = sqlalchemy.create_engine(DB_URI)
def get_table_cols(table):
sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table)
res = engine.execute(sql)
return res.keys()
def insert_many(table, cols, rows, cursor):
sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(
table=table,
cols=', '.join(cols),
marks=', '.join(['%s'] * len(cols)))
cursor.execute(sql, *rows)
logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table)
def insert_worker(table, cols, queue):
rows = []
# 每个子进程创建自己的 engine 对象
cursor = sqlalchemy.create_engine(DB_URI)
while True:
row = queue.get()
if row is None:
if rows:
insert_many(table, cols, rows, cursor)
break
rows.append(row)
if len(rows) == BATCH:
insert_many(table, cols, rows, cursor)
rows = []
def insert_parallel(table, reader, w=10):
cols = get_table_cols(table)
# 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据
# 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存
queue = multiprocessing.Queue(maxsize=w*BATCH*2)
workers = []
for i in range(w):
p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue))
p.start()
workers.append(p)
logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid)
dirty_data_file = './{}_dirty_rows.csv'.format(table)
xf = open(dirty_data_file, 'w')
writer = csv.writer(xf, delimiter=reader.dialect.delimiter)
for line in reader:
# 记录并跳过脏数据: 键值数量不一致
if len(line) != len(cols):
writer.writerow(line)
continue
# 把 None 值替换为 'NULL'
clean_line = [None if x == 'NULL' else x for x in line]
# 往队列里写数据
queue.put(tuple(clean_line))
if reader.line_num % 500000 == 0:
logging.info('put %s tasks into queue.', reader.line_num)
xf.close()
# 给每个 worker 发送任务结束的信号
logging.info('send close signal to worker processes')
for i in range(w):
queue.put(None)
for p in workers:
p.join()
def convert_file_to_utf8(f, rv_file=None):
if not rv_file:
name, ext = os.path.splitext(f)
if isinstance(name, unicode):
name = name.encode('utf8')
rv_file = '{}_utf8{}'.format(name, ext)
logging.info('start to process file %s', f)
with open(f) as infd:
with open(rv_file, 'w') as outfd:
lines = []
loop = 0
chunck = 200000
first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n'
lines.append(first_line)
for line in infd:
clean_line = line.decode('gb18030').encode('utf8')
clean_line = clean_line.rstrip() + '\n'
lines.append(clean_line)
if len(lines) == chunck:
outfd.writelines(lines)
lines = []
loop += 1
logging.info('processed %s lines.', loop * chunck)
outfd.writelines(lines)
logging.info('processed %s lines.', loop * chunck + len(lines))
@click.group()
def cli():
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
@cli.command('gbk_to_utf8')
@click.argument('f')
def convert_gbk_to_utf8(f):
convert_file_to_utf8(f)
@cli.command('load')
@click.option('-t', '--table', required=True, help='表名')
@click.option('-i', '--filename', required=True, help='输入文件')
@click.option('-w', '--workers', default=10, help='worker 数量,默认 10')
def load_fac_day_pro_nos_sal_table(table, filename, workers):
with open(filename) as fd:
fd.readline() # skip header
reader = csv.reader(fd)
insert_parallel(table, reader, w=workers)
if __name__ == '__main__':
cli()
以上就是本文给大家分享的全部没人了,希望大家能够喜欢
希望与广大网友互动??
点此进行留言吧!
python多进程写入mysql_Python实现 多进程导入CSV数据到 MySQL相关推荐
- 利用JAVA程序批量导入csv数据到MySQL数据库
正在学习利用R进行统计学相关知识的实验,实验数据计划采用北京市环境监测数据,此数据可以在这个网址"https://quotsoft.net/air/"中下载,目前可提供2013年1 ...
- python csv库,Python 中导入csv数据的三种方法
Python 中导入csv数据的三种方法,具体内容如下所示: 1.通过标准的Python库导入CSV文件: Python提供了一个标准的类库CSV文件.这个类库中的reader()函数用来导入CSV文 ...
- mysql linux导入csv主键,MySQL导入csv文件内容到Table及数据库的自增主键设置
写在前面 目的是测试将csv文件内容导入到表中, 同时记录一下自增主键的设置. 测试采用MySQL8.0. 新建表customer_info如下, 未设置主键. 修改上表, 添加主键id, 并设置为自 ...
- python导入excel数据到mysql
python导入excel数据到mysql 使用多线程,目前大概一分钟写入1w条 环境介绍 windows10-x64 python3.6.5-x64 Excel2016 MySql5.7.18 需要 ...
- python怎么导入txt数据库_Python导入txt数据到mysql的方法
本文实例讲述了Python导入txt数据到mysql的方法.分享给大家供大家参考.具体分析如下: 从TXT文本转换数据到MYSQL数据库,接触一段时间python了 第一次写东西 用的是Python2 ...
- datagrip导入csv数据配合ajax+mysql+Flask实验
1.先让datagrip连接mysql数据库ajax_data 2.先连上数据库 2.导入csv数据,勾上First row is_header 重命名为company_info #--------- ...
- Neo4j 导入CSV数据
Neo4j 导入CSV数据 要求 必须有一个或多个 CSV 文件来表示将在图中创建的节点和关系. 必须有一个已启动的现有 Neo4j DBMS. Neo4j 中存储为属性的数据类型 String:字符 ...
- MySQL导入csv数据
MySQL导入csv数据 前言 一.导入步骤 1.MySQL workbench 操作语句 2.用CMD命令窗口导入数据 3.用cmd时犯的一个错误 总结 前言 下载了一个数据组,数据格式是CSV,再 ...
- txt 导入 mysql python_Python导入txt数据到mysql的方法
本文实例讲述了Python导入txt数据到mysql的方法.分享给大家供大家参考.具体分析如下: 从TXT文本转换数据到MYSQL数据库,接触一段时间python了 第一次写东西 用的是Python2 ...
最新文章
- 计算机图形学曲线生成原理,计算机图形学_曲线及生成.ppt
- 【组合数学】生成函数 ( 换元性质 | 求导性质 | 积分性质 )
- Llama-impala on yarn的中间协调服务
- Jquery实现 全选反选
- Effective C++学习第二天
- 深度 | API 设计最佳实践的思考
- 今日头条收购锤子?ofo 半月退 24 万户押金;斗鱼索赔主播 1.5 亿元 | 极客头条...
- ubuntu下的jdk进行升级_Ubuntu下JDK升级1.7
- 一个针对.net的好的建模工具 powerdesign 11
- colspan会影响内部单元格宽度失效_封装胶残留致MEMS振动传感器失效分析
- 无线电监测软件java_大牛干货:软件无线电的设计和测试
- VS之sonar插件安装
- node创建ETH地址及导出私钥
- Spring Boot Actuator自定义健康检查
- 微信公众号答题功能搭建
- C++11 使用智能指针封装 pimpl idom
- 豆瓣上征婚交友的小姐姐们
- 计算机科学 在职双证,计算机专业在职研究生有双证的吗?
- Ubuntu18.04 安装 ROS Melodic(同时解决 rosdep update 问题,亲测有效)
- day1——SpringBoot介绍
热门文章
- VS2013正在等待所需操作完成
- ubuntu16.04 计算视觉算法相关软件安装 亲测可用
- 阿里云能耗宝发布,助力中小企业绿色升级,参与碳中和万亿市场
- 小程序下一破局点?钉钉小程序卡片,应用与平台的深度集成
- 如何基于MaxCompute快速打通数据仓库和数据湖的湖仓一体实践
- MaxCompute 实现增量数据推送(全量比对增量逻辑)
- Flutter高内聚组件怎么做?阿里闲鱼打造开源高效方案!
- PTS + ARMS打造性能和应用诊断利器
- 实战:阿里巴巴 DevOps 转型后的运维平台建设
- 34 年了,“杀”不死的 Perl!