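The demo code and the concepts cited here are drawn from Dong Weiming's (董伟明) article "Understanding Python Concurrency: One Article Is Enough | PoolExecutor" (理解Python并发编程一篇就够了|PoolExecutor篇), also published on his WeChat public account Python之美. Other sources are *Python Cookbook* and the article "Python Concurrency: Thread Pools / Process Pools" (Python并发编程之线程池/进程池).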

ThreadPoolExecutor and ProcessPoolExecutor are high-level abstractions over threading and multiprocessing respectively. Both expose the same simple interface.
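Because the two executors share one interface, switching between threads and processes is often a one-line change. The sketch below is a minimal illustration; `square` and `run` are hypothetical helpers. With ProcessPoolExecutor the task function must be defined at module level so it can be pickled, and the pool should be created under an `if __name__ == '__main__':` guard.

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    # Defined at module level so ProcessPoolExecutor can pickle it
    return x * x

def run(executor_cls):
    # Identical code for both executors; only the class differs
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, range(5)))

if __name__ == '__main__':
    print(run(ThreadPoolExecutor))   # [0, 1, 4, 9, 16]
    print(run(ProcessPoolExecutor))  # [0, 1, 4, 9, 16]
```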

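The examples below use ProcessPoolExecutor.

It provides two main methods: map() and submit().

map() is mainly a shorthand for running the same function over many inputs. For example: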

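# -*- coding: utf-8 -*-
import random
from concurrent.futures import ProcessPoolExecutor

def fib(n, test_arg):
    # test_arg is unused; it only shows that map() can take several iterables
    if n > 30:
        raise Exception('can not > 30, now %s' % n)
    if n <= 2:
        return 1
    return fib(n-1, test_arg) + fib(n-2, test_arg)

def use_map():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        try:
            # Each call gets one item from each iterable: fib(nums[i], nums[i])
            results = executor.map(fib, nums, nums)
            for num, result in zip(nums, results):
                print('fib(%s) result is %s.' % (num, result))
        except Exception as e:
            print(e)

# Required on platforms that start workers with spawn (Windows, macOS)
if __name__ == '__main__':
    use_map()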

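Running the example gives output like the following. When num exceeds 30 (33 in this run), the exception is raised and caught, and no further results are printed.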

...

fib(19) result is 4181.

fib(11) result is 89.

fib(2) result is 1.

fib(5) result is 5.

fib(24) result is 46368.

fib(2) result is 1.

can not > 30, now 33
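The early stop happens because map() submits every task up front. It then returns an iterator that calls result() on each future in input order. The worker's exception is re-raised when the iterator reaches the failing input, so later results are never delivered, even if they were already computed.

A minimal sketch of this behavior follows. It uses ThreadPoolExecutor, which has the same interface, and a hypothetical `check()` task that mirrors fib()'s `> 30` guard:

```python
from concurrent.futures import ThreadPoolExecutor

def check(n):
    # Hypothetical task: rejects values above 30, like fib() above
    if n > 30:
        raise ValueError('can not > 30, now %s' % n)
    return n

def consume_map(nums):
    """Collect map() results until the first exception surfaces."""
    collected = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        try:
            for result in executor.map(check, nums):
                collected.append(result)
        except ValueError as e:
            collected.append(str(e))  # results for 7 and 8 are never seen
    return collected

print(consume_map([5, 40, 7, 8]))  # [5, 'can not > 30, now 40']
```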

Using the submit() method:

# -*- coding: utf-8 -*-
from concurrent.futures import ProcessPoolExecutor, as_completed
import random

def fib(n, test_arg):
    if n > 30:
        raise Exception('can not > 30, now %s' % n)
    if n <= 2:
        return 1
    return fib(n-1, test_arg) + fib(n-2, test_arg)

def use_submit():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        # Map each future back to its input so the output can name it
        futures = {executor.submit(fib, n, n): n for n in nums}
        for f in as_completed(futures):
            try:
                # result() re-raises any exception raised in the worker
                print('fib(%s) result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)

if __name__ == '__main__':
    use_submit()

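Running it gives output like the following. After an exception is raised and caught, printing continues with the remaining results instead of stopping as it did with map(). Besides as_completed(), the module also offers other helpers such as wait().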

fib(3) result is 2.

fib(15) result is 610.

can not > 30, now 31

fib(23) result is 28657.

fib(1) result is 1.

can not > 30, now 32

fib(14) result is 377.

fib(12) result is 144.

fib(26) result is 121393.

fib(29) result is 514229.
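wait(), mentioned above, is an alternative to as_completed(). It blocks until a condition is met and then returns two sets of futures, `done` and `not_done`.

A minimal sketch follows, again using ThreadPoolExecutor and a hypothetical `check()` task that mirrors fib()'s guard. Each outcome is read with `f.exception()`, which returns None when the call succeeded:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def check(n):
    # Hypothetical task mirroring fib()'s guard: values above 30 are rejected
    if n > 30:
        raise ValueError('can not > 30, now %s' % n)
    return n

def use_wait(nums):
    outcomes = {}
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(check, n): n for n in nums}
        # Blocks until every future has finished (return_when defaults to ALL_COMPLETED);
        # not_done is empty here because no timeout was given
        done, not_done = wait(futures)
        for f in done:
            exc = f.exception()  # None if the call succeeded
            outcomes[futures[f]] = str(exc) if exc is not None else f.result()
    return outcomes

print(use_wait([3, 31, 5]))
```

Passing `return_when=FIRST_EXCEPTION` instead makes wait() return as soon as any future raises, or when all of them have completed.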

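However, if the try/except block wraps the whole as_completed() loop, output does not continue. It stops at the first exception. I have not yet found the reason.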

def use_submit():
    nums = [random.randint(0, 33) for _ in range(0, 10)]
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(fib, n, n): n for n in nums}
        try:
            for f in as_completed(futures):
                print('fib(%s) result is %s.' % (futures[f], f.result()))
        except Exception as e:
            print(e)
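The difference comes from ordinary Python control flow. f.result() re-raises the worker's exception. When the try encloses the for loop, that exception propagates out of the loop to the except clause, so the loop is over and the remaining futures are never visited. On exit, the with block still waits for those tasks to finish, but their results are never read. When the try sits inside the loop, each exception is handled within its own iteration and the loop carries on.

A sketch comparing the two placements, with ThreadPoolExecutor and a hypothetical `check()` task:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def check(n):
    # Hypothetical task mirroring fib()'s guard
    if n > 30:
        raise ValueError('can not > 30, now %s' % n)
    return n

def outer_try(nums):
    seen = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(check, n): n for n in nums}
        try:
            for f in as_completed(futures):
                seen.append(f.result())  # an exception here leaves the for loop
        except ValueError as e:
            seen.append(str(e))          # the loop has already ended
    return seen

def inner_try(nums):
    seen = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(check, n): n for n in nums}
        for f in as_completed(futures):
            try:
                seen.append(f.result())
            except ValueError as e:
                seen.append(str(e))      # handled per future; the loop continues
    return seen

print(outer_try([1, 40, 2, 3]))
print(inner_try([1, 40, 2, 3]))
```

The order of results from as_completed() depends on timing. However, outer_try() always ends at the error message, while inner_try() always returns one entry per input.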

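Other notes:

map() yields results in the order of the input arguments, whereas as_completed() yields futures in the order they finish. The examples above do not show this clearly; see "Python Concurrency: Thread Pools / Process Pools" for more. In both cases, the observable timing depends on the max_workers parameter and on how long each call takes.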

import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def test_sleep(n):
    time.sleep(n)
    return True

def use_submit():
    nums = [3, 2, 1, 3]
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(test_sleep, n): n for n in nums}
        for f in as_completed(futures):
            try:
                print('%s result is %s.' % (futures[f], f.result()))
            except Exception as e:
                print(e)

def use_map():
    nums = [3, 2, 1, 3]
    with ProcessPoolExecutor(max_workers=3) as executor:
        try:
            results = executor.map(test_sleep, nums)
            for num, result in zip(nums, results):
                print('%s result is %s.' % (num, result))
        except Exception as e:
            print(e)

if __name__ == '__main__':
    use_submit()
    use_map()

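use_submit() prints the following. The total time is 3 + 1 = 4 s, and each result is printed as soon as its task finishes. Because max_workers=3, the fourth task (3 s) starts as soon as the 1 s task completes.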

1 result is True.

2 result is True.

3 result is True.

3 result is True.

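use_map() prints the following. It also takes 3 + 1 = 4 s, but the results are printed in the order of the input arguments. With max_workers=3, the first three are printed together after 3 s, and the fourth after 4 s.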

3 result is True.

2 result is True.

1 result is True.

3 result is True.

Part of the map() source code:

def map(self, fn, *iterables, timeout=None, chunksize=1):
    """Returns an iterator equivalent to map(fn, iter).

    Args:
        fn: A callable that will take as many arguments as there are
            passed iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: The size of the chunks the iterable will be broken into
            before being passed to a child process. This argument is only
            used by ProcessPoolExecutor; it is ignored by
            ThreadPoolExecutor.

    Returns:
        An iterator equivalent to: map(func, *iterables) but the calls may
        be evaluated out-of-order.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            for future in fs:
                if timeout is None:
                    yield future.result()
                else:
                    yield future.result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

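fs holds the Future instances returned by submit(), in the same order as the input arguments, and map() returns result_iterator(). Why the output appears in batches of max_workers is not yet clear to me.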
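One way to see what is happening: result_iterator() calls result() on each future in fs order, and each call blocks until that particular future is done. With durations [3, 2, 1, 3]:

- The first future finishes at 3 s.
- By then the second (2 s) and third (1 s) are already done, so they are yielded immediately after it.
- The fourth started at 1 s, when the worker running the 1 s task became free, so it finishes at 4 s.

The "batch" is therefore a consequence of these particular timings, not batching by max_workers.

A minimal sketch that records when map() yields each value is shown below. It uses ThreadPoolExecutor (same interface) with durations scaled down by 10 so it runs quickly; `sleeper` and `map_yield_times` are hypothetical helpers:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def sleeper(n):
    time.sleep(n)
    return n

def map_yield_times(durations, max_workers=3):
    """Return (value, seconds since start) for each result as map() yields it."""
    start = time.monotonic()
    timeline = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for value in executor.map(sleeper, durations):
            # Each next() blocks on the next future in submission order
            timeline.append((value, time.monotonic() - start))
    return timeline

# Scaled-down version of [3, 2, 1, 3]
for value, elapsed in map_yield_times([0.3, 0.2, 0.1, 0.3]):
    print('%s yielded at %.2f s' % (value, elapsed))
```

The first three values should appear at roughly 0.3 s and the last at roughly 0.4 s.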

The as_completed() source is somewhat harder to follow.
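This is not the standard-library code. The real as_completed() installs an internal waiter on all the futures and also supports a timeout. Its observable behavior, though, can be approximated with the public wait() function: each future is yielded exactly once, soonest-finished first.

A minimal sketch, where `simple_as_completed`, `sleeper`, and `completion_order` are hypothetical names:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def simple_as_completed(fs):
    """Yield each future once it has finished, earliest first (no timeout support)."""
    pending = set(fs)
    while pending:
        # wait() returns a (done, not_done) pair of sets
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for f in done:
            yield f

def sleeper(n):
    time.sleep(n)
    return n

def completion_order(durations):
    with ThreadPoolExecutor(max_workers=len(durations)) as executor:
        futures = [executor.submit(sleeper, d) for d in durations]
        return [f.result() for f in simple_as_completed(futures)]

print(completion_order([0.3, 0.1, 0.2]))
```

Since all three tasks run concurrently, the shorter sleeps should come out first, regardless of submission order.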

How ProcessPoolExecutor is implemented:

[Figure: ProcessPoolExecutor data flow diagram (process.png)]

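Let's analyze it by combining the source code with the data flow above:

executor.map creates multiple _WorkItem objects (it actually calls submit() once per item), and each one is given a newly created Future object.

Each _WorkItem object is then put into a dict called "Work Items", keyed by distinct "Work Ids".

A thread that manages the "Work Ids" queue, the "Local worker thread", is created. It does two things.

First, it takes a Work Id from the "Work Ids" queue and looks up the corresponding _WorkItem in "Work Items". If that item has been cancelled, it is deleted from "Work Items". Otherwise it is repackaged as a _CallItem and put into the "Call Q" queue. The executor's processes take _CallItems from this queue, execute them, and put the results, wrapped as _ResultItems, into the "Result Q" queue.

Second, it takes _ResultItems from the "Result Q" queue, updates the corresponding Future object found through "Work Items", and deletes that entry.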
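To make the management thread's two jobs concrete, here is a toy model of the data flow. It makes several simplifications:

- Worker threads stand in for processes.
- Plain `queue.Queue` objects stand in for the multiprocessing queues.
- There is no shutdown.
- The class `MiniPool` and its tuple layouts are hypothetical, not CPython internals. Attribute names mirror the ones discussed in this article.

```python
import queue
import threading
from concurrent.futures import Future

class MiniPool:
    """Toy model of ProcessPoolExecutor's data flow (threads instead of processes, no shutdown)."""

    def __init__(self, max_workers=2):
        self._lock = threading.Lock()
        self._pending_work_items = {}       # "Work Items": work_id -> (future, fn, args)
        self._work_ids = queue.Queue()      # "Work Ids"
        self._call_queue = queue.Queue()    # "Call Q"
        self._result_queue = queue.Queue()  # "Result Q"
        self._queue_count = 0
        for _ in range(max_workers):
            threading.Thread(target=self._worker, daemon=True).start()
        threading.Thread(target=self._manage, daemon=True).start()

    def submit(self, fn, *args):
        with self._lock:
            f = Future()
            self._pending_work_items[self._queue_count] = (f, fn, args)
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            self._result_queue.put(None)    # wake up the management thread
            return f

    def _worker(self):
        # Stand-in for _process_worker: run call items and report results
        while True:
            work_id, fn, args = self._call_queue.get()
            try:
                result = fn(*args)
            except Exception as e:
                self._result_queue.put((work_id, None, e))
            else:
                self._result_queue.put((work_id, result, None))

    def _manage(self):
        # Stand-in for the "Local worker thread"
        while True:
            # Job 1: move queued work ids into the call queue
            while True:
                try:
                    work_id = self._work_ids.get_nowait()
                except queue.Empty:
                    break
                f, fn, args = self._pending_work_items[work_id]
                if f.set_running_or_notify_cancel():
                    self._call_queue.put((work_id, fn, args))
                else:
                    del self._pending_work_items[work_id]  # cancelled: drop the entry
            # Job 2: wait for a result (or a wake-up) and resolve its Future
            item = self._result_queue.get()
            if item is None:
                continue
            work_id, result, exc = item
            f = self._pending_work_items.pop(work_id)[0]
            if exc is not None:
                f.set_exception(exc)
            else:
                f.set_result(result)

pool = MiniPool(max_workers=2)
print([f.result() for f in [pool.submit(pow, 2, n) for n in range(5)]])
```

In this model, as in the description above, workers never touch a Future directly. Only the management thread resolves it, after reading the result from the result queue.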

A brief look at submit():

def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenProcessPool('A child process terminated '
                'abruptly, the process pool is not usable anymore')
        if self._shutdown_thread:
            raise RuntimeError('cannot schedule new futures after shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        # Wake up queue management thread
        self._result_queue.put(None)

        self._start_queue_management_thread()
        return f

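It creates a Future() instance f and a _WorkItem() instance w.

_pending_work_items is the "Work Items" dict described above. The key is _queue_count, which starts at 0, and the value is w. _queue_count is also put into the _work_ids queue.

"Wake up queue management thread" means waking the Local Worker Thread from the diagram above. This is done by putting None into _result_queue.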

def _start_queue_management_thread(self):
    # When the executor gets lost, the weakref callback will wake up
    # the queue management thread.
    def weakref_cb(_, q=self._result_queue):
        q.put(None)
    if self._queue_management_thread is None:
        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue))
        self._queue_management_thread.daemon = True
        self._queue_management_thread.start()
        _threads_queues[self._queue_management_thread] = self._result_queue

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p

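_adjust_process_count() starts max_workers processes, each running _process_worker().

It then starts the _queue_management_thread, i.e. the Local Worker Thread.

The _queue_management_thread calls _add_call_item_to_queue() to put _CallItems into call_queue, deletes references, and so on. I found this part hard to follow.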

def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will written
            to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))

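This is the function each worker process runs. It takes a _CallItem from call_queue, calls its fn, and puts the result (or the exception) into result_queue as a _ResultItem. If it receives None instead, it puts its own pid into result_queue to wake the management thread, then exits.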
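In _process_worker, a None call item is the stop signal. In the CPython version quoted here, when the pool shuts down the queue management thread puts one None into call_queue for each worker process. Each worker replies with its pid, so the management thread knows which process has exited and can join it.

A thread-based sketch of the same sentinel pattern follows. `worker`, `run_jobs`, and the tuple layouts are hypothetical, and threads stand in for processes:

```python
import queue
import threading

def worker(call_queue, result_queue, name):
    # Mirrors _process_worker: loop until a None sentinel arrives
    while True:
        call_item = call_queue.get()
        if call_item is None:
            result_queue.put(('exit', name))  # plays the role of os.getpid()
            return
        work_id, fn, args = call_item
        try:
            r = fn(*args)
        except Exception as e:
            result_queue.put(('error', work_id, e))
        else:
            result_queue.put(('ok', work_id, r))

def run_jobs(jobs, n_workers=2):
    call_q, result_q = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(call_q, result_q, i))
               for i in range(n_workers)]
    for t in threads:
        t.start()
    for work_id, (fn, args) in enumerate(jobs):
        call_q.put((work_id, fn, args))
    for _ in threads:
        call_q.put(None)  # one sentinel per worker, queued after all real work
    for t in threads:
        t.join()
    items = []
    while not result_q.empty():  # safe: every producer has exited
        items.append(result_q.get())
    return items

items = run_jobs([(pow, (2, 3)), (int, ('x',)), (abs, (-4,))])
print(items)
```

Because the queue is FIFO and the sentinels are queued last, every real job is taken before any worker sees a None.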
