文章目录

  • 1.问题背景
  • 2.单线程→\rightarrow→多线程→\rightarrow→线程池
    • 2.1单线程简介
    • 2.2多线程简介
    • 2.3线程池介绍
      • 2.3.1复用线程
      • 2.3.2线程池的使用
  • 3.多线程→\rightarrow→多进程
    • 3.1进程池介绍
      • 3.1.1复用进程
      • 3.1.2进程池的使用
  • 4.多进程+多线程

1.问题背景

 最近在做的一个项目是用python同时给多个用户提供科学计算。为了最大化并行计算的能力,需要用到多进程;为了公平地服务多个用户,以及加速I/O bound任务(如文件读写、网络发送接收请求),需要用到多线程。
 由于新增进程和线程会耗费较多内存资源,肯定不能为每一个用户新增一个进程或线程。最佳实践是固定进程和线程数,并用一个池子来存储,所有用户都共用池子里的进程或线程。
 为了让自己从对该项目用到的技术有彻底了解,会先从单线程、多线程出发,再到线程池进程池

2.单线程→\rightarrow→多线程→\rightarrow→线程池

2.1单线程简介

 说起线程的作用,在看一大段官方定义前,不妨先考虑简单代码的运行,来看看运行代码必需的东西,再来说线程和这些必需东西的关系。比如下面函数调用的代码,我们想要按执行functionA()的一部分→\rightarrow→执行functionB()→\rightarrow→执行functionA()剩余部分的顺序执行,那么我们需要哪些东西?

def functionA():print('调用函数functionB()前')functionB()print('调用函数functionB()后')def functionB():print('调用函数functionB()中')functionA()

要实现函数调用,几乎所有编程语言都用调用栈来实现:每次调用函数,就把该函数放到栈顶,CPU再执行栈顶的函数;每次执行完函数,就将函数从栈顶删除,从而能继续执行未完成的函数。下段的注释体现了调用栈是怎么工作的。

def functionA():print('调用函数functionB()前') functionB() print('调用函数functionB()后')def functionB():print('调用函数functionB()中')functionA() # 执行顺序以及调用栈的变化:
# 运行functionA(),调用栈从空变为[functionA()],处理器执行栈顶的functionA()
# 运行 print('调用函数functionB()前') ,调用栈依然还是[functionA()],处理器继续执行栈顶的functionA()
# 运行 functionB() ,调用栈变为[functionB(),functionA()],处理器执行栈顶的functionB()
# 运行 print('调用函数functionB()中') ,调用栈变为[functionA()],处理器继续执行栈顶的functionA()
# 运行 print('调用函数functionB()后'),调用栈变为空

上面的示例说明代码的运行,需要调用栈(但绝不只需要调用栈)。一个线程,就拥有调用栈等运行代码所必需的东西。开启一个线程,就是创建调用栈等资源。如果能创建多个调用栈,CPU就可以在这多个栈上同时或交替运行函数了,这就是使用多线程。(同时运行出现在多个CPU分别在多个调用栈上同时执行代码;交替运行出现在一个CPU交替地在不同调用栈上执行代码)。

为了能体现出代码运行靠线程,而不是直接从源代码中自行地产生,不妨在代码的每一行都看一个名字(线程名),来看这个名字是不是都相同,如果是的话,说明有同一个东西一直贯穿代码运行的全过程。如下所示,不管是在代码的首尾,还是调用的函数内部,查看的名字都相同,这个都相同的名字就是线程名,对应的线程贯穿程序运行的始终。

import threadingprint('线程名为: '+threading.current_thread().getName())
# 预期结果:MainThreaddef function():print('线程名为: '+threading.current_thread().getName())# 预期结果:MainThreadfunction()print('线程名为: '+threading.current_thread().getName())
# 预期结果:MainThread

最后再说几句与主题无关的闲话,像C/C++和Java中的main()函数,就是用于帮助主线程识别第一个进入调用栈的函数是谁,所以这个main()函数不可或缺且不能改名。而python的话,主线程默认整个py文件就是一个函数,会直接从文件开头执行代码,不需要额外的main()函数。

2.2多线程简介

 上述单线程的简介漏了许多细节,但重点是说明代码的运行是靠拥有调用栈的线程。如果有多个调用栈,CPU就可以在这多个栈上同时或交替运行函数了。根据同时还是交替运行,可以总结多线程的优点:

场景 优点
多个线程同时运行 平衡任务执行的公平性减少任务的执行总用时;
多个线程交替运行 平衡任务执行的公平性;有I/O bound任务时可以减少总用时

