concurrent模块

1、concurrent模块的介绍

concurrent.futures模块提供了高度封装的异步调用接口

ThreadPoolExecutor:线程池,提供异步调用

ProcessPoolExecutor:进程池,提供异步调用

ProcessPoolExecutor 和 ThreadPoolExecutor:两者都实现相同的接口,该接口由抽象Executor类定义。

2、基本方法

使用_base.Executor

concurrent.futures.thread.ThreadPoolExecutor #线程池

concurrent.futures.process.ProcessPoolExecutor #进程池#构造函数

def __init__(self, max_workers=None, mp_context=None,

initializer=None, initargs=()):

submit(fn, *args, **kwargs) :异步提交任务

使用submit函数来提交线程需要执行任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。

map(func, *iterables, timeout=None, chunksize=1)

取代for循环submit的操作

shutdown(wait=True) :相当于进程池的pool.close()+pool.join()操作

wait=True,等待池内所有任务执行完毕回收完资源后才继续

wait=False,立即返回,并不会等待池内的任务执行完毕

但不管wait参数为何值,整个程序都会等到所有任务执行完毕

note:submit和map必须在shutdown之前

pool.submit()返回的对象是

concurrent.futures._base.Future类

add_done_callback(self,fn)

cancel(self)

cancelled(self)

done(self)

exception(self,timeout=None)

result(self,timeout=None)

running(self)

set_exception(self,exception)

set_result(self,result)

set_running_or_notify_cancel(self)

result(timeout=None) :取得结果,通过submit函数返回的任务句柄,使用result()方法可以获取任务的返回值,查看内部代码,发现这个方法是阻塞的

done()方法判断该任务是否结束

add_done_callback(fn) :回调函数

3、进程池和线程池

池的功能:限制进程数或线程数.

什么时候限制: 当并发的任务数量远远大于计算机所能承受的范围,即无法一次性开启过多的任务数量 我就应该考虑去限制我进程数或线程数,从保证服务器不崩.

3.1 进程池

from concurrent.futures importProcessPoolExecutorimportosimporttimedeftask(i):print("第"+str(i)+"个在执行任务id:"+str(os.getpid()))

time.sleep(1)if __name__ == '__main__':

start=time.time()

pool= ProcessPoolExecutor(4) #进程池里又4个进程

for i in range(5): #5个任务

pool.submit(task,i)#进程池里当前执行的任务i,池子里的4个进程一次一次执行任务

pool.shutdown()print("耗时:",time.time()-start)

3.2 线程池

from concurrent.futures importThreadPoolExecutorfrom threading importcurrentThreadimporttimedeftask(i):print("第"+str(i)+"个在执行任务id:"+currentThread().name)

time.sleep(1)if __name__ == '__main__':

start=time.time()

pool= ThreadPoolExecutor(4) #进程池里又4个线程

for i in range(5): #5个任务

pool.submit(task,i)#线程池里当前执行的任务i,池子里的4个线程一次一次执行任务

pool.shutdown()print("耗时:",time.time()-start)

其他:done() 、 result()

通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束

使用result()方法可以获取任务的返回值,查看内部代码,发现这个方法是阻塞的

3.4列表+as_compelete模拟先进先出

对于线程,这样可以模拟执行与结果的先进先出。

但是对于进程会报错。

importtimefrom concurrent.futures importProcessPoolExecutor,as_completed,ThreadPoolExecutordefget_html(i):

