目录

  • 前言:
  • 多进程
    • join方法
    • 并发实现
    • 常用参数
    • 子进程名称空间
    • 僵尸进程与孤儿进程
    • 守护进程
    • 互斥锁(进程同步)
    • 队列
      • 生产者消费者模型
    • 进程池

前言:

在学习并发编程前,可以了解进程与线程相同内容:进程与线程的关系,任务执行方式

并发编程会运用到创建、管理进程与线程相关的内容。

创建多个进程或线程,可以有效提高我们程序的运行效率,但其中也存在一些问题,那么现在来了解一下吧!

多进程

这里通过Python提供给我们的模块multiprocessing来向操作系统发送信号帮助我们创建一个子进程,为什么是子进程?因为它是基于我们已存在的进程所创建出来的,所以称为:子进程

我们使用到的是multiprocessing模块内的一个Process类创建进程,后续再了解该模块其它使用方式。

from multiprocessing import Processfrom multiprocessing import Process
import time
import osdef task(n):print('Process-%s 主进程:%s,子进程:%s 正在运行' % (n,os.getppid(),os.getpid()))time.sleep(n)print('Process-%s 主进程:%s,子进程:%s 运行完毕' % (n, os.getppid(), os.getpid()))if __name__ == '__main__':p = Process(target=task,args=(1,)) # 创建一个进程对象,target进程执行的目标,args表示向目标进行位置传参,也可以使用kwargs进行关键字传参p.start() # 发起信号,通知操作系统开启进程

进程创建必须写在if __name__ == '__main__下面,因为window | Mac | Linux 默认进程启动方式为spawn,使用spawn会执行传递给target的函数所在文件

图片说明:


从图中可以得知,我们第一次主动运行这个文件,打印的是:__mian__,而第二次就是由进程执行的,所以会打印:__mp_main__

可以尝试不加if __name__ == '__main__'来创建进程,Python会产生提示性报错。

官方文档:


其中Linux | Mac 都属于Unix平台,所以可以设置fork,设置了fork以后就可以不需要使用if __name__ == '__main__'

from multiprocessing import set_start_method
set_start_method('fork')

windows平台不能够设置此启动方法。

不必要纠结这些,一般都会将程序放入if __name__ == '__main__'执行即可

进程开启延迟问题

我们在通知操作系统开启一个子进程也会需要一定时间,那么这期间程序不会停留等待子进程的开启,而是会继续向下执行代码

from multiprocessing import Process
import time
import osdef task(n):print('Process-%s 主进程:%s,子进程:%s 正在运行' % (n,os.getppid(),os.getpid()))time.sleep(n)print('Process-%s 主进程:%s,子进程:%s 运行完毕' % (n, os.getppid(), os.getpid()))if __name__ == '__main__':print('主进程开始了:%s' % os.getpid())start = time.time()p = Process(target=task,args=(1,))p.start() # 这里执行通知操作系统开启一个进程,但并没有立马开启。print('主进程结束了:%s' % os.getpid())print('程序总共运行时间:%0.2fs' % (time.time() - start))

执行结果

'''
主进程开始了:1503
主进程结束了:1503
程序总共运行时间:0.01s
Process-1 主进程:1503,子进程:1505 正在运行
Process-1 主进程:1503,子进程:1505 运行完毕
'''

打印结果发现,程序运行时间居然只有0.1秒?这里并没有把开启进程后运行的时间算上,因为start()只是发起一个开启进程的信号而已,很快会继续向下执行代码,所以我们要解决这个问题。

join方法

该方法的作用就是让父进程进入阻塞状态,等待子进程结束,待子进程结束后将其回收掉

使用该方法后就不会出现父进程比子进程提前结束的情况,而是会停留在join()方法处,等待子进程运行完毕代码才会继续向下运行。

from multiprocessing import Process
import time
import osdef task(n):print('Process-%s 主进程:%s,子进程:%s 正在运行' % (n, os.getppid(), os.getpid()))time.sleep(n)print('Process-%s 主进程:%s,子进程:%s 运行完毕' % (n, os.getppid(), os.getpid()))if __name__ == '__main__':print('主进程开始了:%s' % os.getpid())start = time.time()p = Process(target=task, args=(1,))p.start()  # 这里执行通知操作系统开启一个进程,但并没有立马开启。p.join() # 让父进程等待p这个子进程结束print('主进程结束了:%s' % os.getpid())print('程序总共运行时间:%0.2fs' % (time.time() - start))

执行结果

'''
主进程开始了:1548
Process-1 主进程:1548,子进程:1550 正在运行
Process-1 主进程:1548,子进程:1550 运行完毕
主进程结束了:1548
程序总共运行时间:1.10s
'''