下面对两个点进行简述:第一个点是公平性;第二个点是即便多线程是交替运行,也能减少I/O bound任务的总用时

  • 下面用服务多个用户的例子来表达公平性是什么:

假设有一个函数从1,2,3开始打印到无限大。有两个人都想得到打印结果,你怎么满足这两个人?
第一种尝试:先满足用户1,再满足用户2,如下所示。很显然这不现实,用户1永远满足不完,用户2永远看不到为他打印的东西。

import time
def print_infinity():i=1while True:print(i)i+=1time.sleep(3) # 每3秒打印一次
print_infinity() # 为用户1打印,永不停息
print_infinity() # 为用户2打印,不可能达到这行

第二种尝试:在命令行输入2次python <py_file.py>来启动两个进程,每个进程只为一个用户打印,如下所示。如果用户数再增多,这种新增进程的方式将消耗太多资源。

# 命名为test.py
import time
def print_infinity():i=1while True:print(i)i+=1time.sleep(3) # 每3秒打印一次
print_infinity() # 为1个用户打印
# 输入两次python <py_file.py>的来开启两个进程。
# windows系统可以开两个cmd,分别输入以下命令。linux系统则可以在一个命令行中输入两次python test.py &
python test.py

第三种最佳实践:只开启一个进程(只用1次python <py_file.py>),但为每个用户都开启一个线程调用打印函数,如下所示。让CPU在不同线程的调用栈上交替运行函数,从而交替服务用户。

import threading
import timedef print_infinity(user_name):i=1thread_name=threading.current_thread().getName()while True:print('%s \t 数字%d \t %s'%(user_name,i,thread_name))i+=1time.sleep(3)t1=threading.Thread(target=print_infinity,args=('用户1',),name='线程1')
t2=threading.Thread(target=print_infinity,args=('用户2',),name='线程2')
t1.start()
t2.start()
t1.join()
t2.join()

预期结果如下图所示,用户1和用户2都可以看到只属于自己的数字。相比于方案1单线程只能为1个用户打印的缺陷,多线程能让多个用户交替或同时得到服务,这就是公平性

  • 即便多个线程是交替运行,只要有I/O bound任务,多线程依然可以减少总用时。所谓的I/O bound任务,就是不怎么需要CPU参与的,只需要等时间流逝(time.sleep())、等数据从网上传过来(requests.get())、等磁盘读取或写入文件(如file.write())…。对以上2个用户交替打印数字的例子,CPU、线程状态随时间的关系如下表:
时刻 0.1秒 0.2秒 3.1秒 3.2秒
处理器 执行线程1 执行线程2 执行线程1 执行线程2
线程1 执行打印函数,发现要等3秒 不执行任何代码 执行打印函数,发现时间够了,打印数字 不执行任何代码
线程2 不执行任何代码 执行打印函数,发现要等3秒 不执行任何代码 执行打印函数,发现时间够了,打印数字

上表说明,即便处理器总是交替执行各个线程的代码,也可用大约3秒的时间,让2个用户都得到数字,并不是用3秒让1个客户得到数字,然后又用3秒让另一个客户得到数字。
但如果是CPU bound的任务,也就是任务的完成基本是靠CPU算出来,那么多线程交替运行并不会降低任务的总用时。继续基于以上2个用户交替打印数字的例子,做一点变化,假设处理器要算30次运算才打印1次数字(每次运算用时0.1秒,也即共需要3秒)。那么2个线程下,CPU、线程状态随时间的关系如下表:

时刻 0.1秒 0.2秒 3.1秒 3.2秒 6.1秒 6.2秒
处理器 执行线程1 执行线程2 执行线程1 执行线程2 执行线程1 执行线程2
线程1 执行打印函数,进行第1次运算 不执行任何代码 执行打印函数,进行第15次运算 不执行任何代码 执行打印函数,进行第30次运算,在6.2秒前打印数字 不执行任何代码
线程2 不执行任何代码 执行打印函数,进行第1次运算 不执行任何代码 执行打印函数,进行第15次运算 不执行任何代码 执行打印函数,进行第30次运算,在6.3秒前打印数字

上表说明,遇到CPU bound任务,如果CPU交替运行多线程的函数,在大约6秒的时候,才能为2个用户输出数字,这不同于之前3秒为2个用户输出数字的情况。总用时并未改善,反而因为公平性,让2个用户都只在6秒附近得到结果(此时公平性带来了不好的结果,先让1个客户算3秒,再让另一个客户算3秒,起码也能让1个人在第3秒得到结果,1个人在第6秒得到结果,不至于2个人都在第6秒才得到结果)。

