1、multiprocess模块详解

Python的os模块封装了常见的系统调用,其中就包含 「fork函数」,通过这个函数可以轻松的创建子进程,但是要注意一点,在Windows系统上是无法使用fork函数的,Python为我们提供了可跨平台的multiprocess模块。该模块提供了一个Process类来代表一个进程对象,用法和Thread非常相似。

① Process进程对象

创建一个进程的代码示例如下:

from multiprocessing import Process
import osdef show_msg(name):print("子进程运行中:name = %s , pid = %d " % (name, os.getpid()))if __name__ == '__main__':print("父进程 %d" % os.getpid())p = Process(target=show_msg, args=('测试',))print("开始执行子进程~")p.start()p.join()print("子进程执行完毕!")

运行结果如下

父进程 26539
开始执行子进程~
子进程运行中:name = 测试 , pid = 26540
子进程执行完毕!

Process构造函数

Process(group=None, target=None, name=None, args=(), kwargs={})

参数详解

  • group:分组,很少用到
  • target:调用对象,传入任务执行函数作为参数
  • name:进程别名
  • args:给调用对象以元组的形式提供参数,比如有两个参数args=(a,b),如果只有一个参数,要这样写args=(a,),不能把逗号漏掉,不然会被当做括号运算符使用!
  • kwargs:调用对象的关键字参数字典

Process的常用函数

  • is_alive():判断进程实例是否还在执行;
  • join([timeout]):是否等待进程实例执行结束,或等待多少秒;
  • start():启动进程实例(创建子进程);
  • run():如果没有给定target参数,对这个对象调用start()方法时,
    就将执行对象中的run()方法;
  • terminate():不管任务是否完成,立即终止;

除了使用fork函数和上述操作创建进程的方式外,还可以自定义一个Process类,重写 __init__run函数 即可,代码示例如下:

from multiprocessing import Process
import osclass MyProcess(Process):def __init__(self, name):Process.__init__(self)self.msg = namedef run(self):print("子进程运行中:name = %s , pid = %d " % (self.msg, os.getpid()))if __name__ == '__main__':print("父进程 %d" % os.getpid())p = MyProcess('测试')print("开始执行子进程~")p.start()p.join()print("子进程执行完毕!")

运行结果如下

父进程 26794
开始执行子进程~
子进程运行中:name = 测试 , pid = 26795
子进程执行完毕!

② Pool进程池

知道了如何创建进程,那么实现多进程有不是什么难事了,一个循环创建多个即可,但是有个问题,进程可是重量级别的程序,重复进程创建和销毁会造成一定的性能开销! Python为我们提供了一个进程池对象Pool用来缓解进程重复关启带来的性能消耗问题。在创建进程池的时候指定一个容量,如果接收到一个新任务,而池没满的话,会创建一个新的进程来执行这个任务,如果池满的话,任务则会进入等待状态,直到池中有进程结束,才会创建新的进程来执行这个任务。

Pool的构造函数

Pool(processes=None, initializer=None, initargs=(),maxtasksperchild=None, context=None)

一般只用到第一个参数,processes用于设置进程池的容量,即最多并发的进程数量,如果不写默认使用 os.cpu_count() 返回的值。

Pool常用函数详解

  • apply(func, args=(), kwds={}):使用堵塞方式调用func,堵塞的意思是一个进程结束,
    释放回进程池,下一个进程才可以开始,args为传递给func的参数列表,kwds为传递给
    func的关键字参数列表,该方法在Python 2.3后就不建议使用了。
  • apply_async(func, args=(), kwds={}, callback=None,error_callback=None) :使用非阻塞方式调用func,进程池进程最大数可以同时执行,还支持返回结果后进行回调。
  • close():关闭进程池,使其不再接受新的任务;
  • terminate():结束工作进程,不再处理未处理的任务,不管任务是否完成,立即终止;
  • join():主进程阻塞,等待⼦进程的退出,必须在close或terminate之后使用;
  • map(func, iterable, chunksize=None):这里的map函数和Python内置的高阶函数map类似,只是这里的map方法是在进程池多进程并发进行的,接收一个函数和可迭代对象,把函数作用到每个元素,得到一个新的list返回。

最简单的进程池代码示例如下

