Python并发之多进程multiprocessing(2)
1, 多进程 vs 多线程
Python中的常见的并发模型分为两种:
- 多线程threading并发,多用于IO密集型计算
- 多进程multiprocessing并发,多用于CPU密集型计算
(1)IO密集 vs CPU密集
IO密集:
I/O bound 指的是系统的CPU效能相对硬盘/内存的效能要好很多,此时,系统运作,大部分的状况是 CPU 在等 I/O (硬盘/内存) 的读/写,此时 CPU Loading 不高。
IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
CPU密集:
CPU bound 指的是系统的 硬盘/内存 效能 相对 CPU 的效能 要好很多,此时,系统运作,大部分的状况是 CPU Loading 100%,CPU 要读/写 I/O (硬盘/内存),I/O在很短的时间就可以完成,而 CPU 还有许多运算要处理,CPU Loading 很高。
CPU bound密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于CPU bound的程序。
(2)多进程 vs 多线程
首先,要实现多任务,通常我们会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,因此,多任务环境下,通常是一个Master,多个Worker。
如果用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker。
如果用多线程实现Master-Worker,主线程就是Master,其他线程就是Worker。
多进程稳定性好,但是资源代价大
多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。
多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork
调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。
多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程。
在Windows下,多线程的效率比多进程要高,所以微软的IIS服务器默认采用多线程模式。由于多线程存在稳定性的问题,IIS的稳定性就不如Apache。为了缓解这个问题,IIS和Apache现在又有多进程+多线程的混合模式,真是把问题越搞越复杂。
多线程CPU切换频率高
操作系统在切换进程或者线程时,它需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。所以,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好。
Python中多线程的伪多线程
由于GIL,是伪多线程。如果使用多线程,所有的计算只会在一个CPU核上,无法真正利用CPU多核。
想要充分利用多核CPU资源,Python中大部分情况下都需要使用多进程,Python中提供了multiprocessing这个包实现多进程。
2,多进程基本用法
Python中提供了multiprocessing这个包实现多进程。multiprocessing支持子进程、进程间的同步与通信,提供了Process、Queue、Pipe、Lock等组件。
Python的multiprocessing库通过以下几步创建进程:
- 创建进程对象
- 调用
start()
方法,开启进程的活动 - 调用
join()
方法,在进程结束之前一直等待
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():启动一个子进程。准备就绪,等待CPU调度
run():不启动子进程,直接执行函数。如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
authkey
daemon:和线程的setDeamon功能一样
exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
name:进程名字。
pid:进程号。
和多线程一样,多线程multiprocessing模块也有两种基本办法创建子进程:
- 用法一:Process类来生成进程实例
from multiprocessing import Process
import os
def func(i):print("pid: {}, execute: {} * {} = {}".format(os.getpid(), i, i, i*i))if __name__ == "__main__":data = [1, 2, 3, 4, 5]process_list = []for d in data:p = Process(target=func, args=(d,))process_list.append(p)for process in process_list:process.start()for process in process_list:process.join()
- 用法二:继承Process类,自定义进程子类,实现run方法
实现一个自定义的进程子类,需要以下三步:
1>定义 Process
的子类
2>覆盖 __init__(self [,args])
方法来添加额外的参数
3>覆盖 run(self, [.args])
方法来实现 Process
启动的时候执行的任务
from multiprocessing import Process
import os, timeclass MyProcess(Process):def __init__(self, target=None, args=(), kwargs={}):super(MyProcess, self).__init__()self.target = targetself.args = tuple(args)self.kwargs = dict(kwargs)def run(self):if self.target:print("func {} is running at {}".format(self.target.__name__, time.ctime()))return self.target(*self.args, **self.kwargs)def func_test(i):name = multiprocessing.current_process().nameprint("{} pid: {}, execute: {} * {} = {}".format(name, os.getpid(), i, i, i * i))if __name__ == "__main__":data = [1, 2, 3, 4, 5]process_list = []for d in data:p = MyProcess(target=func, args=(d,))process_list.append(p)for process in process_list:process.start()for process in process_list:process.join()
3,多进程数据同步原语
进程的同步原语和线程的库很类似:
- Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个Lock对象有两个方法,
acquire()
和release()
,来控制共享数据的读写权限。 - Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。
Event
对象有两个方法,set()
和clear()
,来管理自己内部的变量。 - Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法:
wait()
用来等待进程,notify_all()
用来通知所有等待此条件的进程。 - Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
- Rlock: 递归锁对象。其用途和方法同
Threading
模块一样。 - Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。(Python 3.3以后版本支持)
Lock,Rlock,Event,Condition,Semaphore几个进程同步原语的用法和多线程基本完全一致,只需要将threading.Thread对象换成multiprocessing.Process对象即可。
请参考前一篇博客:https://blog.csdn.net/biheyu828/article/details/83019392
示例代码:
使用Barrier栅栏控制多进程并发执行
import multiprocessing
import time
from multiprocessing import Process
from multiprocessing import Barrier, Lockdef run_with_barrier(barrier):proc_name = multiprocessing.current_process().namebarrier.wait() ##当两个进程p都调用 wait() 方法的时候,它们会一起继续执行time.sleep(3)print("process {} ----> {}".format(proc_name, time.time()))def run_without_barrier():proc_name = multiprocessing.current_process().nametime.sleep(3)print("process {} ----> {}".format(proc_name, time.time()))if __name__ == "__main__":barrier = Barrier(2) lock = Lock()process_list = []pro_1 = Process(name="process_1_barrier",target=run_with_barrier, args=(barrier,))pro_2 = Process(name="process_2_barrier",target=run_with_barrier, args=(barrier,))pro_3 = Process(name="process_3_no_barrier",target=run_without_barrier)pro_4 = Process(name="process_4_no_barrier",target=run_without_barrier)process_list.append(pro_1)process_list.append(pro_2)process_list.append(pro_3)process_list.append(pro_4)for pro in process_list:pro.start()for pro in process_list:pro.join()
运行结果:
process process_3_no_barrier ----> 1540048781.565112
process process_1_barrier ----> 1540048781.565113
process process_2_barrier ----> 1540048781.565102
process process_4_no_barrier ----> 1540048781.5670989
从运行结果可以看出pro_1和pro_2到达了barrier几乎同时运行,但是pro_3和pro_4进程时间差别较大。
4,多进程交换数据(queue/ pipe)
不同进程之间内存是不共享的。在多进程中直接使用线程类似的方式共享数据,会出现报错,全局变量并不能在不同进程间共享。
import multiprocessing
from multiprocessing import Process
import random
import timeitems = []
lock = multiprocessing.Lock()class Consumer(Process):def __init__(self):super(Consumer, self).__init__()def run(self):global itemsglobal lockwith lock:print("items in consumer is: {}".format(items))data = items.pop()print("consume data: {} at: {} ".format(data, time.ctime()))class Producer(Process):def __init__(self):super(Producer, self).__init__()def run(self):global lockglobal itemswith lock:data = random.randrange(1, 1000)items.append(data)print("items in producer is: {}".format(items))print("produce data: {} at: {} ".format(data, time.ctime()))if __name__ == "__main__":producer_pro = Producer()consumer_pro = Consumer()producer_pro.start()consumer_pro.start()producer_pro.join()consumer_pro.join()
输出如下:
Process Consumer-2:
items in producer is: [214]
produce data: 214 at: Wed Oct 17 22:26:54 2018
items in consumer is: [] ##全局变量items并没有共享给consumer
Traceback (most recent call last):File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrapdata = items.pop()
IndexError: pop from empty list
Multiprocessing库有两个Communication Channel可以交换对象:
- 队列(queue):
Queue
返回一个进程共享的队列,是线程安全的,也是进程安全的。任何可序列化的对象(Python通过pickable
模块序列化对象)都可以通过它进行交换。 - 管道(pipe):Pipe 方法返回(conn1, conn2)代表一个管道的两端。pipe方法有个duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接收消息,conn2负责发送消息。
Queue和pipe的区别:
Queue:主要用于多对多数据交换,例如多生产者->多消费者模型
pipe:主要用于单对单数据交换,例如单生产者->单消费者模型。在数据交换效率上,pipe比queue要高。同样的数据量使用queue大约是pipe的3倍左右。
(1)多进程Queue用法
内置三种类型的队列:
Queue
:FIFO(先进先出);LifoQueue
:LIFO(后进先出);PriorityQueue
:优先级最小的先出;
构造函数一样,都是只有一个maxsize=0,用于设置队列的容量,
如果设置的maxsize小于1,则表示队列的长度无限长。
两个异常:
- Queue.Empty:当调用非堵塞的get()获取空队列元素时会引发;
- Queue.Full:当调用非堵塞的put()满队列里添加元素时会引发;
相关函数
- qsize():返回队列的近似大小,注意:qsize()> 0不保证随后的get()不会
阻塞也不保证qsize() < maxsize后的put()不会堵塞; - empty():判断队列是否为空,返回布尔值,如果返回True,不保证后续
调用put()不会阻塞,同理,返回False也不保证get()调用不会被阻塞; - full():判断队列是否满,返回布尔值如果返回True,不保证后续
调用get()不会阻塞,同理,返回False也不保证put()调用不会被阻塞; - put(item, block=True, timeout=None):往队列中放入元素,如果block
为True且timeout参数为None(默认),为堵塞型put(),如果timeout是
正数,会堵塞timeout时间并引发Queue.Full异常,如果block为False则
为非堵塞put() - put_nowait(item):等价于put(item, False),非堵塞put()
- get(block=True, timeout=None):移除一个队列元素,并返回该元素,
如果block为True表示堵塞函数,block = False为非堵塞函数,如果设置
了timeout,堵塞时最多堵塞超过多少秒,如果这段时间内没有可用的
项,会引发Queue.Empty异常,如果为非堵塞状态,有数据可用返回数据
无数据立即抛出Queue.Empty异常; - get_nowait():等价于get(False),非堵塞get()
- task_done():完成一项工作后,调用该方法向队列发送一个完成信号,任务-1;
- join():等队列为空,再执行别的操作;
代码示例:
import random, time
import multiprocessing
from multiprocessing import Processclass Consumer(Process):def __init__(self, queue):super(Consumer, self).__init__()self._queue = queuedef run(self):data = self._queue.get()print("consume data: {} at: {} ".format(data, time.ctime()))class Producer(Process):def __init__(self, queue):super(Producer, self).__init__()self._queue = queuedef run(self):data = random.randrange(11111, 999999)self._queue.put(data)print("produce data: {} at: {} ".format(data, time.ctime()))if __name__ == "__main__":queue = multiprocessing.Queue()producer_pro = Producer(queue)consumer_pro = Consumer(queue)producer_pro.start()consumer_pro.start()producer_pro.join()consumer_pro.join()
(2)多进程Pipe用法
Pipe对象常用函数
Pipe() | Pipe 方法返回(conn1, conn2)代表一个管道的两端。PIPE方法有个deplex参数,如果deplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接收消息,conn2负责发送消息。 |
send() | 向connection发数据 |
recv() | 从connection接收数据 |
close() | 关闭connection |
代码示例:
from multiprocessing import Pipe, Processdef consumer(input_pipe):while True:try:data = input_pipe.recv()print("consume data: {}".format(data))except EOFError:breakdef producer(output_pipe, sequence_data):for data in sequence_data:output_pipe.send(data)print("produce data: {}".format(data))if __name__ == "__main__":(input_pipe, output_pipe) = Pipe(False)sequence_data = [1,2,3,4,5]consumer_pro = Process(target=consumer, args=(input_pipe,))producer_pro = Process(target=producer, args=(output_pipe,sequence_data))consumer_pro.start()producer_pro.start()consumer_pro.join()producer_pro.join()
5,进程池
进程池有两种实现方式:
- 使用multiprocessing自带的Pool类创建进程池
- 使用concurrent.futures模块中ProcessPoolExecutor类创建进程池
比较:
- 用futures的写法上更简洁一些,concurrent.futures的性能并没有更好,只是让编码变得更简单。考虑并发编程的时候,任何简化都是好事。从长远来看,concurrent.futures编写的代码更容易维护。
- 使用map时,future是逐个迭代提交,multiprocessing.Pool是批量提交jobs,因此对于大批量jobs的处理,multiprocessing.Pool效率会更高一些。对于需要长时间运行的作业,用future更佳,future提供了更多的功能(callback, check status, cancel)。
- concurrent.futures.ProcessPoolExecutor是对multiprocessing的封装,在运行时需导入__main__,不能直接在交互窗口工作。
(1)使用multiprocessing自带的Pool类创建进程池
多进程库提供了 Pool
类来实现简单的多进程任务。 Pool
类有以下方法:
apply() |
该函数用于传递不定参数,主进程会被阻塞直到函数执行结束,实际上这也就说所谓的同步执行。 同步执行,按照加入进程池的顺序执行事件,每次执行完一个再执行另一个,可以获取事件返回值 |
apply_async() |
与apply用法一样,但它是非阻塞且支持结果返回进行回调;实际上也就是异步执行。 异步执行,同时启动进程池中多个进程执行事件,apply_async()可以获取事件返回进度(ApplyResult)对象。任务执行完成以后,使用ApplyResult对象的get()方法获取返回值。 apply_async方式提供了一写获取进程函数状态的函数:ready()、successful()、get() |
map() |
与内置map函数用法基本一致,它融合了map函数和apply_async()函数的功能;它会使进程阻塞直到返回结果。 注意:虽然第二个参数是一个迭代器,但实际应用中,必须在整个队列就绪后,程序才会运行子进程。 |
map_async() |
这是 map_async方式也提供了一写获取进程函数状态的函数:ready()、successful()、get() |
close() | 关闭进程池,阻止更多的任务提交到进程池Pool,待任务完成后,工作进程会退出 |
terminate() | 结束工作进程,不再处理未完成的任务 |
join() | 等待工作线程的退出,必须在close()或terminate()之后使用,因被终止的进程需要被父进程调用wait(join等价于wait),否则进程会成为僵尸进程。 |
注意:
- 使用Pool创建进程池对象,同时进程池中进程已经启动
- 向进程池对象中添加事件,事件排队执行
- 如果主进程退出,则进程池中所有进程都退出
用法一:使用apply()添加进程
示例代码:
import time
import multiprocessing
from multiprocessing import Pooldef data_ready(data):time.sleep(1)print("{} execute {} at {}".format(multiprocessing.current_process().name, data, time.ctime()))return "value_"+str(data)if __name__ == "__main__":data_list = [1, 2, 3, 4, 5]pool = Pool(processes=3)result_list = []for data in data_list:result = pool.apply(data_ready, args=(data,))print(result)result_list.append(result)pool.close() #关闭进程池,禁止添加新任务pool.join() #等待子进程全部结束之后, 再继续主进程print("main process finish")
运行结果:
ForkPoolWorker-1 execute 1 at Sun Oct 21 21:04:31 2018
value_1
ForkPoolWorker-2 execute 2 at Sun Oct 21 21:04:32 2018
value_2
ForkPoolWorker-3 execute 3 at Sun Oct 21 21:04:33 2018
value_3
ForkPoolWorker-1 execute 4 at Sun Oct 21 21:04:34 2018
value_4
ForkPoolWorker-2 execute 5 at Sun Oct 21 21:04:35 2018
value_5
main process finish
从执行结果可以看出,进程池中共有3个进程在串行逐个执行。每执行完一个进程,且返回结果后才执行下一个进程,相当于单线程。
用法二:使用apply_sync()添加进程
apply_sync()和 apply()方式方法基本一致,不同的是apply_sync()是异步执行。并且返回值不一样,apply_sync()不是直接返回执行结果,而是一个为进度对象(ApplyResult)对象
示例代码:
import time
import multiprocessing
from multiprocessing import Pooldef data_ready(data):time.sleep(1)print("{} execute {} at {}".format(multiprocessing.current_process().name, data, time.ctime()))return "value_"+str(data)def callback():print("this is call back")if __name__ == "__main__":data_list = [1, 2, 3, 4, 5]pool = Pool(processes=3)result_list = []for data in data_list:result = pool.apply_async(data_ready, args=(data,)) ##返回ApplyResult对象print(result)result_list.append(result)for res in result_list:print(res.get())pool.close() #关闭进程池,禁止添加新任务pool.join() #等待子进程全部结束之后, 再继续主进程print("main process finish")
执行结果:
<multiprocessing.pool.ApplyResult object at 0x106662828>
<multiprocessing.pool.ApplyResult object at 0x106662940>
<multiprocessing.pool.ApplyResult object at 0x106662a58>
<multiprocessing.pool.ApplyResult object at 0x106662b38>
<multiprocessing.pool.ApplyResult object at 0x106662c18>
ForkPoolWorker-2 execute 2 at Mon Oct 22 10:54:35 2018
ForkPoolWorker-1 execute 1 at Mon Oct 22 10:54:35 2018
ForkPoolWorker-3 execute 3 at Mon Oct 22 10:54:35 2018
value_1
value_2
value_3
ForkPoolWorker-2 execute 5 at Mon Oct 22 10:54:36 2018
ForkPoolWorker-1 execute 4 at Mon Oct 22 10:54:36 2018
value_4
value_5
main process finish
用法3: 使用map()启动进程
map(self, func, iterable, chunksize=None)接受一个可迭代对象作为参数,并且返回全部子进程的执行结果列表
示例代码:
import time
import multiprocessing
from multiprocessing import Pooldef data_ready(data):time.sleep(1)print("{} execute {} at {}".format(multiprocessing.current_process().name, data, time.ctime()))return "value_"+str(data)if __name__ == "__main__":data_list = [1, 2, 3, 4, 5]pool = Pool(processes=3)result = pool.map(data_ready, data_list)print(result)
运行结果:
ForkPoolWorker-1 execute 1 at Mon Oct 22 11:43:12 2018
ForkPoolWorker-2 execute 2 at Mon Oct 22 11:43:12 2018
ForkPoolWorker-3 execute 3 at Mon Oct 22 11:43:12 2018
ForkPoolWorker-1 execute 4 at Mon Oct 22 11:43:13 2018
ForkPoolWorker-2 execute 5 at Mon Oct 22 11:43:13 2018
['value_1', 'value_2', 'value_3', 'value_4', 'value_5']
用法4:使用map_async()启动进程
与map()不同的是map_async()返回的是MapResult对象,使用该对象的get()方法可以获取执行结果。与apply_async类似,也可以使用callback函数返回子进程执行结果。当子进程执行结束之后自动调用回掉函数。
示例代码:
import time
import multiprocessing
from multiprocessing import Pooldef data_ready(data):time.sleep(1)print("{} execute {} at {}".format(multiprocessing.current_process().name, data, time.ctime()))return "value_"+str(data)if __name__ == "__main__":data_list = [1, 2, 3, 4, 5]pool = Pool(processes=3)result = pool.map_async(data_ready, data_list)print(result) ##返回MapResult对象print(result.get())
运行结果:
<multiprocessing.pool.MapResult object at 0x105e61710>
ForkPoolWorker-1 execute 1 at Mon Oct 22 11:46:17 2018
ForkPoolWorker-2 execute 2 at Mon Oct 22 11:46:17 2018
ForkPoolWorker-3 execute 3 at Mon Oct 22 11:46:17 2018
ForkPoolWorker-3 execute 5 at Mon Oct 22 11:46:18 2018
ForkPoolWorker-1 execute 4 at Mon Oct 22 11:46:18 2018
['value_1', 'value_2', 'value_3', 'value_4', 'value_5']
callbak返回子进程执行结果示例:
import time
import multiprocessing
from multiprocessing import Pooldef data_ready(data):time.sleep(1)print("{} execute {} at {}".format(multiprocessing.current_process().name, data, time.ctime()))return "value_"+str(data)def call_back(result):print("result is: {} finished at {}".format(result, time.ctime()))return resultif __name__ == "__main__":data_list = [1, 2, 3, 4, 5]pool = Pool(processes=3)pool.map_async(data_ready, data_list, callback=call_back) ##callback函数可以返回子进程的执行结果pool.close() #关闭进程池,禁止添加新任务pool.join() #等待子进程全部结束之后, 再继续主进程print("main process finish")
运行结果:
ForkPoolWorker-1 execute 1 at Mon Oct 22 11:57:57 2018
ForkPoolWorker-2 execute 2 at Mon Oct 22 11:57:57 2018
ForkPoolWorker-3 execute 3 at Mon Oct 22 11:57:57 2018
ForkPoolWorker-1 execute 4 at Mon Oct 22 11:57:58 2018
ForkPoolWorker-2 execute 5 at Mon Oct 22 11:57:58 2018
result is: ['value_1', 'value_2', 'value_3', 'value_4', 'value_5'] finished at Mon Oct 22 11:57:58 2018
main process finish
(2)使用concurrent.futures模块中ProcessPoolExecutor类创建进程池
使用方式和ThreadPoolExecutor一致。
示例代码:
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, waitdef data_ready(data):time.sleep(1)print("{} execute {} at {}".format(multiprocessing.current_process().name, data, time.ctime()))return "value_"+str(data)if __name__ == "__main__":data_list = [1, 2, 3, 4, 5]pool = ProcessPoolExecutor(max_workers=3)with pool as executor:task_list = [executor.submit(data_ready, data) for data in data_list]wait(task_list)for task in task_list:print(task.result()) ##获取任务执行结果print("main process finish")
运行结果:
Process-2 execute 2 at Mon Oct 22 12:09:33 2018
Process-3 execute 3 at Mon Oct 22 12:09:33 2018
Process-1 execute 1 at Mon Oct 22 12:09:33 2018
Process-2 execute 4 at Mon Oct 22 12:09:34 2018
Process-1 execute 5 at Mon Oct 22 12:09:34 2018
value_1
value_2
value_3
value_4
value_5
main process finish
6, multiprocessing块中的多线程dummy
multiprocessing.dummy类实现了多线程功能,用法和multiprocessing多进程类似,api 都是通用的。 可以很方便将代码在多线程和多进程之间切换。
multiprocessing.dummy除了多线程的基本功能外,也提供了线程池Pool功能。
线程池Pool的使用有四种方式:apply_async、apply、map_async、map。其中apply_async和map_async是异步的,也就是启动进程函数之后会继续执行后续的代码不用等待进程函数返回。apply_async和map_async方式提供了一写获取进程函数状态的函数:ready()、successful()、get()。
示例代码:
import time
import multiprocessing.dummy as dum
from multiprocessing.dummy import Pooldef data_ready(data):time.sleep(1)print("{} execute {} at {}".format(dum.current_process(), data, time.ctime()))return "value_"+str(data)def call_back(result):print("result is: {} finished at {}".format(result, time.ctime()))return resultif __name__ == "__main__":data_list = [1, 2, 3, 4, 5]pool = Pool(processes=3)pool.map_async(data_ready, data_list, callback=call_back) ##callback函数可以返回执行结果pool.close() #关闭线程池,禁止添加新任务pool.join() #等待线程全部结束之后, 再继续主进程print("main process finish")
运行结果:
<DummyProcess(Thread-3, started daemon 123145387253760)> execute 3 at Mon Oct 22 13:23:24 2018
<DummyProcess(Thread-1, started daemon 123145376743424)> execute 1 at Mon Oct 22 13:23:24 2018
<DummyProcess(Thread-2, started daemon 123145381998592)> execute 2 at Mon Oct 22 13:23:24 2018
<DummyProcess(Thread-1, started daemon 123145376743424)> execute 4 at Mon Oct 22 13:23:25 2018
<DummyProcess(Thread-3, started daemon 123145387253760)> execute 5 at Mon Oct 22 13:23:25 2018
result is: ['value_1', 'value_2', 'value_3', 'value_4', 'value_5'] finished at Mon Oct 22 13:23:25 2018
main process finish
7, 多进程数据共享
Python中多进程数据共享主要有两种方式:
- 内存共享
- 进程共享
用法一:内存共享
在多进程情况下,由于每个进程有自己独立的内存空间,怎样能实现内存共享呢?multiprocessing模块提供了Value, Array,这两个是函数,详细定义在sharedctypes.py里。ctypes是Python的一个外部函数库,它提供了和C语言兼任的数据类型,可以调用DLLs或者共享库的函数,能被用作在python中。
(1) Value
Value的初始化非常简单,直接类似Value('d', 0.0)即可,具体构造方法如下:
multiprocessing.Value(typecode_or_type, *args[,lock])
返回从共享内存中分配的一个ctypes 对象。其中typecode_or_type定义了返回的类型,它要么是一个ctypes类型,要么是一个代表ctypes类型的code。
*args是传递给ctypes的构造参数
比如整数1,可用Value('h',1)
对于共享整数或者单个字符,初始化比较简单,参照下图映射关系:
Type Code | C Type | Python Type |
'c' | char | character |
'b' | signed char | int |
'B' | unsigned char | int |
'u' | Py_UNICODE | unicode character |
'h' | signed short | int |
'H' | unsigned short | int |
'i' | signed int | int |
'I' | unsigned int | int |
'l' | signed long | int |
'L' | unsigned long | int |
'f' | float | float |
'd' | double | float |
如果共享的是字符串,则在上表是找不到映射关系的,就是没有对应的Type code可用。所以我们需要使用原始的ctype类型。
比如上面的Value('h',1)也可以用Value(c_short,1),字符串的话,可以用Value(c_char_p,"hello"),很好理解的。
它返回的是个对象,所以,它也有一些属性和方法:
value | 获取值 |
get_lock() | 获取锁对象 |
acquire() | 获取锁 |
release() | 释放锁 |
ctype类型对应关系如下:
ctypes type | C type | Python type |
c_bool |
_Bool | bool (1) |
char | char | 1-character string |
c_wchar | wchar_t | 1-character unicode string |
c_byte | char | int/long |
c_ubyte | unsigned char | int/long |
c_short | short | int/long |
c_ushort | unsigned short | int/long |
c_int | int | int/long |
c_uint | unsigned in | int/long |
c_long | long | int/long |
c_ulong | unsigned long | int/long |
c_longlong | __int64 or long long | int/long |
c_ulonglong | unsigned __int64 or unsigned long long | int/long |
c_float | float | float |
c_double | double | float |
c_longdouble | long double | float |
c_char_p | char * (NUL terminated) | string or None |
c_wchar_p |
wchar_t * (NUL terminated) | unicode or None |
c_void_p | void * | int/long or None |
(2)Array
它返回从共享内存分配的ctypes数组, 构造函数:
multiprocessing.Array(typecode_or_type, size_or_initializer, *,lock=True)
typecode_or_type确定返回数组的元素的类型:它是一个ctypes类型或一个字符类型代码类型的数组模块使用的类型。
size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。
如果关键字参数中有lock的话,lock为True,则会创建一个新的锁对象,以同步对该值的访问。如果lock是Lock或RLock对象,那么它将用于同步对该值的访问。如果lock是False,那么对返回的对象的访问不会被锁自动保护,因此它不一定是“进程安全的”。
示例代码:
import multiprocessing
from multiprocessing import Process, Value, Array, Lockdef worker_value(share_value, lock):with lock:share_value.value += 1print("current process: {} share_value is: {}".format(multiprocessing.current_process().name, share_value.value))def work_array(share_array, lock):with lock:for i in range(len(share_array)):share_array[i] = share_value.value*iprint("current process: {} share_array is: {}".format(multiprocessing.current_process().name, share_array[i]))if __name__ == "__main__":share_value = Value('i', 1) # 整型数字1share_array = Array('h', 10) # 表示开辟3个空间,且均为整型,其实就是一个列表lock_1 = Lock() #创建共享锁lock_2 = Lock()proc_list = []for i in range(3):proc_1 = Process(target=worker_value, args=(share_value, lock_1))proc_list.append(proc_1)proc_2 = Process(target=work_array, args=(share_array, lock_2))proc_list.append(proc_2)for proc in proc_list:proc.start()for proc in proc_list:proc.join()print("data in share_value: {}".format(share_value.value)) ##share_value.value返回共享Value的值print("share_array object: {}".format(share_array)) ##此处share_array是一个Array封装对象array_list = []for item in share_array:array_list.append(item)print("data in share_array: {}".format(array_list))
运行结果:
current process: Process-1 share_value is: 2
current process: Process-2 share_value is: 3
current process: Process-3 share_value is: 4
current process: Process-4 share_array is: 0
current process: Process-4 share_array is: 4
current process: Process-4 share_array is: 8
current process: Process-4 share_array is: 12
current process: Process-4 share_array is: 16
current process: Process-4 share_array is: 20
current process: Process-4 share_array is: 24
current process: Process-4 share_array is: 28
current process: Process-4 share_array is: 32
current process: Process-4 share_array is: 36
data in share_value: 4
share_array object: <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_short_Array_10 object at 0x10479d1e0>>
data in share_array: [0, 4, 8, 12, 16, 20, 24, 28, 32, 36]
从以上输出可以看出:
share_value.value()可以直接获取共享Value的返回值
share_array返回的是一个array对象,不能直接获取返回值。
用法二:进程共享
通过Manager()返回的一个manager对象控制一个服务器进程,它保持住Python对象并允许其它进程使用代理操作它们。同时它用起来很方便,而且支持本地和远程内存共享。
Manager模块管理的共享数据类型有:list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和Array,同时还可以共享类的实例对象。
(1)共享简单的dict类型数据
示例代码:
import multiprocessing
from multiprocessing import Manager, Processdef worker(dic_data, key, value):dic_data[key] = str(key) + "_" + str(value)print('data in {} is: {}'.format(multiprocessing.current_process().name, dic_data))if __name__ == "__main__":mgr = Manager()data = {"a": "hello", "b": "welcome", "c": "python"}dict_data = mgr.dict()task_list = []for k, v in data.items():task = Process(target=worker, args=(dict_data, k, v))task_list.append(task)for task in task_list:task.start()for task in task_list:task.join()print("result is: {}".format(dict_data))
运行结果:
data in Process-2 is: {'a': 'a_hello'}
data in Process-3 is: {'a': 'a_hello', 'b': 'b_welcome'}
data in Process-4 is: {'a': 'a_hello', 'b': 'b_welcome', 'c': 'c_python'}
result is: {'a': 'a_hello', 'b': 'b_welcome', 'c': 'c_python'}
以上输出可以看出字典对象dic_data在子进程中共享了同一份数据。
(2) 共享嵌套dict数据
注意:进程间共享嵌套dict数据时,必须每一层dict都需要实例化为Manager().dict()对象。
如下代码,只实例化外层dict为Manager().dict()对象
import multiprocessing
from multiprocessing import Manager, Processdef worker(dic_data, key, value):dic_data["china"][key] = str(key) + "_" + str(value)print('data in {} is: {}'.format(multiprocessing.current_process().name, dic_data))if __name__ == "__main__":data = {"a": "hello", "b": "welcome", "c": "python"}mgr = Manager()dict_data = mgr.dict() ##仅仅设置外层为Manager().dict()对象dict_data["china"] = datatask_list = []for k, v in data.items():task = Process(target=worker, args=(dict_data, k, v))task_list.append(task)for task in task_list:task.start()for task in task_list:task.join()print("result is: {}".format(dict_data))
运行结果:
data in Process-2 is: {'china': {'a': 'hello', 'b': 'welcome', 'c': 'python'}}
data in Process-3 is: {'china': {'a': 'hello', 'b': 'welcome', 'c': 'python'}}
data in Process-4 is: {'china': {'a': 'hello', 'b': 'welcome', 'c': 'python'}}
result is: {'china': {'a': 'hello', 'b': 'welcome', 'c': 'python'}}
仅仅实例化外层dict为Manager().dict()对象,没有得到我们预期的输出结果。
改进代码:
每层dict都实例化为Manager().dict()对象
import multiprocessing
from multiprocessing import Manager, Processdef worker(dic_data, key, value):dic_data["china"][key] = str(key) + "_" + str(value)print('data in {} is: {}'.format(multiprocessing.current_process().name, dic_data))if __name__ == "__main__":data = {"a": "hello", "b": "welcome", "c": "python"}mgr = Manager()dict_data = mgr.dict()dict_data_inner = mgr.dict()dict_data["china"] = dict_data_innertask_list = []for k, v in data.items():task = Process(target=worker, args=(dict_data, k, v))task_list.append(task)for task in task_list:task.start()for task in task_list:task.join()print(dict_data)print("result is: {}".format(dict_data["china"]))
运行结果:
data in Process-2 is: {'china': <DictProxy object, typeid 'dict' at 0x105469cf8>}
data in Process-3 is: {'china': <DictProxy object, typeid 'dict' at 0x105469cf8>}
data in Process-4 is: {'china': <DictProxy object, typeid 'dict' at 0x105469cf8>}
{'china': <DictProxy object, typeid 'dict' at 0x105469cf8>}
result is: {'a': 'a_hello', 'b': 'b_welcome', 'c': 'c_python'}
(3)共享Value、Array、list、Lock等数据
示例代码:
from multiprocessing import Manager, Processdef worker(share_value, share_list, share_dict, lock):with lock:share_value.value += 1share_dict["a"] = "hello"for i in range(len(share_list)):share_list[i] *= 2if __name__ == "__main__":mgr = Manager()share_value = mgr.Value('i', 1) #i为typecodeshare_list = mgr.list([1, 2, 3, 4])share_dict = mgr.dict()share_array = mgr.Array('i', range(10))lock=mgr.Lock()proc_list = []for i in range(3):proc = Process(target=worker, args=(share_value, share_list, share_dict, lock))proc_list.append(proc)for proc in proc_list:proc.start()for proc in proc_list:proc.join()print(share_value)print(share_list)print(share_dict)print(share_array)
运行结果:
Value('i', 4)
[8, 16, 24, 32]
{'a': 'hello'}
array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
注意:
typecode必须为指定的字符,否则会报错ValueError: bad typecode (must be b, B, u, h, H, i, I, l, L, q, Q, f or d)
参考文献:
https://www.cnblogs.com/gengyi/p/8620853.html
http://blog.51cto.com/11026142/1874807
Python并发之多进程multiprocessing(2)相关推荐
- Python 学习笔记 多进程 multiprocessing
Python 解释器有一个全局解释器锁(PIL),导致每个 Python 进程中最多同时运行一个线程,因此 Python 多线程程序并不能改善程序性能,不能发挥多核系统的优势,可以通过这篇文章了解. ...
- python 多进程multiprocessing 如何获取子进程的返回值?进程池pool,apply_async(),get(),
案例1 # -*- coding: utf-8 -*- """ @File : 20200318_摄像头多进程流传输.py @Time : 2020/3/18 14:58 ...
- Python多线程threading和多进程multiprocessing的区别及代码实现
1. 多线程threading import time from threading import Threaddef thread_read(data):while True:print('read ...
- 二十四、深入Python多进程multiprocessing模块
@Author:Runsen multiprocessing multiprocessing包是Python中的多进程管理包.与之前的threading.Thread类似,它可以利用multiproc ...
- Python 多进程 multiprocessing 使用示例
参考:http://blog.csdn.net/qdx411324962/article/details/46810421 参考:http://www.lxway.com/4488626156.htm ...
- Python学习_进程multiprocessing 多进程 协程
进程的简单用法: #!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process import tim ...
- 【python】详解multiprocessing多进程-Pool进程池模块(二)
[python]详解multiprocessing多进程-process模块(一) [python]详解multiprocessing多进程-Pool进程池模块(二) [python]详解multip ...
- Python 多进程 multiprocessing.Pool类详解
multiprocessing模块 multiprocessing包是Python中的多进程管理包.它与 threading.Thread类似,可以利用multiprocessing.Process对 ...
- python并发编程:协程asyncio、多线程threading、多进程multiprocessing
python并发编程:协程.多线程.多进程 CPU密集型计算与IO密集型计算 多线程.多进程与协程的对比 多线程 创建多线程的方法 多线程实现的生产者-消费者爬虫 Lock解决线程安全问题 使用线程池 ...
最新文章
- 大天使之剑服务器维护,大天使之剑————【维护】10月20日更新维护公告
- 如何快速采集分析平台日志,并进行展示监控?
- wait和notify
- centos7 配置国内yum源和epel源
- python数字加密解密_Python对整形数字进行加密和解密
- Modify批量处理优化
- python 编辑excel需要什么包_Python 中操作EXCEL表格的包
- 【操作系统笔记】中断和异常
- python将jwths256加密——pyjwt库
- 2月21 深度优先与广度优先
- Atitit android app 最佳实践2021目录1. Android strudio,,and viruse machine need down another... 11.1. P
- 【UVA1339】古老的密码(巧妙思路+(q)sort降序排列的三种方法)
- 金蝶KIS专业版13.0视频教程
- Python 函数的嵌套
- 比较神秘的网址!也许有用!收藏一下!
- 了解Apache实木复合地板
- 股票中主力净量什么意义
- Vitamio 依赖导入 步骤
- 一些实用的安卓UI设计工具
- 微服务架构系列主题:探索原味BFF模式