与之相关的2个结论是:

  1. 多线程同时并行,能平衡任务的公平性,也能减少任务的执行总用时
  2. 多线程交替运行,能平衡任务间的公平性,不一定减少任务的执行总用时。如果有I/O bound任务,多线程能减少总用时;但如果都是CPU bound任务,多线程不仅不能减少总用时,反而会由于公平性,让每个用户都较晚才能拿到结果。

2.3线程池介绍

 如果根据任务类型(I/O bound还是CPU bound)及处理器数量(决定了多线程是同时还是交替运行),发现多线程能减少任务完成总用时、平衡任务间公平性,就可以尝试使用多线程。然而低资源占用地实现这点,就要考虑是不是对每一个任务,都创建一个新线程去执行。
 创建线程需要内存资源(栈内存)与时间,频繁地创建线程既会消耗太多内存,也会耗时太多(甚至创建线程的时间比完成任务的时间还要长)。为了既享受多线程的好处,又避免频繁创建线程带来的坏处,一个主流实践就是使用线程池。它限制线程的数量上限(减少资源消耗),在线程执行完一个函数后不会被销毁而是等待新函数进入调用栈,其实就是复用线程

2.3.1复用线程

 有些小伙伴可能说,每次在命令行使用python <py_file.py>拿到结果后,线程不是都会结束么,哪里还有复用线程的机会?一个线程的调用栈为空后,确实会导致线程被销毁。让线程一直存在的方法在于让线程的调用栈永不为空,也就是让线程运行的函数具有while True的循环语句,这样的话该函数永远不会被执行完,调用栈也永不为空。
 又有小伙伴可能说,如果让线程一直执行有while True的函数,那线程还怎么运行其他函数呢?最佳实践就是在while True的代码块里,使用队列queue获取外界输入的函数和参数,从而能在while True的循环体里执行其他函数。
 由此,复用线程的框架如下所示:

  1. 自定义ReuseThread类继承标准库的Thread,重写__init__()函数只为新增queue来存放新的函数及参数;
  2. 重写run()函数让线程运行起来后永不关闭、且能调用新加入queue的函数;
  3. 新增add_new_task()函数让外界线程访问,为执行run()函数的线程新增函数。
from threading import Thread
import queue# 继承Thread来开启线程,从而不用我们自己设计开启线程的方法
class ReuseThread(Thread):# 重写初始化方法,主要是要加上1个能储存新函数的队列,其他的直接复用Thread类def  __init__ (self) :super().__init__() # 复用Thread类的初始化函数self.queue = queue.Queue()   # 用队列queue新增其他函数 # 重写run方法,线程执行起来会默认调用该方法def run(self):while True: # 让执行run方法的线程的调用栈永不为空,从而线程能一直存在func,args,kwargs = self.queue.get() # 从队列获取函数、参数来执行# 如果队列里没东西,那么运行上一行代码的线程会阻塞,也就是该线程不会再被处理器执行# 直到队列里有东西为止线程才会解除阻塞,也就是竞争处理器来执行后续代码。func(*args,**kwargs) # 执行新增的函数def add_new_task(self,func,*args,**kwargs): # 外界线程调用该函数为执行run方法的线程新增任务self.queue.put((func,args,kwargs))   # 在队列queue里新增要执行函数、参数
# 在主线程里实例化复用线程,为它新增print函数。
t = ReuseThread()
t.start()
t.add_new_task(print,'任务1') # 主线程访问add_new_task(),为线程t新增要执行的print函数
t.add_new_task(print,'任务2')

运行结果如下图所示:

上述框架缺失让复用线程终止的条件、让主线程阻塞的条件,下面补上:

  1. 在上图的打印结果中,命令行里始终看不到python进程结束的标志。这是因为如果进程中还留有未销毁的普通线程,进程就不会结束。最佳的解决方法基于这样一个设计:如果进程中只有守护线程,那么进程会销毁所有线程,然后进程自己也退出。我们可以把复用线程标记为守护线程,如果主线程执行完毕,身为守护线程的复用线程会自动销毁,进程也会自动退出。
    ReuseThread__init__()方法末尾加入下面的一行,设置守护线程