import multiprocessing as mp
import timedef func(msg):time.sleep(1)print(mp.current_process().name + " : " + msg)if __name__ == '__main__':pool = mp.Pool()for i in range(20):msg = "Do Something %d" % (i)pool.apply_async(func, (msg,))pool.close()pool.join()print("子进程执行任务完毕!")

运行结果如下

ForkPoolWorker-4 : Do Something 3
ForkPoolWorker-2 : Do Something 1
ForkPoolWorker-1 : Do Something 0
ForkPoolWorker-3 : Do Something 2
ForkPoolWorker-5 : Do Something 4
ForkPoolWorker-6 : Do Something 5
ForkPoolWorker-7 : Do Something 6
ForkPoolWorker-8 : Do Something 7
ForkPoolWorker-2 : Do Something 9
ForkPoolWorker-4 : Do Something 8
ForkPoolWorker-1 : Do Something 11
ForkPoolWorker-7 : Do Something 12
ForkPoolWorker-5 : Do Something 13
ForkPoolWorker-6 : Do Something 14
ForkPoolWorker-3 : Do Something 10
ForkPoolWorker-8 : Do Something 15
ForkPoolWorker-6 : Do Something 19
ForkPoolWorker-1 : Do Something 17
ForkPoolWorker-5 : Do Something 18
ForkPoolWorker-7 : Do Something 16
子进程执行任务完毕!

上面的输出结果顺序并没有按照循环中的顺序输出,可以利用 apply_async 的返回值是:被进程调用的函数的返回值,来规避,修改后的代码如下:

import multiprocessing as mp
import timedef func(msg):time.sleep(1)return mp.current_process().name + " : " + msgif __name__ == '__main__':pool = mp.Pool()results = []for i in range(20):msg = "Do Something %d" % iresults.append(pool.apply_async(func, (msg,)))pool.close()pool.join()for result in results:print(result.get())print("子进程执行任务完毕!")

运行结果如下

ForkPoolWorker-1 : Do Something 0
ForkPoolWorker-2 : Do Something 1
ForkPoolWorker-3 : Do Something 2
ForkPoolWorker-4 : Do Something 3
ForkPoolWorker-5 : Do Something 4
ForkPoolWorker-7 : Do Something 6
ForkPoolWorker-6 : Do Something 5
ForkPoolWorker-8 : Do Something 7
ForkPoolWorker-1 : Do Something 8
ForkPoolWorker-2 : Do Something 9
ForkPoolWorker-4 : Do Something 11
ForkPoolWorker-3 : Do Something 10
ForkPoolWorker-7 : Do Something 12
ForkPoolWorker-8 : Do Something 13
ForkPoolWorker-5 : Do Something 14
ForkPoolWorker-6 : Do Something 15
ForkPoolWorker-1 : Do Something 16
ForkPoolWorker-2 : Do Something 17
ForkPoolWorker-4 : Do Something 18
ForkPoolWorker-3 : Do Something 19
子进程执行任务完毕!

感觉还是有点模糊,通过一个多进程统计目录下文件的行数和字符个数的脚本来巩固,代码示例如下:

import multiprocessing as mp
import time
import osresult_file = 'result.txt'  # 统计结果写入文件名# 获得路径下的文件列表
def get_files(path):file_list = []for file in os.listdir(path):if file.endswith('py'):file_list.append(os.path.join(path, file))return file_list# 统计每个文件中函数与字符数
def get_msg(path):with open(path, 'r', encoding='utf-8') as f:content = f.readlines()f.close()lines = len(content)char_count = 0for i in content:char_count += len(i.strip("\n"))return lines, char_count, path# 将数据写入到文件中
def write_result(result_list):with open(result_file, 'a', encoding='utf-8') as f:for result in result_list:f.write(result[2] + " 行数:" + str(result[0]) + " 字符数:" + str(result[1]) + "\n")f.close()if __name__ == '__main__':start_time = time.time()file_list = get_files(os.getcwd())pool = mp.Pool()result_list = pool.map(get_msg, file_list)pool.close()pool.join()write_result(result_list)print("处理完毕,用时:", time.time() - start_time)

运行结果如下

