From:https://www.cnblogs.com/weihengblog/p/9812110.html

concurrent.futures 官方文档:https://docs.python.org/3/library/concurrent.futures.html

concurrent.futures: 线程池, 让你更加高效, 并发的处理任务:https://www.h3399.cn/201906/703751.html

python 因为其全局解释器锁 GIL 而无法通过线程实现真正的平行计算。这个论断我们不展开,但是有个概念我们要说明,

IO 密集型 vs 计算密集型:

  1. IO密集型:读取文件,读取网络套接字频繁。
  2. 计算密集型:大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。

而 concurrent.futures 模块,可以利用 multiprocessing 实现真正的平行计算。

核心原理是:concurrent.futures 会以子进程的形式,平行的运行多个 python 解释器,从而令 python 程序可以利用多核 CPU 来提升执行速度。由于 子进程 与 主解释器 相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU 内核。

解释 2:concurrent.futures 中的 ProcessPoolExecutor类把工作分配给多个Python进程处理,因此,如果需要做CPU密集型处理,使用这个模块能绕开GIL,利用所有的CPU核心。
其原理是一个ProcessPoolExecutor创建了N个独立的Python解释器,N是系统上面可用的CPU核数。使用方法和ThreadPoolExecutor方法一样

Python:使用 Future、asyncio 处理并发

:https://blog.csdn.net/sinat_38682860/article/details/105419842

future 初始 -- 处理并发:https://www.cnblogs.com/zhaof/p/7679529.html

从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。

相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值:

  1. 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
  2. 当一个线程完成的时候,主线程能够立即知道。
  3. 让多线程和多进程的编码接口一致。

Python 模块 - Concurrent.futures

从 Python3.2开始,Python 标准库提供了 concurrent.futures 模块,为开发人员提供了启动异步任务的高级接口。 它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对 threading 和 multiprocessing 的更高级的抽象,对编写 线程池/进程池 提供了直接的支持。 可以将相应的 tasks 直接放入线程池/进程池,不需要维护Queue来操心死锁的问题线程池/进程池会自动帮我们调度。

Future总结

1. python3自带,python2需要安装2. Executer对象它是一个抽象类,它提供了异步执行的方法,他不能直接使用,但可以通过它的子类ThreadPoolExecuter和ProcessPoolExecuter2.1 Executer.submit(fn, *args, **kwargs)fn:             需要异步执行的函数*args,**kwargs  fn 接受的参数该方法的作用就是提交一个可执行的回调 task,它返回一个 Future 对象2.2 map(fn, *iterables, timeout=None, chunksize=1)map(task,URLS)  # 返回一个 map()迭代器,这个迭代器中的回调执行返回的结果是有序的3. Future对象相关future可以理解为一个在未来完成的操作,这是异步编程的基础通常情况下我们在遇到IO操作的时候,将会发生阻塞,cpu不能做其他事情而future的引入帮助我们在这段等待时间可以完成其他的操作3.1 done():如果当前线程已取消/已成功,返回True。3.2 cance():如果当前线程正在执行,并且不能取消调用,返回Flase。否则调用取消,返回True3.3 running():如果当前的线程正在执行,则返回True3.4 result():返回调用返回的值,如果调用尚未完成,则此方法等待如果等待超时,会抛出concurrent.futures.TimeoutError如果没有指定超时时间,则等待无时间限制如果在完成之前,取消了Future,则会引发CancelledError4. as_completed():在多个Future实例上的迭代器将会被返回这些Future实例由fs完成时产生。由fs返回的任何重复的Future,都会被返回一次。里面保存的都是已经执行完成的Future对象5. wait():返回一个元祖,元祖包含两个元素1. 已完成的future集合2. 未完成的future集合

初体验:

# coding=utf-8
from concurrent import futures
from concurrent.futures import Future
import timedef return_future(msg):time.sleep(3)return msgpool = futures.ThreadPoolExecutor(max_workers=2)t1 = pool.submit(return_future,'hello')
t2 = pool.submit(return_future,'world')time.sleep(3)
print(t1.done())  # 如果顺利完成,则返回True
time.sleep(3)
print(t2.done())print(t1.result()) # 获取future的返回值
time.sleep(3)
print(t2.result())print("主线程")