self.daemon=True
  1. 让主线程阻塞是为了让主线程等待复用线程执行完函数。不然主线程一结束,身为守护线程的复用线程也会自动销毁,导致函数不会执行完。然而主线程调用t.join()方法会让主线程永久阻塞,在上面框架t.add_new_task(print,'任务2')的后续加上t.join()print('end')这两行,会发现print('end')永远不会被执行。
     什么原因?Thread类自带的.join()方法会阻塞调用它的线程,直至run()方法结束。由于复用线程的run()方法永远不会结束,那么被阻塞的线程会一直阻塞下去。
     怎么解决?重写join()方法,自己设定阻塞解除的条件。如果线程t执行完了队列queue的所有函数,那就应该让主线程解除阻塞。这点可以用queue自带的join()方法。调用queue.join()的线程,都会被阻塞,直到queue中所有任务执行完为止。在ReuseThread类里如下重写join()
# 外界线程调用这个函数会让外界线程阻塞,等待queue为空后,处理器才会去执行外界线程调用栈里的代码。
def  join (self) : self.queue.join()

将以上2点加入复用线程的框架,得到如下可复用的线程。除此之外,还在5处地点加入了打印线程名的print('Hook ...')方法,你能说对每个Hook对应的线程是什么吗?

import threading
import queueclass  ReuseThread (threading.Thread) : def  __init__ (self) :super().__init__() # 使用父类Thread的初始化函数self.queue = queue.Queue()   # 用队列queue新增其他函数 self.daemon = True  # 设置父类的全局变量daemon为true,说明该线程为守护线程# 如果进程中只有守护线程在运行,那么进程会结束,所有守护线程也会关闭def  run (self) :       # 线程一旦被处理器运行,会自动调用run()方法while  True : # 让该线程执行的函数不停止,即让调用栈不为空,从而线程不被销毁func,args,kwargs = self.queue.get()   # 获取函数、参数来执行print('Hook 1: %s'%threading.current_thread().getName()) # 看是哪个线程在执行该行代码          func(*args,**kwargs) self.queue.task_done()   # 告知队列取出的任务已完成# self.queue.task_done() 用于告诉self.queue.join()该任务已完成def  add_new_task (self, func,*args,**kwargs) :  # 外界线程访问这个函数为执行run方法的线程新增函数print('Hook 2: %s'%threading.current_thread().getName()) # 看是哪个线程在执行该行代码self.queue.put((func,args,kwargs))   # 在队列queue里新增函数、参数# 外界线程通过这个函数让外界线程阻塞,等待queue的任务都完成后,外界线程才能被处理器执行。def  join (self) : print('Hook 3: %s'%threading.current_thread().getName()) # 看是哪个线程在执行该行代码self.queue.join() # 由self.queue.task_done()告诉self.queue.join()是不是所有入队的任务都完成了。# 要给可复用的线程新增的函数
def  func (name) : print('Hook 4: %s'%threading.current_thread().getName()) # 看是哪个线程在执行该行代码print( name )if __name__ == '__main__' :print('Hook 5: %s'%threading.current_thread().getName()) # 看是哪个线程在执行该行代码t = ReuseThread()t.start()t.add_new_task(func,'任务 1')   t.add_new_task(func,'任务 2')   t.join()

预期结果如下图所示:

  1. 首先,两个任务都被同一个线程执行了(由2个Hook 4都对应Thread-1看出来,而Hook 4安插在func函数中)。
  2. 其次,只有Hook 1Hook 4安插的函数是由线程t执行(包括run()函数和func()函数),其他Hook安插的函数都由主线程MainThread执行(包括add_new_task(),join())。
  3. 最后,python进程会自动退出,这是由于我们把复用线程设置为了守护线程

2.3.2线程池的使用

 介绍了如何复用线程,线程池的存在就容易理解了,就是使用多个可复用的线程,自己可以实现,但标准库会实现得更好,所以直接用python的concurrent.futures.ThreadPoolExecutor类当线程池,调用api的操作如下:

  1. 创建线程池,设置复用线程的最大数量。
from concurrent.futures import ThreadPoolExecutor
workers=2
thread_pool = ThreadPoolExecutor(max_workers=workers) # max_workers指定了复用线程的最大数量
  1. 在主线程中用ThreadPoolExecutor.submit()提交任务给线程池,这不会阻塞主线程