可以看到程序已经达到了我们预期的效果,而程序运行多出的时间,创建进程对象、以及通知操作系统开启进程都会需要时间,多出来的几乎可以忽略不计。

并发实现

通过创建、启动多个子进程,让我们的程序达到并发的效果,可以根据代码猜测运行结果

from multiprocessing import Process
import time
import osdef task(n):print('Process-%s 主进程:%s,子进程:%s 正在运行' % (n, os.getppid(), os.getpid()))time.sleep(n)print('Process-%s 主进程:%s,子进程:%s 运行完毕' % (n, os.getppid(), os.getpid()))if __name__ == '__main__':print('主进程开始了:%s' % os.getpid())start = time.time()p1 = Process(target=task, args=(1,))p2 = Process(target=task, args=(2,))p3 = Process(target=task, args=(3,))p1.start()p2.start()p3.start()p1.join()p2.join()p3.join()print('主进程结束了:%s' % os.getpid())print('程序总共运行时间:%0.2fs' % (time.time() - start))

执行结果

'''
主进程开始了:1592
Process-1 主进程:1592,子进程:1594 正在运行
Process-2 主进程:1592,子进程:1595 正在运行
Process-3 主进程:1592,子进程:1596 正在运行
Process-1 主进程:1592,子进程:1594 运行完毕
Process-2 主进程:1592,子进程:1595 运行完毕
Process-3 主进程:1592,子进程:1596 运行完毕
主进程结束了:1592
程序总共运行时间:3.10s
'''

因为在程序join()的时候,只是父进程在等待,而子进程还会一如既往的运行,且与join()的顺序无关

分析:

p1.join() # 该子进程运行时间 占1秒
p2.join() # 该子进程运行时间 占2秒,但是由于上面父进程等待已经过去了1秒,所以运行到这里,只需要运行1秒该子进程就结束了
p3.join() # 父进程等待上面两个子进程的同时,该进程也在运行,所以已经过去了两秒,运行到这里,只需要再运行1秒该子进程就结束了

上面是第一种进程创建的方式,我们也可以通过自己定义的类来创建

from multiprocessing import Process
import time
import osclass MyProcess(Process):  # 继承Process类def __init__(self, n):super().__init__()self.n = ndef run(self):  # 不同调用,开启进程后默认执行print('Process-%s 主进程:%s,子进程:%s 正在运行' % (self.n, os.getppid(), os.getpid()))time.sleep(self.n)print('Process-%s 主进程:%s,子进程:%s 运行完毕' % (self.n, os.getppid(), os.getpid()))if __name__ == '__main__':print('主进程开始了:%s' % os.getpid())start = time.time()process_list = []for i in range(1, 3):p = MyProcess(i)  # 通过循环创建、开启多个子进程对象p.start()process_list.append(p)  # 将开启的子进程放入一个列表中for i in process_list:  # 让主进程逐个等待子进程的结束i.join()print('主进程结束了:%s' % os.getpid())print('程序总共运行时间:%0.2fs' % (time.time() - start))

执行结果

'''
主进程开始了:1927
Process-1 主进程:1927,子进程:1929 正在运行
Process-2 主进程:1927,子进程:1930 正在运行
Process-1 主进程:1927,子进程:1929 运行完毕
Process-2 主进程:1927,子进程:1930 运行完毕
主进程结束了:1927
程序总共运行时间:2.09s
'''

join还存在一个timeout参数:

join(timeout=n)如果填写数值后就表示,如果在指定时间内该进程未结束则会被杀死

常用参数

terminate() 强制关闭子进程
is_alive() 判断子进程是否存活

import time
from multiprocessing import Process
import osdef task(n):print('Process-%s 主进程:%s,子进程:%s 正在运行' % (n,os.getppid(),os.getpid()))time.sleep(n)print('Process-%s 主进程:%s,子进程:%s 运行完毕' % (n, os.getppid(), os.getpid()))if __name__ == '__main__':print('主进程开始了:%s' % os.getpid())p = Process(target=task,args=(1,)) # 创建一个进程对象,target进程执行的目标,args向目标传递的值p.start() # 发起信号,通知操作系统开启进程p.terminate() # 强制关闭p进程time.sleep(0.1) # 等待0.1秒# 判断进程是否存活,可能刚刚强制关闭p进程后,马上来执行看到的会是True,但是等待0.1秒后操作系统就可以反应过来,然后打印Falseprint(p.is_alive()) print('主进程结束了:%s' % os.getpid())

执行结果