map(func,* iterables,timeout = None,chunksize = 1 )

# coding=utf-8import time
from concurrent.futures import Future,as_completed
from concurrent.futures import ThreadPoolExecutor as Pool
import requests
import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)pool = Pool()
result = pool.map(task,URLS)start_time = time.time()# 按照 URLS 的顺序返回
for res in result:print("{} {}".format(res.url,len(res.content)))# 无序的
with Pool(max_workers=3) as executer:future_task = [executer.submit(task,url) for url in URLS]for f in as_completed(future_task):if f.done():f_ret = f.result() # f.result()得到task的返回值,requests对象print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))print("耗时", time.time() - start_time)
print("主线程")

Future对象

Future可以理解为一个未来完成的操作
当我们执行io操作的时候,在等待返回结果之前会产生阻塞
cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他操作

from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)# start_time = time.time()
# for url in URLS:
#     ret = task(url)
#     print("{} {}".format(ret.url,len(ret.content)))
# print("耗时",time.time() - start_time)
with Pool(max_workers=3) as executor:# 创建future任务future_task = [executor.submit(task,url) for url in URLS]for f in future_task:if f.running():print("%s is running"%str(f))for f in as_completed(future_task):try:ret = f.done()if ret:f_ret = f.result()print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))except Exception as e:f.cance()print(e)"""
url不是按照顺序返回的,说明并发时,当访问某一个url时,如果没有得到返回结果,不会发生阻塞
<Future at 0x1c63990e6d8 state=running> is running
<Future at 0x1c639922780 state=running> is running
<Future at 0x1c639922d30 state=running> is running
<Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
<Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101
<Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103
"""

模块方法

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait() 会返回一个tuple,tuple 会包含两个集合:已完成的集合 和 未完成的集合。使用 wait() 会获得更大的自由度,他接受三个参数:FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETE默认为 ALL_COMPLETE

如果采用默认的 ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成,再执行主线程:

from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed, wait
import requestsURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url, timeout=10):r = requests.get(url=url, timeout=timeout)print(r.status_code)with Pool(max_workers=3) as execute:future_task = [execute.submit(task, url) for url in URLS]for f in future_task:if f.running():print("%s" % (str(f)))"""并且wait还有timeout和return_when两个参数return_when有三个常量 (默认是 ALL_COMPLETED)FIRST_COMPLETED 任何一个future_task执行完成时/取消时,改函数返回FIRST_EXCEPTION 任何一个future_task发生异常时,该函数返回,如果没有异常发生,等同于ALL_COMPLETED    ALL_COMPLETED 当所有的future_task执行完毕返回。"""results = wait(future_task, return_when="FIRST_COMPLETED")  #done = results[0]for d in done:print(d)

concurrent.futures.as_completed(fs, timeout=None)

在多个 Future 实例上的迭代器将会被返回,这些 Future 实例由 fs 完成时产生。由 fs 返回的任何重复的 Future,都会被返回一次。里面保存的都是已经执行完成的 Future 对象。

from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)with Pool(max_workers=3) as executor:# 创建future任务future_task = [executor.submit(task,url) for url in URLS]for f in future_task:if f.running():print("%s is running"%str(f))for f in as_completed(future_task):try:ret = f.done()if ret:f_ret = f.result()print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))except Exception as e:f.cance()print(e)

下面我们将学习 concurrent.futures 模块中的类。concurrent.futures 基础模块是 executor 和 future。

使用示例代码:

# -*- coding:utf-8 -*-import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutorr = redis.Redis(host='127.0.0.1', port=6379)# 减库存函数, 循环直到减库存完成
# 库存充足, 减库存成功, 返回True
# 库存不足, 减库存失败, 返回Falsedef reduce_stock():# python中redis事务是通过pipeline的封装实现的with r.pipeline() as pipe:while True:try:# watch库存键, multi后如果该key被其他客户端改变, 事务操作会抛出WatchError异常pipe.watch('stock:count')count = int(pipe.get('stock:count'))if count > 0:  # 有库存# 事务开始pipe.multi()pipe.decr('stock:count')# 把命令推送过去# execute返回命令执行结果列表, 这里只有一个decr返回当前值print(pipe.execute()[0])return Trueelse:return Falseexcept WatchError as ex:# 打印WatchError异常, 观察被watch锁住的情况print(ex)pipe.unwatch()def worker():while True:# 没有库存就退出if not reduce_stock():breakif __name__ == "__main__":# 设置库存为100r.set("stock:count", 100)# 多进程模拟多个客户端提交with ProcessPoolExecutor() as pool:for _ in range(10):pool.submit(worker)

concurrent.futures 模块详解

1. Executor对象

class concurrent.futures.Executor

Executor 是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类 ThreadPoolExecutor 或者 ProcessPoolExecutor 进行调用。

1.1 Executor.submit(fn, *args, **kwargs)

fn:需要异步执行的函数
*args, **kwargs:fn 的参数

示例代码:

# -*- coding:utf-8 -*-
from concurrent import futuresdef test(num):import timereturn time.ctime(), numwith futures.ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(test, 1)print(future.result())

线程池的基本使用

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池task1 = t.submit(spider, 1)task2 = t.submit(spider, 2)  # 通过submit提交执行的函数到线程池中task3 = t.submit(spider, 3)print(f"task1: {task1.done()}")  # 通过done来判断线程是否完成print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")time.sleep(2.5)print(f"task1: {task1.done()}")print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")print(task1.result())  # 通过result来获取返回值
  1. 使用 with 语句 ,通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。

  2. 使用 submit 函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。

  3. 通过使用 done() 方法判断该任务是否结束。上面的例子可以看出,提交任务后立即判断任务状态,显示四个任务都未完成。在延时2.5后,task1 和 task2 执行完毕,task3 仍在执行中。

  4. 使用 result() 方法可以获取任务的返回值。

import time
import random
from concurrent.futures import ThreadPoolExecutor, waitdef func_test(int_1, int_2):sleep_second = random.randint(int_1, int_2)print(f'睡眠时间 {sleep_second}')time.sleep(t)passdef main():with ThreadPoolExecutor(max_workers=100) as tp_executor:future_task = [tp_executor.submit(func_test, 2, 5) for _ in range(100)]wait(future_task)if __name__ == '__main__':main()pass

wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait 接受三个参数:
        fs: 表示需要执行的序列
        timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回
        return_when:表示wait返回结果的条件,默认为 ALL_COMPLETED 全部执行完成再返回

示例代码:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t:all_task = [t.submit(spider, page) for page in range(1, 5)]wait(all_task, return_when=FIRST_COMPLETED)print('finished')print(wait(all_task, timeout=2.5))
  1. 代码中返回的条件是:当完成第一个任务的时候,就停止等待,继续主线程任务
  2. 由于设置了延时, 可以看到最后只有 task4 还在运行中

as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。最好的方法是当某个任务结束了,就给主线程返回结果,而不是一直判断每个任务是否结束。

ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是这样一个方法,当子线程中的任务执行完后,直接用 result() 获取返回结果

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagedef main():with ThreadPoolExecutor(max_workers=5) as t:obj_list = []for page in range(1, 5):obj = t.submit(spider, page)obj_list.append(obj)for future in as_completed(obj_list):data = future.result()print(f"main: {data}")

as_completed() 方法是一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。

当有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。同时,先完成的任务会先返回给主线程

map(fn, *iterables, timeout=None)
    fn: 第一个参数 fn 是需要线程执行的函数;
    iterables:第二个参数接受一个可迭代对象;
    timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,
              但由于 map 是返回线程执行的结果,
              如果 timeout小于线程执行时间会抛异常 TimeoutError。

用法如下:

import time
from concurrent.futures import ThreadPoolExecutordef spider(page):time.sleep(page)return pagestart = time.time()
executor = ThreadPoolExecutor(max_workers=4)i = 1
for result in executor.map(spider, [2, 3, 1, 4]):print("task{}:{}".format(i, result))i += 1

使用 map 方法,无需提前使用 submit 方法,map 方法与 python 高阶函数 map 的含义相同,都是将序列中的每个元素都执行同一个函数。

上面的代码对列表中的每个元素都执行 spider() 函数,并分配各线程池。

可以看到执行结果与上面的 as_completed() 方法的结果不同,输出顺序和列表的顺序相同,就算 1s 的任务先执行完成,也会先打印前面提交的任务返回的结果。

1.2 Executor.map(func, *iterables, timeout=None)

相当于map(func, *iterables),但是func是异步执行。timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。

func:需要异步执行的函数
*iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。
timeout:设置每次异步操作的超时时间

示例代码:

# -*- coding:utf-8 -*-
from concurrent import futuresdef test(num):import timereturn time.ctime(), numdata = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:for future in executor.map(test, data):print(future)

1.3 Executor.shutdown(wait=True)

释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法

2. ThreadPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用.

class concurrent.futures.ThreadPoolExecutor(max_workers),使用 max_workers 数目的线程池执行异步调用

python3标准库concurrent.futures比原Thread封装更高,多线程concurrent.futures.ThreadPoolExecutor,多进程concurrent.futures.ProcessPoolExecutor
利用concurrent.futures.Future来进行各种便捷的数据交互,包括处理异常,都在result()中再次抛出。

示例代码:

import time
from concurrent import futures
from concurrent.futures import ThreadPoolExecutordef display(args):print(time.strftime('[%H:%M:%S]', time.localtime()), end=' ')print(args)def task(n):"""只是休眠"""display('begin sleep {}s.'.format(n))time.sleep(n)display('ended sleep {}s.'.format(n))def do_many_task_inorder():"""多线程按任务发布顺序依次等待完成"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞运行')for future in future_list:display(future)display('统一结束(有序)')for future in future_list:display(future.result())def do_many_task_disorder():"""多线程执行先完成先显示"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞运行')for future in future_list:display(future)display('统一结束(无序)')done_iter = futures.as_completed(future_list)  # generatorfor done in done_iter:display(done)if __name__ == '__main__':do_many_task_inorder()do_many_task_disorder()

3. ProcessPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用.

class concurrent.futures.ProcessPoolExecutor(max_workers=None),使用 max_workers数目的进程池执行异步调用,如果max_workers为None则使用机器的处理器数目(如4核机器max_worker配置为None时,则使用4个进程进行异步并发)。

示例代码:

# -*- coding:utf-8 -*-
from concurrent import futuresdef test(num):import timereturn time.ctime(), numdef muti_exec(m, n):# m 并发次数# n 运行次数with futures.ProcessPoolExecutor(max_workers=m) as executor:  # 多进程# with futures.ThreadPoolExecutor(max_workers=m) as executor: #多线程executor_dict = dict((executor.submit(test, times), times) for times in range(m * n))for future in futures.as_completed(executor_dict):times = executor_dict[future]if future.exception() is not None:print('%r generated an exception: %s' % (times, future.exception()))else:print('RunTimes:%d,Res:%s' % (times, future.result()))if __name__ == '__main__':muti_exec(5, 1)

调度单个任务

执行者类Executor调度单个任务,使用submit() 函数,然后用返回的 Future 实例等待任务结果。

Executor 是一个 Python concurrent.futures 模块的抽象类。 它不能直接使用,我们需要使用以下具体子类之一 -

  • ThreadPoolExecutor:线程池
  • ProcessPoolExecutor:进程池