# 控制台输出
处理完毕,用时: 0.13662314414978027# result.txt文件内容
/Users/jay/Project/Python/Book/Chapter 11/11_4.py 行数:33 字符数:621
/Users/jay/Project/Python/Book/Chapter 11/11_1.py 行数:32 字符数:578
/Users/jay/Project/Python/Book/Chapter 11/11_5.py 行数:52 字符数:1148
/Users/jay/Project/Python/Book/Chapter 11/11_13.py 行数:20 字符数:333
/Users/jay/Project/Python/Book/Chapter 11/11_16.py 行数:62 字符数:1320
/Users/jay/Project/Python/Book/Chapter 11/11_12.py 行数:23 字符数:410
/Users/jay/Project/Python/Book/Chapter 11/11_15.py 行数:48 字符数:1087
/Users/jay/Project/Python/Book/Chapter 11/11_8.py 行数:17 字符数:259
/Users/jay/Project/Python/Book/Chapter 11/11_11.py 行数:18 字符数:314
/Users/jay/Project/Python/Book/Chapter 11/11_10.py 行数:46 字符数:919
/Users/jay/Project/Python/Book/Chapter 11/11_14.py 行数:20 字符数:401
/Users/jay/Project/Python/Book/Chapter 11/11_9.py 行数:31 字符数:623
/Users/jay/Project/Python/Book/Chapter 11/11_2.py 行数:32 字符数:565
/Users/jay/Project/Python/Book/Chapter 11/11_6.py 行数:23 字符数:453
/Users/jay/Project/Python/Book/Chapter 11/11_7.py 行数:37 字符数:745
/Users/jay/Project/Python/Book/Chapter 11/11_3.py 行数:29 字符数:518

③ 进程间共享数据

涉及到了多个进程,不可避免的要处理进程间数据交换问题,多进程不像多线程,不同进程之间内存是不共享的,multiprocessing模块提供了四种进程间共享数据的方式:QueueValue和ArrayManager.dict和pipe。下面一一介绍这四种方式的具体用法。

  • 1.Queue队列

多进程安全的队列,put方法用以插入数据到队列中,put方法有两个可选参数:blockedtimeout若blocked为True(默认)且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常,而get方法则从队列读取并且删除一个元素,参数规则同抛出的一场是Queue.Empty。另外Queue不止适用于进程通信,也适用于线程,顺道写一个比较单线程,多线程
和多进程的运行效率对比示例,具体代码如下:

import threading as td
import multiprocessing as mp
import timedef do_something(queue):result = 0for i in range(100000):result += i ** 2queue.put(result)# 单线程
def normal():result = 0for _ in range(3):for i in range(100000):result += i ** 2print("单线程处理结果:", result)# 多线程
def multi_threading():q = mp.Queue()t1 = td.Thread(target=do_something, args=(q,))t2 = td.Thread(target=do_something, args=(q,))t3 = td.Thread(target=do_something, args=(q,))t1.start()t2.start()t3.start()t1.join()t2.join()t3.join()print("多线程处理结果:", (q.get() + q.get() + q.get()))# 多进程
def multi_process():q = mp.Queue()p1 = mp.Process(target=do_something, args=(q,))p2 = mp.Process(target=do_something, args=(q,))p3 = mp.Process(target=do_something, args=(q,))p1.start()p2.start()p3.start()p1.join()p2.join()p3.join()print("多进程处理结果:", (q.get() + q.get() + q.get()))if __name__ == '__main__':start_time_1 = time.time()normal()start_time_2 = time.time()print("单线程处理耗时:", start_time_2 - start_time_1)multi_threading()start_time_3 = time.time()print("多线程处理耗时:", start_time_3 - start_time_2)multi_process()start_time_4 = time.time()print("多继承处理耗时:", start_time_4 - start_time_3)

运行结果如下

单线程处理结果: 999985000050000
单线程处理耗时: 0.10726284980773926
多线程处理结果: 999985000050000
多线程处理耗时: 0.13849401473999023
多进程处理结果: 999985000050000
多继承处理耗时: 0.041596174240112305

从上面的结果可以明显看出在处理CPU密集型任何时,多进程更优。

  • 2.Value和Array

两者是通过「共享内存」的方式来共享数据的,前者用于需要共享单个值后者用于
共享多个值(数组),构造函数的第一个元素数据类型第二个元素。数据类型对照如表所示。