'''
主进程开始了:3549
False
主进程结束了:3549
'''

子进程名称空间

创建子进程会将父进程内的名称空间拷贝一份,然后在自己里面就可以访问到与父进程相同的变量、函数等等

from multiprocessing import Processcount = 100
def task():global countprint(f'子进程获取count:{count}')count = 0print('子进程已经修改count为0')if __name__ == '__main__':p = Process(target=task)p.start()p.join()print(count)

执行结果

'''
子进程获取count:100
子进程已经修改count为0
100
'''

子进程将父进程名称空间拷贝后,自己也就有了count变量,所以它改变的count是自己的,而不会影响父进程

僵尸进程与孤儿进程

僵尸进程:子进程结束了,但父进程未及时回收子进程所占资源,那么子进程就会变成僵尸进程。僵尸进程会占用PID(进程编号),当PID被占满以后,将开启不了任何程序,所以僵尸进程是有害的。

孤儿进程:子进程未结束,父进程提前结束,则进程会变成孤儿进程,孤儿进程会被PID为1的进程所收养并管理,此时PID为1的进程就成了它的父进程,待该进程任务运行完毕后,PID为1的进程会回收它所占用的资源,所以孤儿进程是无害的。

windows不会产生僵尸进程,Linux | Mac会有,因为这是一种好心机制,让父进程清楚的知道自己有哪些子进程,所以子进程结束以后会保留部分信息,如PID,留给父进程来回收。

windows系统在子进程结束后会立即自动清除子进程的Process对象
linux | mac系统子进程的Process对象,如果没有start函数的话会在主进程结束后统一清除。在执行到start()时,会检测当前状态是否存在僵尸进程,如果存在则清除

代码演示僵尸进程的产生:

from multiprocessing import Process
import time
import osdef task(n):print('Process-%s 主进程:%s,子进程:%s 正在运行' % (n, os.getppid(), os.getpid()))time.sleep(n)print('Process-%s 主进程:%s,子进程:%s 运行完毕' % (n, os.getppid(), os.getpid()))if __name__ == '__main__':print('主进程开始了:%s' % os.getpid())start = time.time()p = Process(target=task, args=(1,))p.start()  # 这里执行通知操作系统开启一个进程,但并没有立马开启。# 这里并没有join方法清理子进程time.sleep(2000) # 将父进程进行睡眠print('主进程结束了:%s' % os.getpid())print('程序总共运行时间:%0.2fs' % (time.time() - start))

执行结果

回收僵尸进程的几种方式:

1、父进程结束掉了,僵尸进程自然也随之被回收
2、通过 wait 调用来读取子进程退出状态。比如通过 multiprocessing.Process 产出的进程可以通过子进程的 join()方法来 wait,也可以在父进程中处理 SIGCHLD 信号,在处理程序中调用 wait 系统调用或者直接设置为 SIG_IGN 来清除僵尸进程。
但Python有回收机制,只要父进程没有整体被阻塞在原地time.sleep(),那么Python解释器会定期在后台帮助我们清理僵尸进程
3、SIGCHLD信号、前提是父进程写了相关处理方案

start()方法自带清理僵尸进程

from multiprocessing import Process
import osdef task():print('子进程:',os.getpid())if __name__ == '__main__':while True:p1 = Process(target=task)p2 = Process(target=task)p1.start()p2.start()

虽然上序代码我们在不断产生进行,并没有执行清理进程的操作,但是每次都会执行start(),该方法内会自动检测当前状态是否存在僵尸进程,并清理


该方法会检测已经完成的进程,已经完成的进行就是僵尸进程,它会帮助我们完成回收操作。当然,更倾向于使用join()等待子进程结束后回收,因为start()是在创建一个新的进程时会检测,如果我们不需要再创建新的进程了,那么就需要使用join()方法

守护进程

当指定某个进程作为守护进程以后,当主进程代码完毕以后,守护进程会立马结束。

from multiprocessing import Process
import time,osdef task():print('子进程开始:',os.getpid())time.sleep(10)print('子进程结束:',os.getpid())if __name__ == '__main__':print('主进程开始了:',os.getpid())p = Process(target=task)p.daemon = True # 指定p这个进程对象作为守护进程p.start()time.sleep(3)print('主进程结束了:',os.getpid())

执行结果

'''
主进程开始了: 2009
子进程开始: 2011
主进程结束了: 2009
'''

可以发现,子进程执行时间需要10s左右,而父进程只需要3s左右,整体代码就运行完毕了,所以当父进程代码运行完毕以后,守护进程也随之关闭。

