Python 并发模块

multiprocessing 介绍

multiprocessing 是一个用与 threading 模块相似API的支持产生进程的包。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。

multiprocessing 模块还引入了在 threading 模块中没有类似物的API。这方面的一个主要例子是 Pool 对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用 Pool

from multiprocessing import Pool

def f(x):

return x*x

if __name__ == '__main__':

with Pool(5) as p:

print(p.map(f, [1, 2, 3]))

输出结果:

[1, 4, 9]

Process 类

在 multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。 Process 和 threading.Thread API 相同。 一个简单的多进程程序示例是:

from multiprocessing import Process

def f(name):

print('hello', name)

if __name__ == '__main__':

p = Process(target=f, args=('bob',))

p.start()

p.join()

输出结果:

hello bob

要显示所涉及的各个进程ID,这是一个扩展示例:

from multiprocessing import Process

import os

def info(title):

print(title)

print('module name:', __name__)

print('parent process:', os.getppid())

print('process id:', os.getpid())

def f(name):

info('function f')

print('hello', name)

if __name__ == '__main__':

info('main line')

p = Process(target=f, args=('bob',))

p.start()

p.join()

输出结果:

main line

module name: __main__

parent process: 817

process id: 92734

function f

module name: __main__

parent process: 92734

process id: 92806

hello bob

为了解释为什么if __name__ == '__main__' 部分是必需的,请参见 编程指南。

上下文和启动方法

根据不同的平台, multiprocessing 支持三种启动进程的方法。这些 启动方法 有

spawn

父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。

允许在 Unix and Windows. 默认Windows.

fork

父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。

只存在于Unix。Unix中的默认值。

forkserver

程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。可在Unix平台上使用,支持通过Unix管道传递文件描述符。

在 3.4 版更改: spawn 在所有unix平台上添加,并且为一些unix平台添加了 forkserver 。子进程不再继承Windows上的所有上级进程可继承的句柄。

要选择一个启动方法,你应该在主模块的if __name__ == '__main__'子句中调用 set_start_method()。例如:

import multiprocessing as mp

def foo(q):

q.put('hello')

if __name__ == '__main__':

mp.set_start_method('spawn')

q = mp.Queue()

p = mp.Process(target=foo, args=(q,))

p.start()

print(q.get())

p.join()

输出报错:

Traceback (most recent call last):

File "", line 2, in

File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 242, in set_start_method

raise RuntimeError('context has already been set')

RuntimeError: context has already been set

在程序中 set_start_method() 不应该被多次调用。

或者,你可以使用 get_context() 来获取上下文对象。上下文对象与多处理模块具有相同的API,并允许在同一程序中使用多个启动方法。:

import multiprocessing as mp

def foo(q):

q.put('hello')

if __name__ == '__main__':

ctx = mp.get_context('spawn')

q = ctx.Queue()

p = ctx.Process(target=foo, args=(q,))

p.start()

print(q.get())

p.join()

输出报错:

Traceback (most recent call last):

File "", line 1, in

File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main

exitcode = _main(fd)

File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 115, in _main

self = reduction.pickle.load(from_parent)

AttributeError: Can't get attribute 'foo' on

请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程。

想要使用特定启动方法的库应该使用 get_context() 以避免干扰库用户的选择。

警告: 'spawn' 和 'forkserver' 启动方法当前不能在Unix上和“冻结的”可执行内容一同使用(例如,有类似 PyInstaller 和 cx_Freeze 的包产生的二进制文件)。 'fork' 启动方法可以使用。

在进程之间交换对象

multiprocessing 支持进程之间的两种通信通道:

队列

Queue 类是一个近似 queue.Queue 的克隆。 例如:

from multiprocessing import Process, Queue

def f(q):

q.put([42, None, 'hello'])

if __name__ == '__main__':

q = Queue()

p = Process(target=f, args=(q,))

p.start()

print(q.get()) # prints "[42, None, 'hello']"

p.join()

队列是线程和进程安全的。

管道

Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

from multiprocessing import Process, Pipe

def f(conn):

conn.send([42, None, 'hello'])

conn.close()

if __name__ == '__main__':

parent_conn, child_conn = Pipe()

p = Process(target=f, args=(child_conn,))

p.start()

print(parent_conn.recv()) # prints "[42, None, 'hello']"