# 延时打印id的任务
def thread_action(task_id,start_time):time.sleep(1)end_time=time.time()print('任务id:%d\t线程: %s\t完成时间: %d'%(task_id,threading.current_thread().getName(),end_time-float(start_time)))return task_id
thread_pool.submit(thread_action,task_id,start_time)
# submit()函数第一个参数为线程池要执行的函数名,其余的都是参数
  1. 在主线程中,用submit()得到Future对象,调用Future.result()获取函数return的东西,这会阻塞主线程。
future=thread_pool.submit(thread_action,task_id,start_time)
print(future.result())

把这3类操作合起来,展示一个具体实例操作:

from concurrent.futures import ThreadPoolExecutor
import concurrent
import time
import threadingworkers=2
thread_pool = ThreadPoolExecutor(max_workers=workers) # max_workers指定了复用线程的最大数量# 延时打印id的任务
def thread_action(task_id,start_time):time.sleep(1)end_time=time.time()print('任务id:%d\t线程: %s\t完成时间: %d'%(task_id,threading.current_thread().getName(),end_time-float(start_time)))return task_idtask_num=4 # 任务数量
future_list=[] # 放置submit()得到的Future对象
start_time=time.time()# submit()提交任务
for task_id in range(task_num):future=thread_pool.submit(thread_action,task_id,start_time)future_list.insert(0,future)# 获取函数return的结果
for future in concurrent.futures.as_completed(future_list):    print('返回的任务名: %d'%future.result())

运行结果如下图所示,总结两点:

  1. 有2个线程处理了4个任务,说明了线程的重用。
  2. future.result()的打印顺序和任务完成的顺序是一样的。注意:如果用for future in future_list遍历,结果将如下下图所示,最先打印的 future.result()反而是最晚完成的任务3,这是因为future_list的第1个元素是任务3的future,而future.result()会阻塞线程,所以主线程一直等到任务3有返回值才继续运行。但concurrent.futures.as_completed()会将最早完成的任务交给主线程,让主线程调用future.result()

3.多线程→\rightarrow→多进程

 用多线程可以实现任务的公平性、减少任务的总用时,为什么还要多进程?新建一个进程意味着新建一个python虚拟机、加载标准库函数到内存中,哪怕只是用于print(),都要付出这些成本。
 考虑多进程的动机在于python的CPython解释器很特殊:即便有很多CPU也不能并行地运行多个线程,只能交替地运行多线程,也就说CPython无法用多核CPU并行执行CPU bound任务。这是由于CPython为自己设置了一把互斥锁,所有要执行的线程必须获得这把锁(官方称为Global Interpreter Lock, GIL)。由于一个python进程只有一个解释器、一把解释器的锁,那么一个时刻只有一个线程能获得锁,所以一个时刻不可能有多个线程并行

 为什么CPython要有GIL这把锁?在于实现Python语言内在特性的线程安全,且不损失单线程运行的效率。具体内容很复杂,不多说。主要注意两点:
  GIL锁实现的是Python内在特性的线程安全。不是实现x+=1等非内在特性的线程安全。像list.append(x)等Python风格的操作就是线程安全的,不会由于多线程出现数据覆盖或缺失的问题,还有其他线程安全的常见操作见官方文档。
 使用GIL锁并不是实现内在特性线程安全的唯一方法,还可以像java那样不锁解释器,而是可以锁每个对象。但只用一把锁就实现内在特性的线程安全,相比对多个对象都加锁而言,能带来更高的运行效率。
 总而言之,轻易地实现内在特性的线程安全+较好的单线程运行效率,是继续保留GIL的主要原因。

CPythonGIL锁导致多核CPU无法并行处理多线程,一个解决方案就是用多进程。使用多个解释器,每个解释器及其GIL锁都在不同CPU上并行运行。

3.1进程池介绍

3.1.1复用进程

 复用线程的原理、api与复用进程极为相似,但有1处地方要变动:任务队列queue要从queue.Queue()变为multiprocess.Manager().Queue()。由于多个进程不共享堆内存,如果继续用queue.Queue(),主进程添加函数用到的queue和新进程执行函数用到的queue不是同一个队列。而multiprocess.Manager().Queue()做了特殊的处理,让它可以被多个进程共享数据。
复用进程的代码如下所示,和复用线程极为相似:

  1. 继承multiprocessing.Process类,帮我们创建新进程。
  2. 重写__init__()方法,将进程设置为守护进程,让该进程能自动结束,并接收一个多进程共享数据的队列queue
  3. 重写run()方法,用while True的方式让进程一直运行。
  4. 外界进程设置可以添加函数的add_new_task()和让外界进程阻塞的join()方法。
  5. 在主进程中创建multiprocess.Manager().Queue(),用于多进程共享数据。
