Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。
From:https://www.cnblogs.com/weihengblog/p/9812110.html
concurrent.futures 官方文档:https://docs.python.org/3/library/concurrent.futures.html
concurrent.futures: 线程池, 让你更加高效, 并发的处理任务:https://www.h3399.cn/201906/703751.html
python 因为其全局解释器锁 GIL 而无法通过线程实现真正的平行计算。这个论断我们不展开,但是有个概念我们要说明,
IO 密集型 vs 计算密集型:
- IO密集型:读取文件,读取网络套接字频繁。
- 计算密集型:大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。
而 concurrent.futures 模块,可以利用 multiprocessing 实现真正的平行计算。
核心原理是:concurrent.futures 会以子进程的形式,平行的运行多个 python 解释器,从而令 python 程序可以利用多核 CPU 来提升执行速度。由于 子进程 与 主解释器 相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU 内核。
解释 2:concurrent.futures 中的 ProcessPoolExecutor类把工作分配给多个Python进程处理,因此,如果需要做CPU密集型处理,使用这个模块能绕开GIL,利用所有的CPU核心。
其原理是一个ProcessPoolExecutor创建了N个独立的Python解释器,N是系统上面可用的CPU核数。使用方法和ThreadPoolExecutor方法一样
Python:使用 Future、asyncio 处理并发
:https://blog.csdn.net/sinat_38682860/article/details/105419842
future 初始 -- 处理并发:https://www.cnblogs.com/zhaof/p/7679529.html
从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。
相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值:
- 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
- 当一个线程完成的时候,主线程能够立即知道。
- 让多线程和多进程的编码接口一致。
Python 模块 - Concurrent.futures
从 Python3.2开始,Python 标准库提供了 concurrent.futures 模块,为开发人员提供了启动异步任务的高级接口。 它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对 threading 和 multiprocessing 的更高级的抽象,对编写 线程池/进程池 提供了直接的支持。 可以将相应的 tasks 直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future总结
1. python3自带,python2需要安装2. Executer对象它是一个抽象类,它提供了异步执行的方法,他不能直接使用,但可以通过它的子类ThreadPoolExecuter和ProcessPoolExecuter2.1 Executer.submit(fn, *args, **kwargs)fn: 需要异步执行的函数*args,**kwargs fn 接受的参数该方法的作用就是提交一个可执行的回调 task,它返回一个 Future 对象2.2 map(fn, *iterables, timeout=None, chunksize=1)map(task,URLS) # 返回一个 map()迭代器,这个迭代器中的回调执行返回的结果是有序的3. Future对象相关future可以理解为一个在未来完成的操作,这是异步编程的基础通常情况下我们在遇到IO操作的时候,将会发生阻塞,cpu不能做其他事情而future的引入帮助我们在这段等待时间可以完成其他的操作3.1 done():如果当前线程已取消/已成功,返回True。3.2 cance():如果当前线程正在执行,并且不能取消调用,返回Flase。否则调用取消,返回True3.3 running():如果当前的线程正在执行,则返回True3.4 result():返回调用返回的值,如果调用尚未完成,则此方法等待如果等待超时,会抛出concurrent.futures.TimeoutError如果没有指定超时时间,则等待无时间限制如果在完成之前,取消了Future,则会引发CancelledError4. as_completed():在多个Future实例上的迭代器将会被返回这些Future实例由fs完成时产生。由fs返回的任何重复的Future,都会被返回一次。里面保存的都是已经执行完成的Future对象5. wait():返回一个元祖,元祖包含两个元素1. 已完成的future集合2. 未完成的future集合
初体验:
# coding=utf-8
from concurrent import futures
from concurrent.futures import Future
import timedef return_future(msg):time.sleep(3)return msgpool = futures.ThreadPoolExecutor(max_workers=2)t1 = pool.submit(return_future,'hello')
t2 = pool.submit(return_future,'world')time.sleep(3)
print(t1.done()) # 如果顺利完成,则返回True
time.sleep(3)
print(t2.done())print(t1.result()) # 获取future的返回值
time.sleep(3)
print(t2.result())print("主线程")
map(func,* iterables,timeout = None,chunksize = 1 )
# coding=utf-8import time
from concurrent.futures import Future,as_completed
from concurrent.futures import ThreadPoolExecutor as Pool
import requests
import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)pool = Pool()
result = pool.map(task,URLS)start_time = time.time()# 按照 URLS 的顺序返回
for res in result:print("{} {}".format(res.url,len(res.content)))# 无序的
with Pool(max_workers=3) as executer:future_task = [executer.submit(task,url) for url in URLS]for f in as_completed(future_task):if f.done():f_ret = f.result() # f.result()得到task的返回值,requests对象print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))print("耗时", time.time() - start_time)
print("主线程")
Future对象
Future可以理解为一个未来完成的操作
当我们执行io操作的时候,在等待返回结果之前会产生阻塞
cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他操作
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)# start_time = time.time()
# for url in URLS:
# ret = task(url)
# print("{} {}".format(ret.url,len(ret.content)))
# print("耗时",time.time() - start_time)
with Pool(max_workers=3) as executor:# 创建future任务future_task = [executor.submit(task,url) for url in URLS]for f in future_task:if f.running():print("%s is running"%str(f))for f in as_completed(future_task):try:ret = f.done()if ret:f_ret = f.result()print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))except Exception as e:f.cance()print(e)"""
url不是按照顺序返回的,说明并发时,当访问某一个url时,如果没有得到返回结果,不会发生阻塞
<Future at 0x1c63990e6d8 state=running> is running
<Future at 0x1c639922780 state=running> is running
<Future at 0x1c639922d30 state=running> is running
<Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
<Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101
<Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103
"""
模块方法
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait() 会返回一个tuple,tuple 会包含两个集合:已完成的集合 和 未完成的集合。使用 wait() 会获得更大的自由度,他接受三个参数:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETE。默认为 ALL_COMPLETE。
如果采用默认的 ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成,再执行主线程:
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed, wait
import requestsURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url, timeout=10):r = requests.get(url=url, timeout=timeout)print(r.status_code)with Pool(max_workers=3) as execute:future_task = [execute.submit(task, url) for url in URLS]for f in future_task:if f.running():print("%s" % (str(f)))"""并且wait还有timeout和return_when两个参数return_when有三个常量 (默认是 ALL_COMPLETED)FIRST_COMPLETED 任何一个future_task执行完成时/取消时,改函数返回FIRST_EXCEPTION 任何一个future_task发生异常时,该函数返回,如果没有异常发生,等同于ALL_COMPLETED ALL_COMPLETED 当所有的future_task执行完毕返回。"""results = wait(future_task, return_when="FIRST_COMPLETED") #done = results[0]for d in done:print(d)
concurrent.futures.as_completed(fs, timeout=None)
在多个 Future 实例上的迭代器将会被返回,这些 Future 实例由 fs 完成时产生。由 fs 返回的任何重复的 Future,都会被返回一次。里面保存的都是已经执行完成的 Future 对象。
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)with Pool(max_workers=3) as executor:# 创建future任务future_task = [executor.submit(task,url) for url in URLS]for f in future_task:if f.running():print("%s is running"%str(f))for f in as_completed(future_task):try:ret = f.done()if ret:f_ret = f.result()print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))except Exception as e:f.cance()print(e)
下面我们将学习 concurrent.futures
模块中的类。concurrent.futures 基础模块是 executor 和 future。
使用示例代码:
# -*- coding:utf-8 -*-import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutorr = redis.Redis(host='127.0.0.1', port=6379)# 减库存函数, 循环直到减库存完成
# 库存充足, 减库存成功, 返回True
# 库存不足, 减库存失败, 返回Falsedef reduce_stock():# python中redis事务是通过pipeline的封装实现的with r.pipeline() as pipe:while True:try:# watch库存键, multi后如果该key被其他客户端改变, 事务操作会抛出WatchError异常pipe.watch('stock:count')count = int(pipe.get('stock:count'))if count > 0: # 有库存# 事务开始pipe.multi()pipe.decr('stock:count')# 把命令推送过去# execute返回命令执行结果列表, 这里只有一个decr返回当前值print(pipe.execute()[0])return Trueelse:return Falseexcept WatchError as ex:# 打印WatchError异常, 观察被watch锁住的情况print(ex)pipe.unwatch()def worker():while True:# 没有库存就退出if not reduce_stock():breakif __name__ == "__main__":# 设置库存为100r.set("stock:count", 100)# 多进程模拟多个客户端提交with ProcessPoolExecutor() as pool:for _ in range(10):pool.submit(worker)
concurrent.futures 模块详解
1. Executor对象
class concurrent.futures.Executor
Executor 是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类 ThreadPoolExecutor 或者 ProcessPoolExecutor 进行调用。
1.1 Executor.submit(fn, *args, **kwargs)
fn:需要异步执行的函数
*args, **kwargs:fn 的参数
示例代码:
# -*- coding:utf-8 -*-
from concurrent import futuresdef test(num):import timereturn time.ctime(), numwith futures.ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(test, 1)print(future.result())
线程池的基本使用
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t: # 创建一个最大容纳数量为5的线程池task1 = t.submit(spider, 1)task2 = t.submit(spider, 2) # 通过submit提交执行的函数到线程池中task3 = t.submit(spider, 3)print(f"task1: {task1.done()}") # 通过done来判断线程是否完成print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")time.sleep(2.5)print(f"task1: {task1.done()}")print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")print(task1.result()) # 通过result来获取返回值
使用 with 语句 ,通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。
使用 submit 函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。
通过使用 done() 方法判断该任务是否结束。上面的例子可以看出,提交任务后立即判断任务状态,显示四个任务都未完成。在延时2.5后,task1 和 task2 执行完毕,task3 仍在执行中。
使用 result() 方法可以获取任务的返回值。
import time
import random
from concurrent.futures import ThreadPoolExecutor, waitdef func_test(int_1, int_2):sleep_second = random.randint(int_1, int_2)print(f'睡眠时间 {sleep_second}')time.sleep(t)passdef main():with ThreadPoolExecutor(max_workers=100) as tp_executor:future_task = [tp_executor.submit(func_test, 2, 5) for _ in range(100)]wait(future_task)if __name__ == '__main__':main()pass
wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait 接受三个参数:
fs: 表示需要执行的序列
timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回
return_when:表示wait返回结果的条件,默认为 ALL_COMPLETED 全部执行完成再返回
示例代码:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t:all_task = [t.submit(spider, page) for page in range(1, 5)]wait(all_task, return_when=FIRST_COMPLETED)print('finished')print(wait(all_task, timeout=2.5))
- 代码中返回的条件是:当完成第一个任务的时候,就停止等待,继续主线程任务
- 由于设置了延时, 可以看到最后只有 task4 还在运行中
as_completed
上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。最好的方法是当某个任务结束了,就给主线程返回结果,而不是一直判断每个任务是否结束。
ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是这样一个方法,当子线程中的任务执行完后,直接用 result() 获取返回结果
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagedef main():with ThreadPoolExecutor(max_workers=5) as t:obj_list = []for page in range(1, 5):obj = t.submit(spider, page)obj_list.append(obj)for future in as_completed(obj_list):data = future.result()print(f"main: {data}")
as_completed() 方法是一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。
当有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。同时,先完成的任务会先返回给主线程
map(fn, *iterables, timeout=None)
fn: 第一个参数 fn 是需要线程执行的函数;
iterables:第二个参数接受一个可迭代对象;
timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,
但由于 map 是返回线程执行的结果,
如果 timeout小于线程执行时间会抛异常 TimeoutError。
用法如下:
import time
from concurrent.futures import ThreadPoolExecutordef spider(page):time.sleep(page)return pagestart = time.time()
executor = ThreadPoolExecutor(max_workers=4)i = 1
for result in executor.map(spider, [2, 3, 1, 4]):print("task{}:{}".format(i, result))i += 1
使用 map 方法,无需提前使用 submit 方法,map 方法与 python 高阶函数 map 的含义相同,都是将序列中的每个元素都执行同一个函数。
上面的代码对列表中的每个元素都执行 spider() 函数,并分配各线程池。
可以看到执行结果与上面的 as_completed() 方法的结果不同,输出顺序和列表的顺序相同,就算 1s 的任务先执行完成,也会先打印前面提交的任务返回的结果。
1.2 Executor.map(func, *iterables, timeout=None)
相当于map(func, *iterables),但是func是异步执行。timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。
func:需要异步执行的函数
*iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。
timeout:设置每次异步操作的超时时间
示例代码:
# -*- coding:utf-8 -*-
from concurrent import futuresdef test(num):import timereturn time.ctime(), numdata = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:for future in executor.map(test, data):print(future)
1.3 Executor.shutdown(wait=True)
释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。
2. ThreadPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用.
class concurrent.futures.ThreadPoolExecutor(max_workers),使用 max_workers 数目的线程池执行异步调用
python3标准库concurrent.futures
比原Thread封装更高,多线程concurrent.futures.ThreadPoolExecutor
,多进程concurrent.futures.ProcessPoolExecutor
利用concurrent.futures.Future
来进行各种便捷的数据交互,包括处理异常,都在result()中再次抛出。
示例代码:
import time
from concurrent import futures
from concurrent.futures import ThreadPoolExecutordef display(args):print(time.strftime('[%H:%M:%S]', time.localtime()), end=' ')print(args)def task(n):"""只是休眠"""display('begin sleep {}s.'.format(n))time.sleep(n)display('ended sleep {}s.'.format(n))def do_many_task_inorder():"""多线程按任务发布顺序依次等待完成"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞运行')for future in future_list:display(future)display('统一结束(有序)')for future in future_list:display(future.result())def do_many_task_disorder():"""多线程执行先完成先显示"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞运行')for future in future_list:display(future)display('统一结束(无序)')done_iter = futures.as_completed(future_list) # generatorfor done in done_iter:display(done)if __name__ == '__main__':do_many_task_inorder()do_many_task_disorder()
3. ProcessPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用.
class concurrent.futures.ProcessPoolExecutor(max_workers=None),使用 max_workers数目的进程池执行异步调用,如果max_workers为None则使用机器的处理器数目(如4核机器max_worker配置为None时,则使用4个进程进行异步并发)。
示例代码:
# -*- coding:utf-8 -*-
from concurrent import futuresdef test(num):import timereturn time.ctime(), numdef muti_exec(m, n):# m 并发次数# n 运行次数with futures.ProcessPoolExecutor(max_workers=m) as executor: # 多进程# with futures.ThreadPoolExecutor(max_workers=m) as executor: #多线程executor_dict = dict((executor.submit(test, times), times) for times in range(m * n))for future in futures.as_completed(executor_dict):times = executor_dict[future]if future.exception() is not None:print('%r generated an exception: %s' % (times, future.exception()))else:print('RunTimes:%d,Res:%s' % (times, future.result()))if __name__ == '__main__':muti_exec(5, 1)
调度单个任务
执行者类Executor调度单个任务,使用submit() 函数,然后用返回的 Future 实例等待任务结果。
Executor 是一个 Python concurrent.futures
模块的抽象类。 它不能直接使用,我们需要使用以下具体子类之一 -
ThreadPoolExecutor:线程池
ProcessPoolExecutor:进程池
示例代码:
from concurrent import futures
import time
import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3)
future = executor.submit(task, 5)
print('future: {}'.format(future))
result = future.result()
print('result: {}'.format(result))
线程池 和 进程池
- ThreadPoolExecutor 是
Executor
类的具体子类之一。 子类使用多线程,我们得到一个提交任务的线程池。 该池将任务分配给可用线程并安排它们运行。 - ProcessPoolExecutor 是
Executor
类的具体子类之一。 它使用多重处理,并且我们获得提交任务的进程池。 此池将任务分配给可用的进程并安排它们运行。
如何创建一个 ThreadPoolExecutor 或者 ProcessPoolExecutor?
在concurrent.futures
模块及其具体子类Executor
的帮助下,可以很容易地创建一个线程池或者进程池。 需要使用我们想要的池中的线程数构造一个ThreadPoolExecutor 或者 ProcessPoolExecutor
。 默认情况下,数字是5
。然后可以提交一个任务到线程池或者进程池。 当submit()
任务时,会返回Future
对象。 Future
对象有一个名为done()
的方法,它告诉Future
是否已经解决。 有了这个,为这个特定的Future
对象设定了一个值。 当任务完成时,线程池执行器将该值设置为Future
的对象。
线程池 示例代码:
from concurrent.futures import ThreadPoolExecutor
from time import sleepdef task(message):sleep(2)return messagedef main():executor = ThreadPoolExecutor(5)future = executor.submit(task, "Completed")print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()
结果截图:
在上面的例子中,一个ThreadPoolExecutor
已经由5个线程构造而成。 然后,在提供消息之前等待2秒的任务被提交给线程池执行器。 从输出中可以看出,任务直到2
秒才完成,所以第一次调用done()
将返回False
。 2
秒后,任务完成,我们通过调用result()
方法得到future
的结果。
进程池 示例代码:
from concurrent.futures import ProcessPoolExecutor
from time import sleepdef task(message):sleep(2)return messagedef main():executor = ProcessPoolExecutor(5)future = executor.submit(task, ("Completed"))print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()
实例化ThreadPoolExecutor 或者 ProcessPoolExecutor 之 上下文管理器
另一种实例化ThreadPoolExecutor
的方法是在上下文管理器的帮助下完成的。 它的工作方式与上例中使用的方法类似。 使用上下文管理器的主要优点是它在语法上看起来不错。 实例化可以在下面的代码的帮助下完成
with ThreadPoolExecutor(max_workers = 5) as executor
或者
with ProcessPoolExecutor(max_workers = 5) as executor
示例
以下示例是从 Python 文档借用的。 在这个例子中,首先必须导入 concurrent.futures
模块。 然后创建一个名为 load_url()
的函数,它将加载请求的url。 然后该函数用池中的5
个线程创建 ThreadPoolExecutor
。 ThreadPoolExecutor
已被用作上下文管理器。 我们可以通过调用 result()
方法来获得 future
的结果。
import concurrent.futures
import urllib.requestURLS = ['http://www.foxnews.com/','https://www.yiibai.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/'
]def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))
以下将是上面的Python脚本的输出 -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.yiibai.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes
进程池:
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()def main():with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))if __name__ == '__main__':main()
使用 map() 调度多任务,有序返回
使用map(),多个worker并发地从输入迭代器里取数据,处理,然后按顺序返回结果。
示例代码:
from concurrent import futures
import time
import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(task, range(1, 10))
print('unprocessed results: {}'.format(results))
real_results = list(results)
print('real results: {}'.format(real_results))
使用 Executor.map() 函数
Python map()
函数广泛用于许多任务。 一个这样的任务是对可迭代内的每个元素应用某个函数。 同样,可以将迭代器的所有元素映射到一个函数,并将这些作为独立作业提交到ThreadPoolExecutor
之外。 考虑下面的Python脚本示例来理解函数的工作原理。
示例
在下面的示例中,map
函数用于将square()
函数应用于values
数组中的每个值。
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completedvalues = [2, 3, 4, 5]def square(n):return n * ndef main():with ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(square, values)for result in results:print(result)if __name__ == '__main__':main()
以下将是上面的Python脚本的输出 :
进程池:
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completedvalues = [2, 3, 4, 5]def square(n):return n * ndef main():with ProcessPoolExecutor(max_workers=3) as executor:results = executor.map(square, values)for result in results:print(result)if __name__ == '__main__':main()
多任务调度,无序返回
不断将任务submit到executor,返回future列表,使用as_completed无序产生每个任务的结果。
示例代码:
from concurrent import futures
import time
import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3)
future_list = [executor.submit(task, i) for i in range(1, 10)]
for f in futures.as_completed(future_list):print(f.result())
何时使用ProcessPoolExecutor 和 ThreadPoolExecutor ?
现在我们已经学习了两个Executor
类 - ThreadPoolExecutor
和ProcessPoolExecutor
,我们需要知道何时使用哪个执行器。需要在受CPU限制的工作负载情况下选择ProcessPoolExecutor
,而在受I/O限制的工作负载情况下则需要选择ThreadPoolExecutor
。
如果使用ProcessPoolExecutor
,那么不需要担心GIL,因为它使用多处理。 而且,与ThreadPoolExecution
相比,执行时间会更少。
Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。相关推荐
- python线程池模块_Python并发编程之线程池/进程池--concurrent.futures模块
一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/ ...
- 《转载》Python并发编程之线程池/进程池--concurrent.futures模块
本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...
- Python 中 concurrent.futures 模块使用说明
Python 中 concurrent.futures 模块使用说明 转载请注明出处:https://blog.csdn.net/jpch89/article/details/87643972 文章目 ...
- concurrent.futures模块(进程池线程池)
1.线程池的概念 由于python中的GIL导致每个进程一次只能运行一个线程,在I/O密集型的操作中可以开启多线程,但是在使用多线程处理任务时候,不是线程越多越好,因为在线程切换的时候,需要切换上下文 ...
- python实现多线程的三种方法threading.Thread(模块)的继承实现和函数实现;以及concurrent.futures模块的线程池实现
1.threading.Thread模块继承实现: import threading import timeclass TestThread(threading.Thread):def __init_ ...
- concurrent.futures模块使用
一.概念总结 1-1 池:控制进程数或线程数的概念. 服务器开启的进程数或线程数,会随并发的客户端数目单调递增.会产生巨大的压力于服务器,于是使用"池"的概念,对服务端开启的进程数 ...
- python paramiko并发_使用Python paramiko模块利用多线程实现ssh并发执行操作
1.paramiko概述 ssh是一个协议,OpenSSH是其中一个开源实现,paramiko是Python的一个库,实现了SSHv2协议(底层使用cryptography). 有了Paramiko以 ...
- 深入理解Java虚拟机-如何利用VisualVM对高并发项目进行性能分析
Java虚拟机深入理解系列全部文章更新中- 深入理解Java虚拟机-Java内存区域透彻分析 深入理解Java虚拟机-常用vm参数分析 深入理解Java虚拟机-JVM内存分配与回收策略原理,从此告别J ...
- python 并行计算 并行方法总结 concurrent.futures pp pathos multiprocessing multiprocess模块 总结对比
目录 模块介绍文章 相近模块 1.按并行分类 (1)阻塞(非并行) (2)批次并行 (3)异步 2.按传参分类 (1)单个任务,任务多参数 (2)多个任务,任务单参数 (3)多个任务,任务多参数 3. ...
最新文章
- 重磅剧透!阿里巴巴计划开源 Nacos,为Dubbo生态发展铺路
- java 防渗透_「java、工程师工作经验怎么写」-看准网
- Linux 免密码sudo
- spring jms 消息_Spring JMS,消息自动转换,JMS模板
- linux上TCP connection timeout的原因查找
- Netty学习总结(1)——Netty入门介绍
- STM32工作笔记0047--认识DTU什么是4GDTU设备
- 计算机图形学----投影矩阵
- MongoDB学习笔记~官方驱动的原生Curd操作
- 中点和中值滤波的区别_滤波器知识总结:详解滤波器分类、技术参数及部分种类介绍...
- Dart 语言基础分步指南
- Dialogs(对话框)
- (OK) 股市财经博客参考!
- mysql 本周、上周、本月、上月SQL 语句
- pdf生成目录-如何给没有目录的pdf手动添加目录
- 速围观!上千款“AI黑科技”在此集结
- h3c无线认证服务器,H3C无线路由器配置样例之带认证接入
- 这部豆瓣评分 9.4 的古董沙雕剧和 Python ...
- Linux使用Backspace(消除键)键时出现^H 解决方法
- Android 黑马Topline《企业级项目实战教程》2022
热门文章
- Sigmoid函数与Softmax函数的区别与联系
- Logan:美团点评的开源移动端基础日志库
- 美团大脑:知识图谱的建模方法及其应用
- 大数据技术和python开发工程师
- 国科大prml-往年习题
- 基于对抗生成网络的滚动轴承故障检测方法
- com.sun.jersey.api.client.UniformInterfaceException:returned a response status of 403
- 洛谷P1558 色板游戏
- jsp内置对象(四)-----session对象
- 在XML里的XSD和DTD以及standalone的使用