示例代码:

from concurrent import futures
import time
import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3)
future = executor.submit(task, 5)
print('future: {}'.format(future))
result = future.result()
print('result: {}'.format(result))

线程池 和 进程池

  1. ThreadPoolExecutor 是 Executor类的具体子类之一。 子类使用多线程,我们得到一个提交任务的线程池。 该池将任务分配给可用线程并安排它们运行。
  2. ProcessPoolExecutorExecutor类的具体子类之一。 它使用多重处理,并且我们获得提交任务的进程池。 此池将任务分配给可用的进程并安排它们运行。

如何创建一个 ThreadPoolExecutor 或者 ProcessPoolExecutor?

concurrent.futures模块及其具体子类Executor的帮助下,可以很容易地创建一个线程池或者进程池。 需要使用我们想要的池中的线程数构造一个ThreadPoolExecutor 或者 ProcessPoolExecutor。 默认情况下,数字是5。然后可以提交一个任务到线程池或者进程池。 当submit()任务时,会返回Future对象。 Future对象有一个名为done()的方法,它告诉Future是否已经解决。 有了这个,为这个特定的Future对象设定了一个值。 当任务完成时,线程池执行器将该值设置为Future的对象。

线程池 示例代码:

from concurrent.futures import ThreadPoolExecutor
from time import sleepdef task(message):sleep(2)return messagedef main():executor = ThreadPoolExecutor(5)future = executor.submit(task, "Completed")print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()