import multiprocessing
import time
import os
#复用进程的Process类
class ReuseProcess(multiprocessing.Process):def __init__(self,queue):super().__init__()self.daemon=Trueself.queue=queuedef run(self):while True:func,args,kwargs=self.queue.get()print('Hoo1 进程名:%s'%(os.getpid()))func(*args,**kwargs)self.queue.task_done()def add_new_task(self,func,*args,**kwargs):print('Hook2 进程名:%s'%(os.getpid()))self.queue.put((func,args,kwargs))def join(self):print('Hook3 进程名:%s'%(os.getpid()))self.queue.join()
# 测试用的函数
def func(name):print('Hoo4 进程名:%s'%(os.getpid()))time.sleep(1)print(name)if __name__=='__main__':print('Hoo5 进程名:%s'%(os.getpid()))queue=multiprocessing.Manager().Queue()process=ReuseProcess(queue) # 新建可复用的进程process.start()process.add_new_task(func,'任务1') # 给进程添加任务并执行process.add_new_task(func,'任务2')process.join()process.kill()

无奖问答:在代码不同位置插入了打印进程号的print(Hook ...),你能说对每个Hook对应的进程吗?上述代码的运行结果如下图所示,有2点可以说:

  • 任务1和任务2都被执行了,进程可复用

  • 运行到Hook 2Hook 3Hook 5的进程是同一个(Hook 5说明这是主进程,Hook 2, Hook 3对应add_new_task()join()方法,印证了这两个方法是专门给主进程等外界进程执行的),运行到Hook 1Hook 4的进程又是另一个(就是新建的、用来执行任务的进程)。

     到此介绍完了复用进程的方法,但关于开启多进程和开启多线程的区别,还有一个需要深入理解的点:

  • 多线程过渡到多进程,好像就只改了个队列queue,让它被多个进程共享数据,似乎没有别的要注意。

  • 我们不妨先做一个实验,先把复用进程代码里的if __name__=='__main__'删掉,修改缩进后再运行,你会看到如下报错:

     报错信息显示... start a new process before the current process ...,难道new process指新开的进程,而current process指主进程?非也,current process指主进程新开的进程,而这个new process就是新进程想要再去创建的新新进程。报错原因在于新进程还没启动完全,这个新进程就又要去创建新新进程
     之所以新进程还会创建新新进程,在于创建进程的代码process=ReuseProcess(queue)也被新进程复制了一份予以执行。而if __name__=='__main__'代码块的作用就是只让主进程运行这个代码块。

  • 从中你可以发现,每创建一个新进程,它都会复制一份python代码来执行(因为多个进程一般不共享内存,除了复制代码来执行别无他法),所以任何只想让主进程执行的代码,务必要写在if __name__=='__main__'代码块中。

3.1.2进程池的使用

 知道复用进程的原理后,进程池的存在也显而易见,创建固定数量的进程,然后一直复用。python api操作包含3步:

  1. 使用multiprocessing.Pool()创建进程池
import multiprocessing
pool=multiprocessing.Pool(processes=2) # processes为进程数量
  1. 使用Pool.apply(func=, args= )提交阻塞任务func=后面填执行的函数名,args=后面填函数的参数,该方法返回函数return的东西。或者使用Pool.apply_async(func=, args= )提交异步任务,返回AsyncResult对象,再用AsyncResult.get()这种阻塞方法获取函数return的东西。阻塞任务就是指主进程提交了任务后,要等进程池执行完才继续运行后续代码;而异步任务就是主进程提交任务后会正常运行后续代码。如下是阻塞任务异步任务的示例。
# 同样是执行print('hello','world')
# 阻塞执行
res=pool.apply(func=print,args=('hello','world')) # 进程池执行完后主进程才会运行下一行,res为print('hello','world')的返回值,也就是None
# 异步执行
res=pool.apply_async(func=print,args=('hello','world')) # 主进程会立马运行下一行,不需要等进程池完成该函数
res.get() # 该阻塞方法会获取print('hello','world')的返回值,也就是None
  1. .join()让主进程等待进程池完成任务。如果没有.join(),在进程池完成任务前,主进程一旦运行完,会让全是守护进程的进程池立马结束所有进程,从而导致任务未完成。
pool.join()

以上3步操作合起来,展示如下的示例代码。让进程池执行延时打印函数process_action(),用pool.apply_async()添加异步任务