再进行一个小实例,理解守护进程,我们可以猜测一下执行结果,注意:守护的是父进程的代码。

from multiprocessing import Process
import time,osdef foo():print('foo函数 子进程开始:',os.getpid())time.sleep(1)print('foo函数 子进程结束:',os.getpid())def task():print('task函数 子进程开始:', os.getpid())time.sleep(1)print('task子进程结束:', os.getpid())if __name__ == '__main__':p = Process(target=foo)p2 = Process(target=task)p.daemon = Truep.start()p2.start() # 只是通知操作系统开启进程,不需要多少时间print('________main________')# 当这行代码执行完毕,说明父进程代码全部执行完了,守护进程就会结束

执行结果

'''
________main________
task函数 子进程开始: 2051
task子进程结束: 2051
'''

foo函数未执行,是因为父进程的代码很快就全部执行完了,而它对应的进程还未开启就已经结束了。

注意:守护进程最好不要加上join,因为这样就会让它失去守护进程的意义。

互斥锁(进程同步)

经过上锁以后,一次只有一个进程能够占用资源,当锁被释放后,下一个进程才能够使用该资源。

变成了串行来执行,但相对于并发(多个进程可以同时访问相同的资源)串行更加保证了数据的安全性。

from multiprocessing import Process,Lock,set_start_method
import os,timedef task():print(f'子进程:{os.getpid()}开启了')time.sleep(1)print(f'子进程:{os.getpid()}结束了\n')def run(lock=None):lock.acquire() # 将进程上锁,task()lock.release()if __name__ == '__main__':set_start_method('fork') # 因为笔者是mac系统,必须要修改启动方式lock = Lock()for i in range(4):p = Process(target=run)p.start()

执行结果

'''
子进程:2226开启了
子进程:2226结束了子进程:2227开启了
子进程:2227结束了子进程:2228开启了
子进程:2228结束了
'''

这不就是我们在start()下面使用join()所达到的效果吗?

通过一个模拟抢票的程序来实例:

需求就是,可以多个用户同时查询票数(并发),但是只有一个用户能购买到(串行)

未加锁前

from multiprocessing import Process,Lock,set_start_method
import time
import json
import randomdef check(people):with open('ticket.json','rt' , encoding='utf-8') as f:ticket = json.load(f)print(f'用户{people} 检查到了剩余票数:{ticket["ticket"]}')def get(people):time.sleep(random.randint(1,3)) # 模拟购票时的网络延迟with open('ticket.json','rt', encoding='utf-8') as f:ticket = json.load(f)if ticket["ticket"] > 0:ticket["ticket"] -= 1with open('ticket.json','wt',encoding='utf-8') as f:json.dump(ticket,f) # 将修改后的字典序列化入文件print(f'用户{people}抢到了票~')else:print(f'用户{people}未抢到票!')def run(people):check(people)get(people)if __name__ == '__main__':for i in range(1,4):p = Process(target=run,args=(i,))p.start()

执行结果

'''
用户1 检查到了剩余票数:1
用户2 检查到了剩余票数:1
用户3 检查到了剩余票数:1
用户2抢到了票~
用户1抢到了票~
用户3未抢到票!
'''

这种写法可能会导致我们获取到的数据错乱,因为是多个用户同时修改票的数据,虽然有了效率问题,但是这个场景是首先考虑的是数据的安全性。

加锁后

from multiprocessing import Process,Lock,set_start_method
import time
import json
import randomdef check(people):with open('ticket.json','rt' , encoding='utf-8') as f:ticket = json.load(f)print(f'用户{people} 检查到了剩余票数:{ticket["ticket"]}')def get(people):time.sleep(random.randint(1,3)) # 模拟购票时的网络延迟with open('ticket.json','rt', encoding='utf-8') as f:ticket = json.load(f)if ticket["ticket"] > 0:ticket["ticket"] -= 1with open('ticket.json','wt',encoding='utf-8') as f:json.dump(ticket,f) # 将修改后的字典序列化入文件print(f'用户{people}抢到了票~')else:print(f'用户{people}未抢到票!')def run(people,lock):check(people)lock.acquire()get(people)lock.release()if __name__ == '__main__':set_start_method('fork') # 由于笔者是mac系统,所以需要修改,windows不需要lock = Lock()for i in range(1,4):p = Process(target=run,args=(i,lock))p.start()

执行结果

'''
用户1 检查到了剩余票数:1
用户2 检查到了剩余票数:1
用户3 检查到了剩余票数:1
用户1抢到了票~
用户2未抢到票!
用户3未抢到票!
'''