结果截图:

在上面的例子中,一个ThreadPoolExecutor已经由5个线程构造而成。 然后,在提供消息之前等待2秒的任务被提交给线程池执行器。 从输出中可以看出,任务直到2秒才完成,所以第一次调用done()将返回False2秒后,任务完成,我们通过调用result()方法得到future的结果。

进程池 示例代码:

from concurrent.futures import ProcessPoolExecutor
from time import sleepdef task(message):sleep(2)return messagedef main():executor = ProcessPoolExecutor(5)future = executor.submit(task, ("Completed"))print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()

实例化ThreadPoolExecutor 或者 ProcessPoolExecutor  之 上下文管理器
另一种实例化ThreadPoolExecutor的方法是在上下文管理器的帮助下完成的。 它的工作方式与上例中使用的方法类似。 使用上下文管理器的主要优点是它在语法上看起来不错。 实例化可以在下面的代码的帮助下完成

with ThreadPoolExecutor(max_workers = 5) as executor
或者
with ProcessPoolExecutor(max_workers = 5) as executor

示例

以下示例是从 Python 文档借用的。 在这个例子中,首先必须导入 concurrent.futures 模块。 然后创建一个名为 load_url()的函数,它将加载请求的url。 然后该函数用池中的5个线程创建 ThreadPoolExecutorThreadPoolExecutor 已被用作上下文管理器。 我们可以通过调用 result()方法来获得 future的结果。

import concurrent.futures
import urllib.requestURLS = ['http://www.foxnews.com/','https://www.yiibai.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/'
]def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))

以下将是上面的Python脚本的输出 -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.yiibai.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

进程池:

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()def main():with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))if __name__ == '__main__':main()

使用 map() 调度多任务,有序返回

使用map(),多个worker并发地从输入迭代器里取数据,处理,然后按顺序返回结果。

示例代码:

from concurrent import futures
import time
import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(task, range(1, 10))
print('unprocessed results: {}'.format(results))
real_results = list(results)
print('real results: {}'.format(real_results))

使用 Executor.map() 函数

Python map()函数广泛用于许多任务。 一个这样的任务是对可迭代内的每个元素应用某个函数。 同样,可以将迭代器的所有元素映射到一个函数,并将这些作为独立作业提交到ThreadPoolExecutor之外。 考虑下面的Python脚本示例来理解函数的工作原理。

示例
在下面的示例中,map函数用于将square()函数应用于values数组中的每个值。

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completedvalues = [2, 3, 4, 5]def square(n):return n * ndef main():with ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(square, values)for result in results:print(result)if __name__ == '__main__':main()

以下将是上面的Python脚本的输出 :

进程池:

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completedvalues = [2, 3, 4, 5]def square(n):return n * ndef main():with ProcessPoolExecutor(max_workers=3) as executor:results = executor.map(square, values)for result in results:print(result)if __name__ == '__main__':main()

多任务调度,无序返回

不断将任务submit到executor,返回future列表,使用as_completed无序产生每个任务的结果。

示例代码:

from concurrent import futures
import time
import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3)
future_list = [executor.submit(task, i) for i in range(1, 10)]
for f in futures.as_completed(future_list):print(f.result())

何时使用ProcessPoolExecutor 和 ThreadPoolExecutor ?

现在我们已经学习了两个Executor类 - ThreadPoolExecutorProcessPoolExecutor,我们需要知道何时使用哪个执行器。需要在受CPU限制的工作负载情况下选择ProcessPoolExecutor,而在受I/O限制的工作负载情况下则需要选择ThreadPoolExecutor