标记 数据类型 标记 数据类型
‘c’ ctypes.c_char ‘u’ ctypes.c_wchar
‘b’ ctypes.c_byte ‘B’ ctypes.c_ubyte
‘h’ ctypes.c_short ‘H’ ctypes.c_ushort
‘i’ ctypes.c_int ‘I’ ctypes.c_uint
‘l’ ctypes.c_long ‘L’ ctypes.c_ulong
‘f’ ctypes.c_float ‘d’ ctypes.c_double

使用代码示例如下

import multiprocessing as mpdef do_something(num, arr):num.value += 1for i in range(len(arr)):arr[i] = arr[i] * 2
if __name__ == '__main__':value = mp.Value('i', 1)array = mp.Array('i', range(5))print("刚开始的值:", value.value, array[:])# 创建进程1p1 = mp.Process(target=do_something, args=(value, array))p1.start()p1.join()print("进程1操作后的值:", value.value, array[:])# 创建进程2p2 = mp.Process(target=do_something, args=(value, array))p2.start()p2.join()print("进程2操作后的值:", value.value, array[:])

运行结果如下

刚开始的值: 1 [0, 1, 2, 3, 4]
进程1操作后的值: 2 [0, 2, 4, 6, 8]
进程2操作后的值: 3 [0, 4, 8, 12, 16]
  • 3.Manager

Python还为我们提供更加强大的数据共享类,支持更丰富的数据类型,比如Value、Array、dict、list、Lock、Semaphore等等,另外Manager还可以共享类的实例对象。有一点要注意:进程间通信应该尽量避免使用共享数据的方式!

使用代码示例如下

import multiprocessing as mp
import os
import timedef do_something(dt):dt[os.getpid()] = int(time.time())print(data_dict)if __name__ == '__main__':manager = mp.Manager()data_dict = manager.dict()for i in range(3):p=mp.Process(target=do_something,args=(data_dict,))p.start()p.join()

运行结果如下

{5432: 1533200189}
{5432: 1533200189, 5433: 1533200189}
{5432: 1533200189, 5433: 1533200189, 5434: 1533200189}
  • 4.Pipe

管道简化版的Queue,通过Pipe()构造函数可以创建一个进程通信用的管道对象默认双向,意味着使用管道只能同时开启两个进程!如果想设置单向,可以添加参数「duplex=False」,双向即可发送也可接受,但是只允许前面的端口用于接收,后面的端口用于发送。管道对象发送和接收信息的函数依次为send()和recv()。使用代码示例如下

import multiprocessing as mpdef p_1(p):p.send("你好啊!")print("P1-收到信息:", p.recv())def p_2(p):print("P2-收到信息:", p.recv())
p.send("你也好啊!")if __name__ == '__main__':pipe = mp.Pipe()p1 = mp.Process(target=p_1, args=(pipe[0],))p2 = mp.Process(target=p_2, args=(pipe[1],))p1.start()p2.start()p1.join()p2.join()

运行结果如下

P2-收到信息: 你好啊!
P1-收到信息: 你也好啊!

关于多进程加锁的,可以参见前面多进程加锁部分内容,这里就不重复讲解了,只是导包换成了multiprocessing,比如threading.Lock换成了multiprocessing.Lock