进程之间加上了锁之后,虽然降低了效率,但是提高了我们数据的安全性。根据不同的场景,编写合适的程序

队列

进程彼此之间相互隔离,如果要实现进程间的通信(IPC),multiprocessing模块提供了几种特别好用的方式,队列、管道。这两种方式都是实现消息传递的。这里主要学习队列的使用

from multiprocessing import QueueQueue(最大可存入队列的数量) # 可以指定最大数量、不写的话默认无上限

主要方法,以及参数:

from multiprocessing import Queueq = Queue(3) # 指定队列可以数据的数量,队列原则:先进先出# 先put进入的,会被先get出来q.put(1) # 向队列插入值
q.put(2)
q.put(3)q.put(4) # 当队列满了以后,那么再向往里放,就会进入阻塞状态,也就是在等待队列get值出去
q.put(4,block=False) # 当队列满了以后直接抛出异常,不写的话,block默认为Trueq.put(4,timeout=3) # 定义超时时间,如果指定时间内队列还是满的无法插入值,则报错print(q.get()) # 获取队列第一个元素
print(q.get()) # 获取队列第二个元素
print(q.get()) # 获取队列第三个元素q.get() # 当队列没有值可以获取时,那么就阻塞在原地,和上面原理一样,get阻塞则是等待队列put值进去。
q.get(block=False) # False则表示如果队列没有值可取则抛出异常,默认为block=True(阻塞等待)
q.get(timeout=3) # 如果3s内,队列没有值可以get出来,则报错

上序介绍了Queue队列的使用方式,但是只是针对于单进程,那么我们需要做的是让多个进程之间可以通过队列来通信,拿一个比较常用的模型来模拟

生产者消费者模型

该模型中的两种角色:生产者与消费者

生产者:负责产生数据并存入队列中

消费者:从队列中获取数据

from multiprocessing import Process,Queue,set_start_method
import time
import random# 生产者
def producer(q,name,food):for i in range(1,4): # 每种食物生产3个q.put(food + str(i)) # 将每次生产的事务放入队列内print(f'{name}生产了 {food}{i}')time.sleep(random.randint(1,3)) # 随机间隔1-3s# 消费者
def consumer(q,name):while True: # 不断的吃!res = q.get() # 获取队列内的食物if res is None: # 我们在食物生产完成后做的标识,如果检测到为None,说明没有食物了breaktime.sleep(random.randint(1,3))print(f'{res} 被{name}吃掉了')if __name__ == '__main__':set_start_method('fork')q = Queue()# 定义生产者,生产好吃的(数据)p1 = Process(target=producer,args=(q,'jack','苹果'))p2 = Process(target=producer,args=(q,'tom','香蕉'))p3 = Process(target=producer,args=(q,'jams','三明治'))# 定义消费者,也就是吃货c1 = Process(target=consumer,args=(q,'小明'))c2 = Process(target=consumer,args=(q,'小李'))# 开始生产食物p1.start()p2.start()p3.start()# 开始吃食物c1.start()c2.start()p1.join()p2.join()p3.join() # 运动行到这里,说明食物已经生产完成q.put(None)q.put(None)# 存放两个None,因为有两个吃货(消费者),每个get到None之后就会停止获取食物

执行结果

'''
jack生产了 苹果1
tom生产了 香蕉1
jams生产了 三明治1
jams生产了 三明治2
苹果1 被小明吃掉了
tom生产了 香蕉2jack生产了 苹果2三明治1 被小明吃掉了
香蕉1 被小李吃掉了
jams生产了 三明治3
jack生产了 苹果3
tom生产了 香蕉3
三明治2 被小明吃掉了香蕉2 被小李吃掉了三明治3 被小明吃掉了
苹果2 被小李吃掉了
香蕉3 被小李吃掉了
苹果3 被小明吃掉了'''

其中打印到一起的原因是因为进程同时执行的问题,但这并没有任何影响,至少实现了我们想要的效果,但是也有一个弊端,那就是每次在结尾都需要None来作为标识,意味着有多少个消费者,就None多少次,我们需要进一步优化,使用到一个新的标记方法,通过信号次数检测