如果使用ProcessPoolExecutor,那么不需要担心GIL,因为它使用多处理。 而且,与ThreadPoolExecution相比,执行时间会更少。

Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。相关推荐

  1. python线程池模块_Python并发编程之线程池/进程池--concurrent.futures模块

    一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/ ...

  2. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  3. Python 中 concurrent.futures 模块使用说明

    Python 中 concurrent.futures 模块使用说明 转载请注明出处:https://blog.csdn.net/jpch89/article/details/87643972 文章目 ...

  4. concurrent.futures模块(进程池线程池)

    1.线程池的概念 由于python中的GIL导致每个进程一次只能运行一个线程,在I/O密集型的操作中可以开启多线程,但是在使用多线程处理任务时候,不是线程越多越好,因为在线程切换的时候,需要切换上下文 ...

  5. python实现多线程的三种方法threading.Thread(模块)的继承实现和函数实现;以及concurrent.futures模块的线程池实现

    1.threading.Thread模块继承实现: import threading import timeclass TestThread(threading.Thread):def __init_ ...

  6. concurrent.futures模块使用

    一.概念总结 1-1 池:控制进程数或线程数的概念. 服务器开启的进程数或线程数,会随并发的客户端数目单调递增.会产生巨大的压力于服务器,于是使用"池"的概念,对服务端开启的进程数 ...

  7. python paramiko并发_使用Python paramiko模块利用多线程实现ssh并发执行操作

    1.paramiko概述 ssh是一个协议,OpenSSH是其中一个开源实现,paramiko是Python的一个库,实现了SSHv2协议(底层使用cryptography). 有了Paramiko以 ...

  8. 深入理解Java虚拟机-如何利用VisualVM对高并发项目进行性能分析

    Java虚拟机深入理解系列全部文章更新中- 深入理解Java虚拟机-Java内存区域透彻分析 深入理解Java虚拟机-常用vm参数分析 深入理解Java虚拟机-JVM内存分配与回收策略原理,从此告别J ...

  9. python 并行计算 并行方法总结 concurrent.futures pp pathos multiprocessing multiprocess模块 总结对比

    目录 模块介绍文章 相近模块 1.按并行分类 (1)阻塞(非并行) (2)批次并行 (3)异步 2.按传参分类 (1)单个任务,任务多参数 (2)多个任务,任务单参数 (3)多个任务,任务多参数 3. ...

最新文章

  1. 重磅剧透!阿里巴巴计划开源 Nacos,为Dubbo生态发展铺路
  2. java 防渗透_「java、工程师工作经验怎么写」-看准网
  3. Linux 免密码sudo
  4. spring jms 消息_Spring JMS,消息自动转换,JMS模板
  5. linux上TCP connection timeout的原因查找
  6. Netty学习总结(1)——Netty入门介绍
  7. STM32工作笔记0047--认识DTU什么是4GDTU设备
  8. 计算机图形学----投影矩阵
  9. MongoDB学习笔记~官方驱动的原生Curd操作
  10. 中点和中值滤波的区别_滤波器知识总结:详解滤波器分类、技术参数及部分种类介绍...
  11. Dart 语言基础分步指南
  12. Dialogs(对话框)
  13. (OK) 股市财经博客参考!
  14. mysql 本周、上周、本月、上月SQL 语句
  15. pdf生成目录-如何给没有目录的pdf手动添加目录
  16. 速围观!上千款“AI黑科技”在此集结
  17. h3c无线认证服务器,H3C无线路由器配置样例之带认证接入
  18. 这部豆瓣评分 9.4 的古董沙雕剧和 Python ...
  19. Linux使用Backspace(消除键)键时出现^H 解决方法
  20. Android 黑马Topline《企业级项目实战教程》2022

热门文章

  1. Sigmoid函数与Softmax函数的区别与联系
  2. Logan:美团点评的开源移动端基础日志库
  3. 美团大脑:知识图谱的建模方法及其应用
  4. 大数据技术和python开发工程师
  5. 国科大prml-往年习题
  6. 基于对抗生成网络的滚动轴承故障检测方法
  7. com.sun.jersey.api.client.UniformInterfaceException:returned a response status of 403
  8. 洛谷P1558 色板游戏
  9. jsp内置对象(四)-----session对象
  10. 在XML里的XSD和DTD以及standalone的使用