python mutilprocessing多进程编程
`为了更好的理解本文内容,请务必先了解Synchronization、Asynchronization、Concurrent、Mutex等基本概念
multiprocessing是一个类似于Threading模块的由API产生进程的包,关于Threading模块可以参考我的博客文章。multiprocessing能够 提供本地和远程两种并发模式,通过使用子进程而不是线程有效地避开了GIL。因此,multiprocessing允许程序员充分利用机器上的多个处理器,且该包支持在Unix系统和Windows系统上运行。
mutilprocessing还引入了在Threading模块中没有相类似的API。比如Pool对象,Pool对象提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。使用方法可以看看下面的例子:
from multiprocessing import Pooldef f(x):return x * xif __name__ == '__main__':with Pool(5) as p:print(p.map(f, [1, 2, 3, 4, 5, 6, 7]))# [1, 4, 9, 16, 25, 36, 49]
Process类
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={},daemon=None)
group
必须为None
,设置该参数仅仅是为了与Threading模块保持一致
target
是run()
方法调用的可调用对象
name
是指进程名
daemon
指示是否设置为守护进程
run()
表示进程活动的方法,可在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别使用args和kwargs参数中的顺序和关键字参数。
start()
启动进程的活动,每个进程对象最多只能调用一次,在一个单独的进程中调用对象的run()方法
join([timeout])
如果可选参数timeout为None(缺省值),则该方法将阻塞,直到调用其join()方法的进程终止。如果timeout是一个正数,它最多会阻塞timeout秒。请注意,如果方法的进程终止或方法超时,则该方法返回None。检查进程的exitcode以确定它是否终止。
name
进程名
is_alive()
指示进程是否还活着
daemon
daemon flag, a Boolean value, 必须在进程start之前设置
pid
process ID
exitcode
负值-N表示孩子被信号N终止,默认为None,表示进程未被终止
authkey
The process’s authentication key (a byte string)
sentinel
系统对象的数字句柄,当进程结束时将变为“ready”
terminate()
终止进程,但注意子进程不会被终止,只是会成孤儿
请注意,start(),join(),is_alive(),terminate()和exitcode方法只应由创建过程对象的进程调用。
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
在multiprocessing中,通过创建Process
对象然后调用其start()
方法来生成进程,其使用方法和threading.Thread
一样。我们看下面的例子:
from multiprocessing import Processdef f(name):print('hello', name)if __name__ == '__main__': # 这句话是必要的,不可去掉p = Process(target=f, args=('bob',))p.start()p.join()
我们可以通过进程号来区分不同的进程:
from multiprocessing import Process
import osdef info(title):print(title)print('module name:', __name__)print('parent process:', os.getppid())print('process id:', os.getpid(), '\n')def f(name):info('function f')print('hello', name)if __name__ == '__main__':info('main line')p = Process(target=f, args=('bob',)) # 创建新进程p.start() # 启动进程p.join()
进程启动
根据平台的不同,multiprocessing支持三种启动进程的方法。这些启动方法是:
spawn
spawn
调用改方法,父进程会启动一个新的python进程,子进程只会继承运行进程对象
run()
方法所需的那些资源。特别地,子进程不会继承父进程中不必要的文件描述符和句柄。与使用fork
或forkserver
相比,使用此方法启动进程相当慢。Available on Unix and Windows. The default on Windows.
fork
父进程使用
os.fork()
来fork Python解释器。子进程在开始时实际上与父进程相同,父进程的所有资源都由子进程继承。请注意,安全创建多线程进程尚存在一定的问题。Available on Unix only. The default on Unix.
forkserver
当程序启动并选择
forkserver
start方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它fork一个新进程。 fork服务器进程是单线程的,因此使用os.fork()是安全的。没有不必要的资源被继承。Available on Unix platforms which support passing file descriptors over Unix pipes.
要选择以上某一种start方法,请在主模块的
if __name__ == '__ main__'
子句中使用mp.set_start_method()
。并且
mp.set_start_method()
在一个程序中仅仅能使用一次。import multiprocessing as mpdef foo(q):q.put('hello')if __name__ == '__main__':mp.set_start_method('spawn')q = mp.Queue()p = mp.Process(target=foo, args=(q,))p.start()print(q.get())p.join()
或者,您可以使用
get_context()
来获取上下文对象。上下文对象与多处理模块具有相同的API,并允许在同一程序中使用多个启动方法。import multiprocessing as mpdef foo(q):q.put('hello')if __name__ == '__main__':ctx = mp.get_context('spawn')q = ctx.Queue()p = ctx.Process(target=foo, args=(q,))p.start()print(q.get())p.join()
注意,与一个context相关的对象可能与不同context的进程不兼容。特别是,使用fork context创建的锁不能传递给使用spawn或forkserver start方法启动的进程。
进程通信
当使用多个进程时,通常使用消息传递来进行进程之间的通信,并避免必须使用任何synchronization primitives(如锁)。对于传递消息,可以使用Pipe(用于两个进程之间的连接)或Queue(允许多个生产者和消费者)。
Queues
class multiprocessing.Queue
([maxsize])
Queue实现queue.Queue
的所有方法,但task_done()
和join()
除外。Queue是进程、线程安全的模型
from multiprocessing import Process, Queuedef f(q):q.put([42, None, 'hello'])if __name__ == '__main__':q = Queue()p = Process(target=f, args=(q,))p.start()print(q.get()) # prints "[42, None, 'hello']"p.join()
Pipes
Class multiprocessing.Pipe([duplex])
返回一对(conn1, conn2) of Connection 对象代表pipe的两端。如果duplex为True(默认值),则管道是双向的;如果duplex为False,则管道是单向的:conn1只能用于接收消息,conn2只能用于发送消息。Pipe()`函数返回一个由Pipe连接的连接对象,默认情况下是全双工双向通信(duplex)。例如:
from multiprocessing import Process, Pipedef f(conn):conn.send([42, None, 'hello'])conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p = Process(target=f, args=(child_conn,))p.start()print(parent_conn.recv()) # prints "[42, None, 'hello']"p.join()
Pipe()返回的两个连接对象代表管道的两端,每个连接对象都有send()和recv()方法。需要注意的是,管道中的数据可能会不一致或被破坏,如当两个进程(或线程)尝试同时读取或写入管道的同一端。当然,同时使用管道的不同端部的过程不存在损坏的风险。
进程共享状态
在进行并发编程时,通常最好避免使用共享状态,但是,如果你确实需要使用某些共享数据,那么multiprocessing提供了以下两种方法:
Shared Memory
可以使用Value或Array将数据存储在共享内存的map(映射)中。例如,以下代码:
from multiprocessing import Process, Value, Arraydef f(n, a):n.value = 3.1415927for i in range(len(a)):a[i] = -a[i]if __name__ == '__main__':num = Value('d', 0.0)arr = Array('i', range(10))p = Process(target=f, args=(num, arr))p.start()p.join()print(num.value)print(arr[:])# 3.1415927# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建num和arr时使用的’d’和’i’参数是array
module使用的类型的类型代码:'d’表示双精度浮点数,'i’表示有符号整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,可以使用multiprocessing.sharedctypes
模块,该模块支持创建从共享内存分配的任意ctypes对象。但还是那句话,在进行并发编程时,通常最好避免使用共享状态。
Server Process
Manager()
返回的Manager对象控制一个服务器进程(server process),该进程保存Python对象并允许其他进程使用代理操作它们。Manager对象支持的对象包括list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
以及 Array
。Managers提供了一种创建可在不同进程之间共享的数据的方法,包括在不同计算机上运行的进程之间通过网络共享。管理器对象控制管理共享对象的服务器进程。其他进程可以使用代理访问共享对象。
from multiprocessing import Process, Managerdef f(d, l):d[1] = '1'd['2'] = 2d[0.25] = Nonel.reverse()if __name__ == '__main__':with Manager() as manager:d = manager.dict()l = manager.list(range(10))p = Process(target=f, args=(d, l))p.start()p.join()print(d)print(l)#{0.25: None, 1: '1', '2': 2}
#[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Proxy
代理是一个对象,它指的是(可能)在不同的进程中存在的共享对象。共享对象被认为是代理的指示对象。多个代理对象可能具有相同的指示对象。代理对象具有调用其引用对象的相应方法的方法。代理对象的一个重要特性是它们是pickable的,因此它们可以在进程之间传递。
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l) # l即是一个代理对象
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Connection
&esmp;connection对象允许发送和接收可序列化对象或字符串。它们可以被认为是面向消息的连接套接字,我们再上面介绍Pipe的时候所实例化的对象就是connection对象。
send
(obj)将对象发送到连接的另一端,应使用
recv()
读取,且该对象必须是pickable的,>32 MB的对象可能会引发ValueError异常。recv
()返回从连接另一端发送的对象。阻塞直到接收到东西。如果没有剩余要接收和另一端被关闭,则引发EOFError。
fileno
()返回conn所使用的文件描述符或句柄
close
()关闭连接
poll
([timeout])返回是否有可供读取的数据,如果未指定超时,则会立即返回;如果timeout是一个数字,则指定阻止的最长时间(以秒为单位);如果timeout为None,则使用无限超时。
send_bytes
(buffer[, offset[, size]])发送字节数据
recv_bytes
([maxlength])接受字节数据
recv_bytes_into
(buffer[, offset])读取从连接另一端发送的字节数据的完整消息到buffer,并返回消息中的字节数。
>>> from multiprocessing import Pipe >>> a, b = Pipe() >>> a.send([1, 'hello', None]) >>> b.recv() [1, 'hello', None] >>> b.send_bytes(b'thank you') >>> a.recv_bytes() b'thank you' >>> import array >>> arr1 = array.array('i', range(5)) >>> arr2 = array.array('i', [0] * 10) >>> a.send_bytes(arr1) >>> count = b.recv_bytes_into(arr2) >>> assert count == len(arr1) * arr1.itemsize >>> arr2 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
summary
Server process Manager比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但它比使用共享内存慢。
Synchronization
同步原语和Threading模块几乎一致,具体请参考Python Threading 多线程编程
Lock
from multiprocessing import Process, Lockdef f(l, i):"""保证同一时间只有一个标准输出流"""l.acquire()try:print('hello world', i)finally:l.release()if __name__ == '__main__':lock = Lock()for num in range(10):Process(target=f, args=(lock, num)).start()# output
hello world 1
hello world 0
hello world 2
hello world 4
hello world 3
hello world 6
hello world 9
hello world 5
hello world 8
hello world 7
Pool类
Pool类用于创建进程池
主要方法有,具体例子见代码,并请注意,pool对象的方法只能由创建它的进程使用:
pool.map()
pool.imap()
Equivalent ofmap()
– can be MUCH slower thanPool.map()
.pool.starmap()
Likemap()
method but the elements of theiterable
are expected to be iterables as well and will be unpacked as arguments.pool.starmap_async
Asynchronous version ofstarmap()
methodpool.map_async
Asynchronous version ofmap()
method.pool.imap_unordered()
pool.apply()
pool.apply_async()
Asynchronous version ofapply()
method.
from multiprocessing import Pool, TimeoutError
import time
import osdef f(x):return x*xif __name__ == '__main__':# start 4 worker processeswith Pool(processes=4) as pool:# print "[0, 1, 4,..., 81]"print(pool.map(f, range(10)))# print same numbers in arbitrary orderfor i in pool.imap_unordered(f, range(10)):print(i, end='\t')print()# evaluate "f(20)" asynchronouslyres = pool.apply_async(f, (20,)) # runs in *only* one processprint(res.get(timeout=1)) # prints "400"# evaluate "os.getpid()" asynchronouslyres = pool.apply_async(os.getpid, ()) # runs in *only* one processprint(res.get(timeout=1)) # prints the PID of that process# launching multiple evaluations asynchronously *may* use more processesmultiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]print([res.get(timeout=1) for res in multiple_results])# make a single worker sleep for 10 secsres = pool.apply_async(time.sleep, (10,))try:print(res.get(timeout=1))except TimeoutError:print("We lacked patience and got a multiprocessing.TimeoutError")print("For the moment, the pool remains available for more work")# exiting the 'with'-block has stopped the poolprint("Now the pool is closed and no longer available")# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 0 1 4 9 16 25 36 49 64 81
# 400
# 2696
# [2696, 2696, 2696, 2696]
# We lacked patience and got a multiprocessing.TimeoutError
# For the moment, the pool remains available for more work
# Now the pool is closed and no longer available
Miscellaneous
multiprocessing.active_children()
返回当前进程的所有活子进 程的列表
multiprocessing.cpu_count()
返回系统中的CPU数量,此数字不等于当前进程可以使用的CPU数量。可以使用
len(os.sched_getaffinity(0))
获得可用CPU的数量multiprocessing.current_process()
返回与当前进程对应的Process对象
multiprocessing.freeze_support()
为程序打包成exe可执行文件提供支持,在Windows以外的任何操作系统上调用时,调用
freeze_support()
无效。此外,如果模块由Windows上的Python解释器正常运行(程序尚未冻结),则freeze_support()
无效from multiprocessing import Process, freeze_supportdef f():print('hello world!')if __name__ == '__main__':freeze_support()Process(target=f).start()
multiprocessing.get_all_start_methods()
返回支持的start方法列表,第一个是默认方法。可能的启动方法是’fork’,‘spawn’和’forkserver’。在Windows上只有“spawn”可用。在Unix上’fork’和’spawn’总是受支持,'fork’是默认值。
multiprocessing.get_context(method=None)
返回与multiprocessing模块具有相同属性的上下文对象,具体用法前面已经有过例子
multiprocessing.get_start_method(allow_none=False)
返回用于启动进程的start方法的名称,返回值可以是’fork’,‘spawn’,'forkserver’或None。 'fork’是Unix上的默认值,而’spawn’是Windows上的默认值。
multiprocessing.set_executable()
设置启动子进程时要使用的Python解释器的路径
multiprocessing.set_start_method(method)
设置用于启动子进程的方法。方法可以是’fork’,‘spawn’或’forkserver’。请注意,改法最多调用一次,并且应该写在主模块的if name ==’__ main__'子句中。
python mutilprocessing多进程编程相关推荐
- 并发编程之多进程编程(python版)
目录 1 python多进程编程概述 2 需求和方案 背景: 需求: 解决思路: 需要解决的问题和方案: 3 完整代码 1 python多进程编程概述 python中的多线程无法利用多核优势,如果想要 ...
- python多进程编程实例_Python多进程编程multiprocessing代码实例
下面记录一下多进程编程的别一种方式,即使用multiprocessing编程 import multiprocessing import time def get_html(n): time.slee ...
- Python 3 并发编程多进程之进程同步(锁)
Python 3 并发编程多进程之进程同步(锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,竞争带来的结果就是错乱,如何控制,就是加锁处理. 1. ...
- python多进程间通信_Python 多进程编程之 进程间的通信(Queue)
Python 多进程编程之 进程间的通信(Queue) 1,进程间通信 Process有时是需要通信的,操作系统提供了很多机制来实现进程之间的通信,而Queue就是其中的一个方法 ----这是操作系统 ...
- Python可以这样学(第三季:多线程与多进程编程)-董付国-专题视频课程
Python可以这样学(第三季:多线程与多进程编程)-7527人已学习 课程介绍 董付国老师系列教材<Python程序设计(第2版)>(ISBN:9787302436515 ...
- Python多线程和多进程编程
原文地址:https://tracholar.github.io/wiki/python/python-multiprocessing-tutorial.html 简介 早已进入多核时代的计算机,怎能 ...
- python多进程编程实例_[python] Python多进程编程技术实例分析
这篇文章主要介绍了Python多进程编程技术,包括了线程.队列.同步等概念及相关的技巧总结,需要的朋友可以参考下 本文以实例形式分析了Python多进程编程技术,有助于进一步Python程序设计技巧. ...
- python编程图文_深入Python多进程编程基础——图文版
多进程编程知识是Python程序员进阶高级的必备知识点,我们平时习惯了使用multiprocessing库来操纵多进程,但是并不知道它的具体实现原理.下面我对多进程的常用知识点都简单列了一遍,使用原生 ...
- Python多进程编程及多进程间的通信,数据传输
多进程编程及进程间的通信多进程的优缺点进程(process)三态五态(三态的基础上增加了新建态和终止态)进程优先级进程特征孤儿进程僵尸进程要求理解多进程编程进程相关的函数多进程模块Process()创 ...
最新文章
- 在云中进行灾难恢复的五种有效方式
- springboot ftp 笔记
- Spring MVC:高级会话
- vue class与style绑定
- Spring AOP解析
- 线程与并发基础-青铜
- 小贾漫谈——Java反射
- 开发运维日常坑 总结 51-100
- 鼎捷软件易飞9.0ERP操作手册
- 工资纳税系统c语言编程加注释,基于C语言的个人所得税计税系统
- Android应用测试篇
- python阿拉伯数字转中文_阿拉伯数字转化为中文数字
- 百度Sugar数据可视化领域优势地位因何受到挑战?
- [转]音乐天堂 Music Heaven Vol.1 ~ Vol.36 的目录
- MongoDB集群和安全
- java kpi_JAVA内存调优的KPI
- HICO/HICO-Det 数据集介绍
- csharp基础练习题:符号计数【难度:1级】--景越C#经典编程题库,不同难度C#练习题,适合自学C#的新手进阶训练
- omnet++仿真软件会从运行模拟开始
- 产研团队任务管理工具:盘点国内外9款知名任务管理系统软件
热门文章
- kotlin学习之lambda(十)
- Netty之Channel的继承关系
- [leetcode] 103.二叉树的锯齿形遍历
- php中的rtrim_php中ltrim()、rtrim()与trim()删除字符空格实例
- python累加求和_python中的变量和数据类型(一)
- 网络的分层思想和数据封装与解封装概论
- FatFs最新版本获取方法
- mysql数据通讯方式_c# 与 Mysql 的通讯方式总结
- mysql utf8 cmd,MySQL Windows下cmd显示中文乱码
- 易语言 服务器抓包,易语言抓包获得地址实现TP路由器登陆的代码