import multiprocessing
import time
import os def process_action(name,start_time):time.sleep(1)end_time=time.time()print('任务名: %s\t进程名: %s\t完成时间: %d'%(name,os.getpid(),end_time-start_time))if __name__=='__main__':pool=multiprocessing.Pool(processes=2)start_time=time.time()pool.apply_async(func=process_action,args=('任务 1',start_time))pool.apply_async(func=process_action,args=('任务 2',start_time))pool.apply_async(func=process_action,args=('任务 3',start_time))pool.apply_async(func=process_action,args=('任务 4',start_time))print('主进程打印这行要早于任务被完成')pool.close()pool.join()

运行结果如下所示,可以看出三点:

  1. 完成4个任务只需要2秒,有进程池实现了并行。
  2. 完成4个任务只用到了2个进程,进程池实现了进程复用。
  3. .apply_async()能实现异步执行,主进程执行print()早于任何任务的完成时间。

4.多进程+多线程

 多进程让CPU bound任务能被多核CPU并行执行,多线程既能减少I/O bound类任务的执行总用时、也维持了任务间的公平性。当两类任务都有时,将二者结合起来就有意义,否则只需要用其中一种。
 结合的方法是:创建一个进程池,其中每个进程都并行运行CPU bound任务,运行完后每个进程都用自己的线程池完成I/O bound任务。如下的代码示例包含2步:

  1. 为每个进程定义进程要执行的函数、线程池线程要执行的函数。这步在if __name__=='__main__':代码块之前要完成。
from multiprocessing import Pool # 进程池
from concurrent.futures import ThreadPoolExecutor # 线程池
import time
import os # 获取进程号
import threading # 获取线程号thread_workers=2 # 线程池的线程数量,每个进程有自己的线程池。
thread_executor = ThreadPoolExecutor(max_workers=thread_workers)
# 定义线程要执行的函数,线程阻塞1秒然后打印
def thread_action(task_id,start_time): time.sleep(1)end_time=time.time()print('进程号: %s\t线程号: %s\t任务号: %d\t完成时间: %d'%(os.getpid(),threading.current_thread().getName(),task_id,end_time-float(start_time)))
# 定义进程要执行的函数,其实就是调用线程来完成任务
def process_action(task_id,start_time):time.sleep(0.001) thread_executor.submit(thread_action,task_id,start_time) # 让线程池中的线程执行函数
  1. 主进程创建进程池,并在主进程中添加任务。这步在if __name__=='__main__':代码块之内才完成。
if __name__=='__main__':pool=Pool(processes=2) # 额外开启2个进程start_time=time.time() for i in range(8): # 将8次任务分配在2个进程上pool.apply_async(func=process_action,args=(i,start_time))pool.close() # 进程池不再接收新任务,只等旧任务都完成后就会关闭pool.join() # 让主进程等待进程池

拼凑在一起,就变成了如下代码:

from multiprocessing import Pool # 进程池
from concurrent.futures import ThreadPoolExecutor # 线程池
import time
import os # 获取进程号
import threading # 获取线程号thread_workers=2 # 线程池的线程数量,每个进程有自己的线程池。
thread_executor = ThreadPoolExecutor(max_workers=thread_workers)
# 定义线程要执行的函数,线程阻塞1秒然后打印
def thread_action(task_id,start_time): time.sleep(1)end_time=time.time()print('进程号: %s\t线程号: %s\t任务号: %d\t完成时间: %d'%(os.getpid(),threading.current_thread().getName(),task_id,end_time-float(start_time)))
# 定义进程要执行的函数,其实就是调用线程
def process_action(task_id,start_time):time.sleep(0.001) thread_executor.submit(thread_action,task_id,start_time) # 让线程池中的线程执行函数if __name__=='__main__':pool=Pool(processes=2) # 额外开启2个进程start_time=time.time() for i in range(8): # 将8次任务分配在2个进程上pool.apply_async(func=process_action,args=(i,start_time))pool.close() # 进程池不再接收新任务,只等旧任务都完成后就会关闭pool.join() # 让主进程等待进程池

运行结果如下图所示,有3点可以总结:

  1. 2个进程完成了8个任务,且每个进程都用了2个线程,进程池和线程池有作用。
  2. 运行时间方面,8个任务2秒完成,相比于单进程+单线程需要的8秒,这4倍的加速源于两点:2个进程的并行,每个进程中2个线程对I/O bound任务的加速,所以总共是2*2=4倍加速;但如果执行的完全是CPU bound任务,就只有2个进程的并行能带来2倍加速。
  3. 对于这8个I/O bound任务,只用1个单进程+4个线程也可以做到2秒完成,不需要多开进程来浪费资源。所以用多进程能有优势的前提是有较多CPU bound任务,否则单进程+多线程足以。