Python小白到老司机,快跟我上车!基础篇(二十)相关推荐

  1. python进阶记录之基础篇二十六_Python进阶记录之基础篇(十六)

    回顾 在Python进阶记录之基础篇(十五)中,我们介绍了面向对象的基本概念以及Python中类和对象的基础知识,需要重点掌握类的创建和对象的使用.今天我们继续讲一下Python中面向对象的相关知识点 ...

  2. Python小白到老司机,快跟我上车!基础篇(十八)

    线程与进程的相关概念 关于线程和进程的话题,大部分的书只是微微提下,读者学完云里雾里,不知所以.本章会对Python中的多线程和多进程进行详解.大部分都是概念性的东西,不要去死记硬背,学完了解有个大概 ...

  3. Python小白到老司机,快跟我上车!基础篇(十七)

    类与对象 1.面相对象的理解 考虑到部分读者可能没有接触过面向对象编程,所以先介绍下面向对象的一些特征,形成一个面向对象概念的基本认知,有助于后面具体的学习Python的面向对象编程. ① 对象引入 ...

  4. 简洁的留言代码_这几段代码,测测你是 Python 菜鸟还是老司机

    这段话被称作"Python 之禅"(The Zen of Python),它列举了一些 Python 所推崇的理念,比如: 优美胜于丑陋 明确胜于隐晦 简单胜于复杂 - 可读性很重 ...

  5. Python 小白从零开始 PyQt5 项目实战(8)汇总篇(完整例程)

    本系列面向 Python 小白,从零开始实战解说应用 QtDesigner 进行 PyQt5 的项目实战.不跳过一个细节,不漏掉一行代码,不省略一个例图. 本系列从软件安装.环境配置开始,介绍了基本应 ...

  6. 科学计算机后盖换电池,图吧小白教程 篇二十二:手把手教你给手机换电池(拆机)...

    图吧小白教程 篇二十二:手把手教你给手机换电池(拆机) 2019-11-16 14:06:58 4点赞 18收藏 2评论 创作立场声明:手机换电池省钱可以自己动手从工钱上省,买电池最好还是不要省钱买杂 ...

  7. 计算机网络教程网线制作,图吧小白教程 篇二十六:手把手教你自制网线(夹网线水晶头)...

    图吧小白教程 篇二十六:手把手教你自制网线(夹网线水晶头) 2019-11-19 23:07:38 31点赞 309收藏 27评论 创作立场声明:咕咕咕 教程最后还是出了,不过咱现在用啥还是直接网购号 ...

  8. Python遥感图像处理应用篇(二十二):Python+GDAL 批量等距离裁剪影像-续

    之前写过一篇按照指定行列号数量来进行影像等距离裁剪的博客,链接如下: Python遥感图像处理应用篇(二十二):Python+GDAL 批量等距离裁剪影像_空中旋转篮球的博客-CSDN博客_pytho ...

  9. 从技术小白到老司机,这20本书帮你“快进”20年

    导读:文艺复兴以来,源远流长的科学精神和逐步形成的学术规范,使西方国家在自然科学的各个领域取得了垄断性的优势:也正是这样的优势,使美国在信息技术发展的六十多年间名家辈出.独领风骚. 近年,在全球信息化 ...

最新文章

  1. 文件管理器_苹果超强文件管理器,秒变安卓?
  2. Linux(RHEL7.0)下安装nginx-1.10.2
  3. 理解 Delphi 的类(四) - 初识类的事件
  4. Eclipse非常有用的快捷键
  5. oracle to_char FM099999
  6. jquery瀑布流布局和鼠标滚动加载
  7. java css是什么_Java 之 CSS
  8. oracle学习--循环语句
  9. oracle 查看锁表情况及数据库连接情况
  10. 项目是如何完成的(一)
  11. 通过CuteFTP用VBScript使用SFTP,实现Win与Linux的文件传输
  12. 2011最新版:移动设备管理与OMA DM协议 V7
  13. html静态资源加载404,spring security访问静态资源文件出现404
  14. matlab 神经网络工具箱 nntraintool 详解
  15. EChat(简易聊天项目)三、聊天界面UI实现
  16. 符合OpenDRIVE规范的xodr文件格式解读(1) ——road部分
  17. 如何基于DEM高程数据提取生成等高线的教程
  18. Android各版本分布
  19. UrlRewritingNet实现ASP.net 2.0中URL重写(映射)
  20. 计算机主板上一般带有高速缓冲存储器cache,它是与什么之间的缓存,计算机微机原理与应用(一)...

热门文章

  1. mysql cbo优化器_Oracle的优化器(Optimizer) (CBO优化) 分享
  2. 网工必知—什么是堡垒机?-CCIE
  3. 微信小程序——小程序开发经验总结(持续更新)
  4. 夜空笼罩着大地...
  5. 【原创】ARM LINUX 外部RTC实时时钟驱动移植(RX8025)
  6. 教师资格证网站服务器,教师资格证网上报名上传照片IE设置
  7. [附源码]Java计算机毕业设计SSM房屋中介管理信息系统
  8. Android行业薪资现状,月薪2万属于低收入!
  9. Paraview等值线标识
  10. 注册表REG文件编写实例(创建、删除、添加、更改键值)