times=1time.sleep(times)print("第 NO.{i} get page {times} finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)

start=time.time()

executor= ThreadPoolExecutor(max_workers=2)#executor = ProcessPoolExecutor(max_workers=2) #进程池会导致后面的all_task报错

all_task= [executor.submit(get_html,(i)) for i in range(5)]for future inas_completed(all_task):

data=future.result()print("in main:get page {} success".format(data))print('主进程结束--耗时',time.time()-start)

结果:

第 NO.0 get page 1finished

第 NO.1 get page 1finishedinmain:get page 第 NO.0 successin main:get page 第 NO.1success

第 NO.2 get page 1finishedin main:get page 第 NO.2success

第 NO.3 get page 1finishedin main:get page 第 NO.3success

第 NO.4 get page 1finishedin main:get page 第 NO.4success

主进程结束--耗时 3.0034666061401367

结果:

3.4 Map的用法

可以将多个任务一次性的提交给进程、线程池。---备注进程是也不行的,也会报错。

使用map方法,不需提前使用submit方法,map方法与python标准库中的map含义相同,都是将序列中的每个元素都执行同一个函数。

from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorimportos,time,randomdeftask(i):print("第"+str(i)+"个在执行任务id:"+str(os.getpid()))

time.sleep(1)if __name__ == '__main__':

start=time.time()

pool=ProcessPoolExecutor(max_workers=3) #也可以换成ThreadPoolExecutor

pool.map(task,range(1,5)) #map取代了for+submit

pool.shutdown()print("耗时:",time.time()-start)

考虑到结果返回值:

importtimefrom random importrandomfrom concurrent.futures importProcessPoolExecutor,as_completed,ThreadPoolExecutordefget_html(i):

times=1+random()/100time.sleep(times)print("第 NO.{i} get page {times}s finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)

start=time.time()

executor= ThreadPoolExecutor(max_workers=2)#executor = ProcessPoolExecutor(max_workers=2) #进程池会导致后面的executor.map报错

res=executor.map(get_html, range(5))#for future in res: #直接返回结果,不需要get

print("in main:get page {} success".format(future))print('主进程结束--耗时',time.time()-start)

3.5 同步调用,顺序返回

因为我们在循环中每次循环都要调用或这说提交任务,并等待结果。所以其实进程之间是串行的。所以是同步的方式。

from concurrent.futures importProcessPoolExecutorfrom multiprocessing importcurrent_processimporttime

n= 1

deftask(i):globaln

time.sleep(1)print(f'{current_process().name} 在执行任务{i}')

n+=ireturn f'得到 {current_process().name} 任务{i} 的结果'

if __name__ == '__main__':

start=time.time()

pool= ProcessPoolExecutor(2) #进程池里又4个线程

pool_lis =[]for i in range(5): #20个任务

future = pool.submit(task,i)#进程池里当前执行的任务i,池子里的4个线程一次一次执行任务

pool_lis.append(future.result())#等待我执行任务得到的结果,如果一直没有结果,则阻塞。这里会导致我们所有任务编程了串行

#在这里就引出了下面的pool.shutdown()方法

pool.shutdown(wait=True) #关闭了池的入口,不允许在往里面添加任务了,会等带所有的任务执行完,结束阻塞

for res inpool_lis:print(res)print(n)#这里肯定是拿到0的

print("主进程---耗时",time.time()-start)#可以用join去解决,等待每一个进程结束后,拿到他的结果

结果:

SpawnProcess-2在执行任务0

SpawnProcess-1在执行任务1

SpawnProcess-2在执行任务2

SpawnProcess-1在执行任务3

SpawnProcess-2在执行任务4

得到 SpawnProcess-2任务0 的结果

得到 SpawnProcess-1任务1 的结果

得到 SpawnProcess-2任务2 的结果

得到 SpawnProcess-1任务3 的结果

得到 SpawnProcess-2任务4 的结果1主进程---耗时 5.575225830078125

同步--所以是串行的。耗时与单进程差不多

3.5 异步调用,顺序返回

from concurrent.futures importProcessPoolExecutorfrom multiprocessing importcurrent_processimporttime

n= 1

deftask(i):globaln

time.sleep(1)print(f'{current_process().name} 在执行任务{i}')

n+=ireturn f'得到 {current_process().name} 任务{i} 的结果'

if __name__ == '__main__':

start=time.time()

pool= ProcessPoolExecutor(2) #进程池里又4个线程

pool_lis =[]for i in range(5): #20个任务

future = pool.submit(task,i)#进程池里当前执行的任务i,池子里的4个线程一次一次执行任务

#print(future.result()) # 这是在等待我执行任务得到的结果,如果一直没有结果,这里会导致我们所有任务编程了串行

#在这里就引出了下面的pool.shutdown()方法

pool_lis.append(future)

pool.shutdown(wait=True) #关闭了池的入口,不允许在往里面添加任务了,会等带所有的任务执行完,结束阻塞

for p inpool_lis:print(p.result())print(n)#这里肯定是拿到0的

print("主进程---耗时",time.time()-start)#可以用join去解决,等待每一个进程结束后,拿到他的结果

结果:

SpawnProcess-1在执行任务0

SpawnProcess-2在执行任务1

SpawnProcess-1在执行任务2

SpawnProcess-2在执行任务3

SpawnProcess-1在执行任务4

得到 SpawnProcess-1任务0 的结果

得到 SpawnProcess-2任务1 的结果

得到 SpawnProcess-1任务2 的结果

得到 SpawnProcess-2任务3 的结果

得到 SpawnProcess-1任务4 的结果1主进程---耗时 3.2690603733062744

异步结果,有序返回相应结果

3.5 回调函数:

add_done_callback

from multiprocessing importcurrent_processimporttimefrom random importrandomfrom concurrent.futures importProcessPoolExecutordeftask(i):print(f'{current_process().name} 在执行{i}')

time.sleep(1+random())returni#parse 就是一个回调函数

defparse(future):#处理拿到的结果

print(f'{current_process().name} 拿到结果{future.result()} 结束了当前任务')if __name__ == '__main__':

start=time.time()

pool= ProcessPoolExecutor(2)for i in range(5):

future=pool.submit(task,i)'''给当前执行的任务绑定了一个函数,在当前任务结束的时候就会触发这个函数(称之为回调函数)

会把future对象作为参数传给函数

注:这个称为回调函数,当前任务处理结束了,就回来调parse这个函数'''future.add_done_callback(parse)#add_done_callback (parse) parse是一个回调函数

#add_done_callback () 是对象的一个绑定方法,他的参数就是一个函数

pool.shutdown()print('主线程耗时:',time.time()-start)

结果:

SpawnProcess-1在执行0

SpawnProcess-2在执行1

SpawnProcess-2在执行2

MainProcess 拿到结果1 结束了当前任务

SpawnProcess-1在执行3

MainProcess 拿到结果0 结束了当前任务

SpawnProcess-1在执行4

MainProcess 拿到结果3 结束了当前任务

MainProcess 拿到结果2 结束了当前任务

MainProcess 拿到结果4 结束了当前任务

主线程耗时:4.721129417419434

回调是主进程的,结果是无序的

3.6wait

wait方法可以让主线程阻塞,直到满足设定的要求。wait方法接收3个参数,等待的任务序列、超时时间以及等待条件。

等待条件return_when默认为ALL_COMPLETED,表明要等待所有的任务都借宿。

可以看到运行结果中,确实是所有任务都完成了,主线程才打印出main,等待条件还可以设置为FIRST_COMPLETED,表示第一个任务完成就停止等待

from concurrent.futures importThreadPoolExecutor,wait,ALL_COMPLETED,FIRST_COMPLETEDimporttime#参数times用来模拟网络请求时间

from random importrandomdefget_html(i):

times=1+random()*10time.sleep(times)print("第 NO.{i} get page {times}s finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)

executor= ThreadPoolExecutor(max_workers=2)

urls= range(5)

all_task= [executor.submit(get_html,(url)) for url inurls]

wait(all_task,return_when=ALL_COMPLETED)print("main")

joblib模块

python多线程执行其他模块的文件_python并发编程--进程线程--其他模块-从菜鸟到老鸟(三)...相关推荐

  1. python 消息机制_Python并发编程之线程消息通信机制任务协调(四)

    . 前言 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了. 可是要知道,在真实的项目中,实际场景可要我们举 ...

  2. python线程通信 消息传递_Python并发编程之线程消息通信机制/任务协调(四)

    大家好,并发编程进入第四篇. 本文目录 前言 Event事件 Condition Queue队列 总结 .前言 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非 ...

  3. python并发处理同一个文件_python并发编程(并发与并行,同步和异步,阻塞与非阻塞)...

    最近在学python的网络编程,学会了socket通信,并利用socket实现了一个具有用户验证功能,可以上传下载文件.可以实现命令行功能,创建和删除文件夹,可以实现的断点续传等功能的FTP服务器.但 ...

  4. python协程处理多个文件_python:多任务(线程、进程、协程)

    一.线程 1.创建线程 #创建线程 importthreading,timedeftask1():for i in range(5):print('task1 -- 任务:%s' %i) time.s ...

  5. python创建新进程_Python并发编程(进程的创建)

    动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的. 并发性:任何进程都可以同其他进程一起并发执行 独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的 ...

  6. python3 线程隔离_Python并发编程之线程中的信息隔离(五)

    大家好,并发编程 进入第三篇. 上班第一天,大家应该比较忙吧.小明也是呢,所以今天的内容也很少.只要几分钟就能学完. 昨天我们说,线程与线程之间要通过消息通信来控制程序的执行. 讲完了消息通信,今天就 ...

  7. python批量执行多个py文件_Python实现批量执行同目录下的py文件方法

    Python版本:3.5 网上找了好多资料都没有直观的写出怎么批量执行,so,整理了一个小程序.最初是为了用Python进行单元测试,同目录下有两个unittest文件, AllTest.py的目的是 ...

  8. python线程池模块_Python并发编程之线程池/进程池--concurrent.futures模块

    一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/ ...

  9. python线程池模块_python并发编程之进程池,线程池,协程

    需要注意一下 不能无限的开进程,不能无限的开线程 最常用的就是开进程池,开线程池.其中回调函数非常重要 回调函数其实可以作为一种编程思想,谁好了谁就去掉 只要你用并发,就会有锁的问题,但是你不能一直去 ...

最新文章

  1. 三级结构_kegg pathway三级层级结构转对应表格
  2. 【CUDA7.5】MATLAB中配置Win7+Matlab R2015b+CUDA7.5+vs2013配置方法
  3. jca使用_使用JCA的密码学–提供者中的服务
  4. store_coding_state (cs_cm)的作用
  5. LeetCode 2186. 使两字符串互为字母异位词的最少步骤数
  6. 技术的价值--从实验到企业实施的关键性思想
  7. python自带模块连接数据库_Python使用sqlalchemy模块连接数据库操作示例
  8. windows环境上robotframework环境搭建
  9. 以一种访问权限不允许的方式做了一个访问套接字的尝试。
  10. 关于SRVINSTW与Kernel-Mode Driver Manager
  11. tplink虚拟服务器 tcp,关于TP-Link路由器端口映射详解
  12. 盘点 Java 线程池配置的常见误区
  13. 高中计算机 数制 教案,1.2.2 二进制与数制转换
  14. 嵌入式学习(二)之SoC芯片的开发流程
  15. 网赚项目分享:八条可以在线上做的副业兼职
  16. MFC下载网页简单实现
  17. 山东 计算机专业,山东省内计算机专业大学排名?
  18. 爬虫出现UnicodeEncodeError: ‘latin-1‘ codec can‘t encode character *** in position 8328: Body***问题
  19. 由序列确定二叉树:前序序列和中序序列构造二叉树 后序序列和中序序列构造二叉树 层次遍历序列和中序遍历序列构造二叉树 代码实现(c语言)
  20. Linux Cobbler批量装机

热门文章

  1. 11.1 问题描述及流程-机器学习笔记-斯坦福吴恩达教授
  2. linux下的各种系统错误errno描述一览
  3. 【笔记】Hexo+Github博客网站搭建,初试环境搭建及Matery主题配置感受
  4. netfilter与iptables表规则建立
  5. 如何用matlab分析chipscope的数据
  6. 15.verilog可综合语句设计综述
  7. 基于RBF和BP神经网络的信道估计算法的仿真与分析
  8. 实现键盘录入的第二种方式。。。。。
  9. Fedora 13 Alpha测试手记横空出世
  10. 【转】C++ const用法 尽可能使用const