Python线程池、进程池的介绍与使用相关推荐

  1. python并发编程-进程池线程池-协程-I/O模型-04

    目录 进程池线程池的使用***** 进程池/线程池的创建和提交回调 验证复用池子里的线程或进程 异步回调机制 通过闭包给回调函数添加额外参数(扩展) 协程*** 概念回顾(协程这里再理一下) 如何实现 ...

  2. python线程池模块_Python并发编程之线程池/进程池--concurrent.futures模块

    一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/ ...

  3. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  4. Python并发编程之线程池/进程池

    引言 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我 ...

  5. Python爬虫:通信和线程池进程池

    通信和线程池进程池 线程间通信 导入线程队列 from queue import Queue import time from random import randint 同一个进程中的多个线程可以直 ...

  6. 并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制

    1.线程queue :会有锁 q=queue.Queue(3) q.get() q.put() 先进先出 队列后进先出 堆栈优先级队列 1 """先进先出 队列" ...

  7. python笔记 7-8 进程池 进程通信 迭代器 消息队列 Queue 协程 和正则表达式

    day7 进程 进程池 进程通信 迭代器 消息队列 Queue 作用 用于多个进程间的通信 操作put放入消息(值) put_nowait() 放入值,不等待 如果队满,则报错 get获取消息(值) ...

  8. 进程、线程、进程池、进程三态、同步、异步、并发、并行、串行

    点击上方蓝色"方志朋",选择"设为星标"回复"666"获取独家整理的学习资料! 来源:cnblogs.com/songhaixing/p/1 ...

  9. 定时器 线程池\进程池

    定时器 可以指定线程多久后启动,(并且他是一个异步的线程,也就是并发) from threading import Timer """ 这个相当于开启了一个可以定义时间的 ...

  10. Python 多进程的进程池pool运行时报错:ValueError: Pool not running

    本文仅供学习交流使用,如侵立删!demo下载见文末 Python 多进程的进程池pool运行时报错:ValueError: Pool not running def main(self, num):& ...

最新文章

  1. 企业网络推广浅析外包企业网络推广如何有效布局关键词优化?
  2. 无法识别的配置节appsettings_人脸识别门禁考勤系统安装使用注意事项
  3. EFCore笔记之异步查询
  4. 求交错序列前N项和(15 分)
  5. java 字典 引用_java中数据字典的使用
  6. 加州“电力十足 ” iPhone12加速贬值成全“十三香”
  7. VMware Workstation 12.5 安装Mac OS X
  8. pdf虚拟打印机下载win7_闪电PDF虚拟打印机使用教程,超级简单的方法
  9. java实现数据库自动异地备份
  10. 怎样配置文件存储服务器,服务器搭建存储配置文件
  11. cve-2021-22205复现
  12. 牛电科技电动车 出行的最佳选择
  13. 【数论-Lucas定理】
  14. 忆阻蔡氏电路matlab,基于有源带通滤波器的忆阻蔡氏电路研究.doc
  15. Rootkit 真刀真枪的权限保卫战
  16. 【rdma_cm】rdma_cm API
  17. 警告关于测试人员的职场生存,千万要避开这5个坑(不看后悔)
  18. 红日靶场--内网渗透练习
  19. 多元线性回归(OLS+稳健误)python代码实现
  20. 国际四大索引系统的一点认识:SCI,EI,ISTP(CPCI),ISR

热门文章

  1. lol进入服务器后显示3秒白屏,win10系统玩lol游戏大厅变白屏解决办法介绍
  2. 微信纵剑仙界服务器选不了吗,纵剑仙界微信登录版
  3. 怎样绘制思维导图?讲述思维导图三招十八式
  4. 如何利用UGUI在Unity中实现一个本地排行榜
  5. 火力篮球游戏源码完整版--采用标准的游戏开发文档
  6. 软件测试面试合集,测试/测试开发岗面经,看完还不怕找不到工作
  7. python链接DB2数据库相关问题(离线安装,pandas读取等)
  8. 计算机实现加减乘除法的原理
  9. 用程序的思路,思考《坦克世界》中的插件的书写
  10. gm怎么刷东西 rust_变成GM 后 如何刷物品