from multiprocessing import Process,JoinableQueue,set_start_method
import time
import random# 生产者
def producer(q,name,food):for i in range(1,4): # 每种食物生产3个q.put(food + str(i)) # 将每次生产的事务放入队列内print(f'{name}生产了 {food}{i}')time.sleep(random.randint(1,3)) # 随机间隔1-3sq.join()# 让主进程阻塞,除非put入队的次数与task_done传入信号次数相同,才会取消阻塞,也就是等待吃货把食物全部吃完# 消费者
def consumer(q,name):while True: # 不断的吃!res = q.get() # 获取队列内的食物,如果没有食物就阻塞在原地,等待食物if res is None: # 我们在食物生产完成后做的标识,如果检测到为空,说明没有食物了breaktime.sleep(random.randint(1,3))print(f'{res} 被{name}吃掉了')q.task_done() # 向队列传入信号,表示已经取走一个数据if __name__ == '__main__':set_start_method('fork') # 这里由于笔者是Mac系统所以需要加上,windows不需要# 与Queue类似的模块,但是它提供了两个可以解决我们问题的方法q = JoinableQueue()# 定义生产者,生产好吃的(数据)p1 = Process(target=producer,args=(q,'jack','苹果'))p2 = Process(target=producer,args=(q,'tom','香蕉'))p3 = Process(target=producer,args=(q,'jams','三明治'))# 定义消费者,也就是吃货c1 = Process(target=consumer,args=(q,'小明'))c2 = Process(target=consumer,args=(q,'小李'))c1.daemon = Truec2.daemon = True # 将它们变成守护进程,当父进程代码结束以后,这两个进程也随之结束# 开始生产食物p1.start()p2.start()p3.start()# 开始吃食物c1.start()c2.start()p1.join()p2.join()p3.join() # 说明食物已经生产完成,并且会等待吃货是否把食物吃完,因为函数里存在join,如果吃完这行代码就算结束了# 到这里吃货相关的进程也会结束掉,虽然里面有while,但是它们是守护进程,父进程代码完毕,它们也的结束

借助生产者与消费者模型,可以很好演示出队列的作用

进程池

进程池的作用就是限制了我们可以使用进程的数量,创建好指定进程数量,此后做的操作都是使用已经创建的进程,解决的问题就是:避免了重复创建、摧毁进程的操作,如果进程池里的进程使用数量上限,那么此时如果还有任务需要进程执行,就会等待其中进程执行完毕,再使用里面空闲的进程来执行该任务。

这里提供两种开启进程池的方法:

from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor

我们先来介绍第一种:Pool

语法:

Pool([numprocess  [,initializer [, initargs]]]):创建进程池numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
initializer:是每个工作进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组

主要方法介绍:

pool.apply(func,(args),{kwargs}) # 将进程池中执行func,以同步的方式,等待进程返回执行结果
pool.apply_async(func,(args),{kwargs}) # 将进程池中执行func,以异步的方式,不需要等待进程返回执行结果
pool.close() # 关闭进程池,防止进一步操作。
pool.join() # 等待进程池里面的工作进程退出(在close之后使用,或teminate()之后使用)

其它方法:

pool.apply() 或 pool.apply_asyn() 会返回一个AsyncResul的对象,可以使用以下方法:
obj.get() # 返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready() # 如果调用完成,返回True
obj.successful() # 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]) # 等待结果变为可用。
obj.terminate() # 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

apply同步实例:

from multiprocessing import Pool
import os,timedef task(i):print(f'{os.getpid()} 正在运行')time.sleep(2)print(f'{os.getpid()} 正在结束\n')return i ** 2if __name__ == '__main__':p = Pool(4) # 创建包含有4个进程的进程池for i in range(10):p.apply(task,(i,)) # 将函数提交给进程池内的空闲进程执行p.close() # 不允许操作进程池p.join() # 等待工作进程结束

执行结果

1915 正在运行
1915 正在结束1916 正在运行
1916 正在结束1917 正在运行
1917 正在结束1918 正在运行
1918 正在结束1915 正在运行
1915 正在结束1916 正在运行
1916 正在结束1917 正在运行
1917 正在结束1918 正在运行
1918 正在结束1915 正在运行
1915 正在结束1916 正在运行
1916 正在结束

同步调用需要等待一个进程运行完才开启下一个,这里注意到的是:存在相同的进程号,原因是因为如果有进程完成了任务,则进入空闲状态,等待下一次接收到任务。从头至尾使用的都是我们创建的4个进程

apply_async异步调用实例:

from multiprocessing import Pool
import os,timedef task(i):print(f'{os.getpid()}  正在运行{i}')time.sleep(2)print(f'{os.getpid()} 正在结束{i}\n')return i ** 2if __name__ == '__main__':p = Pool(4)res_lis= []for i in range(10):res = p.apply_async(task,(i,))res_lis.append(res) # 将执行后产生的对象放入列表里p.close()p.join()for i in res_lis:print(i.get()) # 获取进程执行函数拿到的结果

执行结果