p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send() 和 recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。

进程之间的同步

multiprocessing 包含来自 threading 的所有同步。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock

def f(l, i):

l.acquire()

try:

print('hello world', i)

finally:

l.release()

if __name__ == '__main__':

lock = Lock()

for num in range(10):

Process(target=f, args=(lock, num)).start()

输出结果:

hello world 0

hello world 1

hello world 3

hello world 2

hello world 4

hello world 5

hello world 6

hello world 7

hello world 8

hello world 9

不使用来自不同进程的锁输出容易产生混淆。

在进程之间共享状态

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

但是,如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法。

共享内存

可以使用 Value 或 Array 将数据存储在共享内存映射中。例如,以下代码:

from multiprocessing import Process, Value, Array

def f(n, a):

n.value = 3.1415927

for i in range(len(a)):

a[i] = -a[i]

if __name__ == '__main__':

num = Value('d', 0.0)

arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))

p.start()

p.join()

print(num.value)

print(arr[:])

打印结果:

3.1415927

[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 num 和 arr 时使用的 'd' 和 'i' 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务器进程

由 Manager() 返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。例如

from multiprocessing import Process, Manager

def f(d, l):

d[1] = '1'

d['2'] = 2

d[0.25] = None

l.reverse()

if __name__ == '__main__':

with Manager() as manager:

d = manager.dict()

l = manager.list(range(10))

p = Process(target=f, args=(d, l))

p.start()

p.join()

print(d)

print(l)

输出结果:

{0.25: None, 1: '1', '2': 2}

[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

进程池使用

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

例如:

from multiprocessing import Pool, TimeoutError

import time

import os

def f(x):

return x*x

if __name__ == '__main__':

# start 4 worker processes

with Pool(processes=4) as pool:

# print "[0, 1, 4,..., 81]"

print(pool.map(f, range(10)))

# print same numbers in arbitrary order

for i in pool.imap_unordered(f, range(10)):

print(i)

# evaluate "f(20)" asynchronously

res = pool.apply_async(f, (20,)) # runs in *only* one process

print(res.get(timeout=1)) # prints "400"

# evaluate "os.getpid()" asynchronously

res = pool.apply_async(os.getpid, ()) # runs in *only* one process

print(res.get(timeout=1)) # prints the PID of that process

# launching multiple evaluations asynchronously *may* use more processes

multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]

print([res.get(timeout=1) for res in multiple_results])

# make a single worker sleep for 10 secs

res = pool.apply_async(time.sleep, (10,))

try:

print(res.get(timeout=1))

except TimeoutError:

print("We lacked patience and got a multiprocessing.TimeoutError")

print("For the moment, the pool remains available for more work")

# exiting the 'with'-block has stopped the pool

print("Now the pool is closed and no longer available")

输出结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

0

4

16

9

1

25

36

64

81

49

400

95759

[95760, 95761, 95758, 95759]

We lacked patience and got a multiprocessing.TimeoutError

For the moment, the pool remains available for more work

Now the pool is closed and no longer available

请注意:Pool 的方法只能由创建它的进程使用。

注解:

该软件包中的功能要求子项可以导入 __main__ 模块。这包含在 Programming guidelines 中,但值得指出。这意味着一些示例,例如 multiprocessing.pool.Pool 示例在交互式解释器中不起作用。例如:

>>> from multiprocessing import Pool

>>> p = Pool(5)

>>> def f(x):

... return x*x

...

>>> p.map(f, [1,2,3])

Process PoolWorker-1:

Process PoolWorker-2:

Process PoolWorker-3:

Traceback (most recent call last):

AttributeError: 'module' object has no attribute 'f'

AttributeError: 'module' object has no attribute 'f'

AttributeError: 'module' object has no attribute 'f'

(如果你尝试这个,它实际上会以半随机的方式输出三个完整的回溯,然后你可能不得不以某种方式停止主进程。)

参考:https://docs.python.org/2/library/multiprocessing.html

python manager模块_Python 并发模块相关推荐

  1. python贪婪匹配_python re模块匹配贪婪和非贪婪模式详解

    python re模块匹配贪婪和非贪婪模式详解 这篇文章主要介绍了python re模块匹配贪婪和非贪婪模式详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友 ...

  2. python argparse模块_Python argparse模块应用实例解析

    这篇文章主要介绍了Python argparse模块应用实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 简介 argparse是python ...

  3. python莫比乌斯环_python基础|模块

    1 模块简介 在python中常见的模块有三种,在python解释器中的内置模块,第三方模块和自定义模块.模块的有使用python编写的文件,有已被编译为共享库或DLL的C或C++扩展,也有使用C编写 ...

  4. python如何自定义模块_python自定义模块和开源模块使用方法

    模块,用一砣代码实现了某个功能的代码集合. 类似于函数式编程和面向过程编程,函数式编程则完成一个功能,其他代码用来调用即可,提供了代码的重用性和代码间的耦合.而对于一个复杂的功能来,可能需要多个函数才 ...

  5. python import变量_Python import模块调用

    开发过程中代码越写越多,在一个文件里代码会越来越长,不容易维护,为了容易维护代码,我们把很多函数分组,分别放在不同的文件里,在Python中,一个.py文件就是模块(Module) 工具/原料 Pyt ...

  6. python shelve模块_python常用模块之shelve模块

    python常用模块之shelve模块 shelve模块是一个简单的k,v将内存中的数据通过文件持久化的模块,可以持久化任何pickle可支持的python数据类型 我们在上面讲json.pickle ...

  7. python常用运维模块_python常用模块之一

    sys模块: sys模块是提供关于python本身的详细内在的信息的模块. sys.executable变量,它包含python解释器的路径 sys.platform变量,告诉我们现在处于什么操作系统 ...

  8. python 多层包多模块_python Modules模块操作

    今天学习python的Modules模块操作,并记录学习过程欢迎大家一起交流分享. 首先新建一个python文件命名为my_module.py的自定义moudle文件,在这个文件中进行模块代码编写: ...

  9. python counter模块_python collections模块 计数器(counter)

    一.计数器(counter) Counter是对字典类型的补充,用于追踪值的出现次数. ps:具备字典的所有功能 + 自己的功能 把我写入的元素出现的多少次都计算出来 import collectio ...

最新文章

  1. R语言使用lubridate包的tz函数设置和查询日期、时间对象的时区信息( time zone)
  2. Linux 小知识翻译 - 「cron」
  3. Django框架进阶
  4. 易评:软银收购ARM会扼住中国芯发展的咽喉吗?
  5. postgresSQL 实现数据修改后,自动更新updated_date/ts等字段
  6. 设计模式——装饰器模式
  7. 处理器映射器(HandlerMapping)及处理器适配器(HandlerAdapter)详解(一)
  8. 框架模式和设计模式的区别
  9. 安装scws需要安装php吗,Linux 安装SCWS-1.2.3 安装说明(包括php扩展)
  10. 前置++与后置++的要点分析
  11. python怎样画立体图-如何用Matplotlib 画三维图的示例代码
  12. 垃圾收集(GC)中如何确定哪些内存是垃圾
  13. 高级软件测试11.27日小组工作-1701班第5组
  14. CTFShow“萌心区“WP题解
  15. linux软件中心无法安装软件,linux中软件的安装
  16. 计算机i网络管理员证书四级,软考网络管理员试题练习(4)
  17. SMETA验厂咨询,Sedex验厂工厂的自检流程有哪些
  18. jQuery基础(菜鸟教程,建议收藏不然怕你后悔!)
  19. Errors during downloading metadata for repository ‘AppStream‘: - Status code: 404 for http://mirro
  20. 图片素材类网站必备以图搜图、智能搜图识图图像搜索系统imgso,让素材网站更智能专业

热门文章

  1. 微服务的好处与弊端_一文了解微服务的流程和组织
  2. 2022央视春晚电视端直播平均收视率达21.93%
  3. 特斯拉回应Model 3新车无USB接口:芯片短缺
  4. 近期新机发布一览:最便宜的只需699元!
  5. 3成失眠者放下手机才能睡 说中你了吗?
  6. 华为Mate X2再曝光:全新向内折叠方案 有望彻底消除折痕
  7. 就地过年的年轻人都去搜索“年夜饭”外卖了
  8. 今天,A股犹如过年!股市太火,炒股app都绷不住了
  9. 找工作的人太多导致平台崩了?BOSS直聘回应:系统故障 已修复
  10. 2020年中国最具影响力的50位商界领袖:马云、任正非、王传福位列前三