最近在项目中有一个需求是没五分钟爬虫抓取一批网上数据,然后实时的将数据更新到mysql和redis中存储,在存储数据时,需要对数据做一些标准化的操作,甚至还需要根据历史数据对新数据某些缺失字段进行融合操作,往往在数据量比较大时,会出现五分钟内无法处理完parser产生的数据,导致数据的堆积。

数据处理逻辑中,其实是可以根据某些特性将全量数据划分为独立的规模较小的数据子集合,每个子集互相独立,可独立进行处理,在更新数据时,不需要用到其他子集的数据。基于该特性以及多核服务器的特性,多线程(多进程)就成为一个加快数据更新速度的较好方案。

我们的整个数据更新服务都是使用python实现的。

在实现对数据进行并行处理时,首先想到的是利用多线程,将全量数据进行划分后,对每个子集数据启动一个子线程单独处理,待所有子线程都处理完各自的子集数据时,父进程在进行剩余的操作。

1、使用threading实现多线程

python中实现多线程可以使用thread或者是threading,官方文档是推荐threading模块实现的。threading实现多线程很简单。

整个实现的逻辑如下所示,其中具体的操作使用注释代替,主要是描述多线程逻辑。

<span style="font-size:14px;"><span style="font-family:KaiTi_GB2312;font-size:14px;">import time, threadingdef sub_func(sub_data,result):print 'begin deal data...'/**** do something*****/result = /**deal with data***/print 'finish thread'if __name__ == '__main__':data_list = [data1,data2]result_list = [result_1,result_2]threads = []for idx in range(len(data_list)):threads.append(threading.Thread(target=sub_func,args = (data_list[idx],result_list[idx]))for cand_thread in threads:cand_thread.start()for cand_thread in thread:cand_thread.join()/***do other things to finsh th updaing***/</span><span style="font-family: KaiTi_GB2312;font-size:14px;">do_other()     </span></span>

在上面的多线程代码中,启动每个子线程处理单独的数据子集,父进程阻塞,等待所有子线程结束,然后继续执行do_other函数,执行剩余的操作。

完成上述代码后,分别测试单线程和多线程之间的性能差异,发现一个比较奇怪的现象,同样大小的数据,单线程消耗的时间总是比多线程要少,而且使用多线程的时候,服务器的CPU只有其中一个的使用率会超过100%,其他CPU使用率非常低。这说明多线程代码根本没用充分地利用多服务器,而是在同一个服务器上实现了多线程。

上网google了一下(当然需要自备梯子~~),才发现原来python的多线程和C/C++的多线性是不一样的,根本原因在于Python中的Global Interpreter Lock。

在Python 多线程当中,存在一个叫Global Interpreter Lock(GIL)的东西,直译就是全局解释器锁。它的作用在于让同一时刻只能有一个线程对于python对象进行操作。Python已经提供了各种机制让我们进行多线程同步,为什么又要整这个GIL呢?这是因为程序员控制的同步是对各个程序中可见的变量,而GIL同步的是解释器后台的不可见变量,比如为了进行垃圾回收而维护的引用计数。如果没有GIL,有可能出现由于线程切换导致的对同一个对象释放两次的情况。

因此,任何一个CPython线程如果要执行,就必须先获取这个GIL。后果?就是在CPython中,本质上几乎是没有线程并行的,不论你开多少个线程,同一时刻只有获取GIL的那个线程能够执行。

当然也可以避免这个问题,那就需要使用一些其他的包了,否者如果是存的python代码,使用CPython,那么这个问题很难避免。

由于不想引入其他的包,因此想到使用多进程实现,因为每一个进程是独立拥有自己的GIL的,这就避免了上面描述的问题,从而可以实现多核处理器的使用。

2、python多进程

在多进程中,和线程不同的时,每个子进程拥有自己独立的数据空间,这样就不能想多线程一样让每个每个子进程将运行结果返回给父进程。这就需要实现进程间的通信。

python中进程间通信通常使用两类:Queue和Pipe。Queue和Pipe的使用可直接查看官方文档:https://docs.python.org/2/library/multiprocessing.html。

我选择了较简单地queue来实现进程间通信,注意这里用的是multiprocessing.Queue,而不是Queue,multiprocessing.Queue是线程/进程安全的。

在实现中,每个子进程将运行的结果put到queue中,父进程从queue中get结果进程后续处理。一切看起来都perfect。

Talk is cheap, show me the code~~

<span style="font-size:14px;">import multiprocessingdef sub_func(sub_data,process_queue):print 'begin deal data...'/**** do something*****/result = /**deal with data***/process_queue.put(result)print 'finish thread'if __name__ == '__main__':data_list = [data1,data2]result_list = [result_1,result_2]process_queue = multiprocessing.Queue();process_list = []for idx in range(len(data_list)):cand_process = multiprocessing.Process(target=sub_func,args = (data_list[idx],process_queue))cand_process.start()process_list.append(cand_process)for cand_process in process_list:cand_process.join()while False == process_queue.empty():cand_result = process_queue.get()do_other(cand_result)
</span>

事情看起来似乎马上就over了,就快要下班回去陪女票了。接下来运行代码,比较一下性能,见证奇迹的时刻就要到了,

可是,尼玛运行了10分钟了,开了三个子进程,加上父进程,四个进程没有一个结束的,每个进程的CPU使用率都接近0。我擦 ,一定是我打开的方式不对,各种看文档,没发现错误啊,文档就是那么写的,示例代码也是这样啊,,,,

抓狂,看来今晚陪女票的计划泡汤了,改为加完班回去跪主板吧!!!

官方文档没找到解决方案,好吧,上万能的google,输入python multiprocessing queue deadlock,我擦,奇迹出现了,我不是一个人在战斗,这是什么节奏。

看了一下网上大家遇到的问题,queue出现deadlock都是因为对queue存在get、set操作,当get操作出现queue为空时,queue阻塞,此时queue的set操作也会阻塞,从而造成死锁。可是我的代码里不是这样的逻辑,在对queue进行get之前,调用join保证所有子进程都结束了。可是仍然出现了死锁,加上各种log,从log中发现,调用第一次join后,父进程就死锁了,这是为什么呢。

查看join函数的文档:

join([timeout])

Block the calling thread until the process whose join() method is called terminates or until the optional timeout occurs.

If timeout is None then there is no timeout.

A process can be joined many times.

A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.

也就是说,当调用join的时候,必须保持子进程当前没有结束,否者子进程会出现死锁。

解决方法:

使用不断查询queue的返回结果数量,来判断子进程是否都结束了。当子进程都结束,在处理接下来的逻辑:

<span style="font-size:14px;">import multiprocessingdef sub_func(sub_data,process_queue):print 'begin deal data...'/**** do something*****/result = /**deal with data***/process_queue.put(result)print 'finish thread'if __name__ == '__main__':data_list = [data1,data2]result_list = [result_1,result_2]process_queue = multiprocessing.Queue();process_list = []for idx in range(len(data_list)):cand_process = multiprocessing.Process(target=sub_func,args = (data_list[idx],process_queue))cand_process.start()process_list.append(cand_process)result_received = []while len(result_received) < process_count:result_received.append(process_queue.get())sleep(3)for cand_result in result_received:do_other(cand_data)</span>

总感觉还有更加优美的处理方法,等抽空研究了在更新~~

python多线程、多进程相关推荐

  1. python多线程多进程

    一.线程&进程 对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程, ...

  2. python多线程 多进程

    多进程与多线程 我们都知道,操作系统中所有的程序都是以进程的方式来运行的,或者说我们把运行着的程序称为进程(Process).例如运行记事本程序就是启动一个记事本进程,运行两个记事本就是启动两个记事本 ...

  3. Python多线程多进程应用场景

    IO密集型: 推荐使用多线程 CPU密集型: 推荐使用多进程 为什么Python多线程在CPU密集型中是鸡肋?通过一段代码来测试 单线程执行从1亿递减到0 import datetimedef run ...

  4. Python多线程多进程、异步、异常处理等高级用法

    文章目录 前言 多线程多进程 多线程 多进程 协程 总结 异步 基本概念 异步编程 asyncio aiohttp 异常 常见异常 异常处理 自定义异常 lambda表达式 lambda表达式用法 高 ...

  5. Python 多线程+多进程简单使用教程,如何在多进程开多线程

    一.Python多进程多线程 关于python多进程多线程的相关基础知识,在我之前的博客有写过,并且就关于python多线程的GIL锁问题,也在我的一篇博客中有相关的解释. 为什么python多线程在 ...

  6. Python 多线程 多进程 协程 yield

    python中多线程和多进程的最大区别是稳定性和效率问题 多进程互相之间不影响,一个崩溃了不影响其他进程,稳定性高 多线程因为都在同一进程里,一个线程崩溃了整个进程都完蛋 多进程对系统资源开销大,多线 ...

  7. python 多线程多进程logging系统写入同一日志文件处理方法

    多线程进程,logging写入日志到同一文件的处理方法 python logging系统切分问题 TimedRotatingFileHandler切分逻辑源码 解决方案 python logging系 ...

  8. python多线程多进程多协程_python 多进程、多线程、协程

    1.python的多线程 多线程就是在同一时刻执行多个不同的程序,然而python中的多线程并不能真正的实现并行,这是由于cpython解释器中的GIL(全局解释器锁)捣的鬼,这把锁保证了同一时刻只有 ...

  9. python学习笔记(十六)-Python多线程多进程

    一.线程&进程 对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程, ...

  10. python 多线程 多进程 zmq_研二硕, Python +pyqt,多进程问题求助

    51 60 天前 @knightdf import sys from PyQt5.QtWidgets import QMainWindow,QApplication,QWidget from unti ...

最新文章

  1. axios get请求_Axios使用指南
  2. python不同版本共存_多版本Python共存的配置方法
  3. 命令测试post_【第2088期】前端中台化,把格局做大——NodeJS 和测试服务探索
  4. [CV招聘]中科院空间应用工程与技术中心2019年校园招聘
  5. AutoCAD2004启动时出现fail to get CommcntrController的怎么办
  6. excel删除行 uipath_工作再忙,也要学会这10个最经典的Excel小技巧
  7. rsync常见问题及解决办法
  8. Cannot load driver ‘C:\Keil_v5\ARM\Segger\JL2CM3.dll 报错解决方法。
  9. webUploader大文件断点续传学习心得 多文件
  10. 【MySQL】轻松学习 唯一索引
  11. c#创建画布_WinForm GDI编程:Graphics画布类
  12. TypeScript学习--Symbols
  13. 开发功能更加完善的智能颈部按摩仪
  14. 如何评判一个企业是否需要实施erp系统?
  15. 为什么越来越多的企业选择短信接口平台?
  16. ABP VNext学习日记5
  17. 【咩了个咩】最通关100W+最详教程,不会有人还没有过关吧【含视频教程】
  18. Docker版本名称YY.MM修改聚焦
  19. active控件读取服务器文件,Delphi下利用ActiveX控件读取PDF文件
  20. IRF3808STRRPBF N 通道 75 V 106A MOS 管

热门文章

  1. 镁光139 8510
  2. Nginx、图片上传、FastDFS
  3. 跨境电商 Shopee 的实时数仓演进之路
  4. 网络与验证服务器失联怎样修复,GCP用一键服务器失联了,如何重装系统?
  5. CLIP: 打通文本图像迁移模型的新高度
  6. 鞘氨醇-1-磷酸的生物学作用
  7. GaRy-Liang的linux成长日记3-自动化安装
  8. spring集成druid示例
  9. 如何用计算机把数字12变成21,2015年12月计算机二级office考试题及答案
  10. 基于C语言实现的自动打乱九宫格并且还原