1998  正在运行0
2000  正在运行1
1999  正在运行2
2001  正在运行3
1998 正在结束0
2000 正在结束12000  正在运行4
1998  正在运行5
1999 正在结束22001 正在结束31999  正在运行6
2001  正在运行7
1998 正在结束52000 正在结束42000  正在运行8
1998  正在运行9
1999 正在结束6
2001 正在结束71998 正在结束92000 正在结束80
1
4
9
16
25
36
49
64

异步:不需要在原地等待执行结果,直接把任务分配给不同的进程。

map方法:取代了循环往进程池内放入任务的步骤

import os,time
from multiprocessing import Pooldef task(i):print(f'{os.getpid()}  正在运行{i}')time.sleep(2)print(f'{os.getpid()} 正在结束{i}\n')return i ** 2if __name__ == '__main__':print(f'父进程:{os.getpid()}')p = Pool(4)p.map(task,range(1,10)) # (函数,传递的参数)p.close()p.join()

执行结果

'''
父进程:2483
2485  正在运行1
2488  正在运行2
2486  正在运行3
2487  正在运行4
2485 正在结束12485  正在运行5
2488 正在结束2
2486 正在结束32487 正在结束42486  正在运行6
2488  正在运行7
2487  正在运行8
2485 正在结束52485  正在运行9
2488 正在结束7
2486 正在结束62487 正在结束82485 正在结束9
'''

第二种创建进程池方法:

默认:异步调用

import os,time
from concurrent.futures import ProcessPoolExecutordef task(i):print(f'{os.getpid()}  正在运行{i}')time.sleep(2)print(f'{os.getpid()} 正在结束{i}\n')return i ** 2if __name__ == '__main__':p = ProcessPoolExecutor(4) # 如果不指定进程数量,则默认为os.cpu_count()res_lis = []for i in range(10):res = p.submit(task,i) # 将函数提交给进程池内的空闲进程执行res_lis.append(res)p.shutdown(wait=True) # 不允许操作进程池,等待进程池结束for i in res_lis:print(i.result()) # # 获取进程执行函数拿到的结果

执行结果

2215  正在运行0
2214  正在运行1
2217  正在运行2
2216  正在运行3
2215 正在结束0
2214 正在结束12214  正在运行4
2215  正在运行5
2217 正在结束22216 正在结束32217  正在运行6
2216  正在运行7
2214 正在结束42214  正在运行8
2215 正在结束52215  正在运行9
2216 正在结束72217 正在结束62214 正在结束82215 正在结束90
1
4
9
16
25
36
49
64
81

以上这种方式,只能在进程全部运行完毕以后,再能进行处理,但是如果我们想要将每次进程执行完的结果进行处理,就需要使用回调函数

回调函数:将执行完操作的结果,传递给某个函数

import os,time
from concurrent.futures import ProcessPoolExecutordef task(i):print(f'{os.getpid()}  正在运行{i}')time.sleep(2)print(f'{os.getpid()} 正在结束{i}\n')return i ** 2def handle(future): # 拿到工作进程对象future = future.result() # 获取其执行完任务的返回值print(f'{os.getpid()} 正在处理结果:{future}')time.sleep(2)if __name__ == '__main__':print(f'父进程:{os.getpid()}')p = ProcessPoolExecutor(4)# 如果不指定进程池内的进程数量,则默认为os.cpu_count()res_lis = []for i in range(10):p.submit(task,i).add_done_callback(handle)# 将工作进程对象传递给handle函数p.shutdown(wait=True)# 不允许操作进程池,等待进程池结束,相当于Pool的close()+join()

处理结果的函数是由父进程来执行

父进程:2281
2283  正在运行0
2284  正在运行1
2285  正在运行2
2286  正在运行3
2283 正在结束02283  正在运行4
2281 正在处理结果:0
2284 正在结束12286 正在结束3
2285 正在结束22283 正在结束42284  正在运行5
2285  正在运行6
2286  正在运行7
2283  正在运行8
2281 正在处理结果:1
2284 正在结束52285 正在结束62283 正在结束8
2286 正在结束72285  正在运行9
2281 正在处理结果:4
2285 正在结束92281 正在处理结果:9
2281 正在处理结果:16
2281 正在处理结果:36
2281 正在处理结果:25
2281 正在处理结果:49
2281 正在处理结果:64
2281 正在处理结果:81

map方法:取代了我们for循环submit的用法

使用进程池可以帮助我们创建指定的进程数量,根据自身电脑的配置,一旦创建好以后,我们所有的任务都会由进程池内的几个进程来完成。每个进程都会等待任务的到来而运行。相比之前的创建进程、然后销毁进程,进程池更加友好


技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请点赞 收藏+关注 子夜期待您的关注,谢谢支持!

Python 并发编程(进程)相关推荐

  1. Python 并发编程--进程,线程,协程

    并发编程 基本概念的区分: 并发 只有一个CPU,多个程序在一个CPU上轮流执行,宏观上多个进程并发执行,但微观上依旧是串行 并行 有多个CPU,多个程序在多个CPU上同时执行. 进程 计算机中最小的 ...

  2. python并发编程--进程、线程、协程、锁、池、队列

    文章目录 操作系统的概念 进程 multiprocessing模块 守护进程 使用多进程实现一个并发的socket的server 锁 生产者消费者模型 数据共享 线程threading模块 守护线程和 ...

  3. Python并发编程—进程

    多任务编程 1.意义: 充分利用计算机多核资源,提高程序的运行效率. 2.实现方案 :多进程 , 多线程 3.并行与并发 并发 : 同时处理多个任务,内核在任务间不断的切换达到好像多个任务被同时执行的 ...

  4. python并发编程-进程池线程池-协程-I/O模型-04

    目录 进程池线程池的使用***** 进程池/线程池的创建和提交回调 验证复用池子里的线程或进程 异步回调机制 通过闭包给回调函数添加额外参数(扩展) 协程*** 概念回顾(协程这里再理一下) 如何实现 ...

  5. python 并发编程 多线程 目录

    线程理论 python 并发编程 多线程 开启线程的两种方式 python 并发编程 多线程与多进程的区别 python 并发编程 多线程 Thread对象的其他属性或方法 python 并发编程 多 ...

  6. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  7. 学习笔记(33):Python网络编程并发编程-进程池线程池

    立即学习:https://edu.csdn.net/course/play/24458/296451?utm_source=blogtoedu 进程池与线程池: 一般应用在网站上,进程池或线程池最大的 ...

  8. python并发编程之semaphore(信号量)_浅谈Python并发编程之进程(守护进程、锁、信号量)...

    前言:本博文是对Python并发编程之进程的知识延伸,主要讲解:守护进程.锁.信号量. 友情链接: 一.守护进程(daemon) 1.1 守护进程概念 首先我们都知道:正常情况下,主进程默认等待子进程 ...

  9. Python并发编程之线程池/进程池

    引言 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我 ...

  10. Python并发编程之进程池

    Python并发编程之进程池 一.进程池简介 二.进程池代码实例 一.进程池简介 可以用Pool类来创建进程池,可以把各种数据处理任务都提交给进程池.进程池提供的功能有点类似于列表解析和功能性编程操作 ...

最新文章

  1. 记录一下HALCON基于可变形,利用CAD画dxf模板进行模板匹配(二)
  2. 【前端词典】如何向老板解释反向代理
  3. 图形的装饰教案计算机,《电脑图案设计师》教案教学设计
  4. 让每一首心动歌曲穿越人海遇见你,背后竟藏着这么多“黑科技”|回响·TME音乐公开课...
  5. 使用SSIS包导入SQL Server FILESTREAM数据
  6. python 请求头_Python爬虫:将headers请求头字符串转为字典
  7. wordpress iis php,Windows IIS 上安装部署 WordPress 网站快速简要教程
  8. 华佳慧科技:OSN500设备ERPS相切环组网介绍
  9. NLPIR汉语分词系统
  10. mumu 模拟器连不上adb
  11. 计算机软件系统验证报告,检验报告管理系统软件
  12. Python编程-pypyodbc无驱动和无法打开注册表等错误的解决办法
  13. Jvav常问面试题(附解析)
  14. 关于LVGL下物理按键的使用
  15. 记spring boot + shiro 认证,anon失效的问题的一种解决方式
  16. idea当中批量替换变量名字
  17. docker load镜像报错:open /var/lib/docker/tmp/docker-import-525555606/repositories: no such file or direc
  18. View 事件分发机制
  19. java榨汁机榨取不同水果,榨汁机别乱买,亲测榨水果翻车,九阳迷你原汁机评测...
  20. 区块链原理及核心技术

热门文章

  1. 金融大数据信用评分模型解析
  2. Solaris 迅速查找手册
  3. Pulling is not possible because you have unmerged files
  4. php四则运算出题器
  5. 10个开源电子商务平台
  6. 商业银行房贷业务节后骤然下降
  7. 【前端春招】前端春招实习+秋招心路历程
  8. Hp电脑测试软件还是硬件问题,惠普硬件怎么检测
  9. spring 配置context:component-scan base-package=” ”/
  10. javaSE探赜索隐之三--<类与对象的爱恨情仇下>