python 因 GIL 的存在,处理计算密集型的任务时无法高效利用多核 CPU 的计算资源,这时就需要使用多进程来提高对 CPU 的资源利用。Python 多进程主要用 multiprocessing 模块实现,提供了进程、进程池、队列、管理者、共享数据、同步原语功能。

单进程版

为了便于演示 multiprocessing 的使用,我们使用素数检查模拟计算密集型任务。单进程版本的代码如下:

# encoding:utf8

from math import sqrt

CHECK_NUMBERS = 1000000

def check_prime(n):

if n == 2:

return True

for i in range(2, int(sqrt(n) + 1)):

if n % i == 0:

return False

return True

def run():

total = 0

for i in range(2, CHECK_NUMBERS + 1):

if check_prime(i):

total += 1

return total

if __name__ == "__main__":

import timeit

print(run())

print(timeit.timeit("run()", "from __main__ import run", number=1))

以上单进程的示例在我的计算机上输出结果为:

78498

4.788863064308802

即 1000000 以内共有 78498 个素数,耗时约 4.8 秒。

进程与队列

生成一个新的进程并启动如下:

processes = multiprocessing.Process(target=worker, args=param)

processes.start() # 启动

processes.join() # 主进程等待

子进程中要把结果返回可以通过 Queue 队列返回,Queue 的操作主要是 get 和 put,支持阻塞(可以设置超时),非阻塞,也能监测队列状态是否为空。

以下代码使用了多个进程,将 2 到 1000000 的大区间分为 4 个小区间,再分配给 4 个进程去分别计算小区间内共有多少素数,通过队列返回。主进程最后把每个进程统计的素数个数相加即是最终结果。

# encoding:utf8

import multiprocessing

from math import sqrt

from multi.single_thread_check_prime import check_prime

CHECK_NUMBERS = 1000000

NUM_PROCESSES = multiprocessing.cpu_count()

def worker(start, end, result_mq):

'''count prime numbers between start and end(exclusive)'''

total = 0

for n in range(start, end):

if check_prime(n):

total += 1

result_mq.put(total)

def divide_range(lower_end, upper_end, num_range):

'''divide a larger range into smaller ranges'''

step = int((upper_end - lower_end) / num_range)

ranges = []

subrange_upper = lower_end

while subrange_upper <= upper_end:

subrange_lowerend = subrange_upper

subrange_upper += step

if subrange_upper <= upper_end:

ranges.append((subrange_lowerend, subrange_upper))

continue

if subrange_lowerend < upper_end:

ranges.append((subrange_lowerend, upper_end))

return ranges

def run():

params = divide_range(2, CHECK_NUMBERS + 1, 4) # [(2, 250001), (250001, 500000), (500000, 749999), (749999, 999998), (999998, 1000001)]

result_mq = multiprocessing.Queue()

processes = []

for i in range(NUM_PROCESSES):

process = multiprocessing.Process(target=worker, args=list(params[i]) + [result_mq] )

processes.append(process)

process.start()

for process in processes:

process.join()

total = 0

for i in range(NUM_PROCESSES):

count = result_mq.get()

total += count

print(total)

return total

if __name__ == "__main__":

import timeit

print(timeit.timeit("run()", "from __main__ import run", number=1))

使用多进程后的输出结果为:

78498

1.6613719538839973

最终结果一致,约快了 2.9 倍。我的电脑 CPU 有 4 核,由于创建进程、进程间通信也都需要消耗资源,所以没法达到理想的 4 倍,但也已经是不错的提升了。

进程池

前面一个例子通过实例化 Process 的方法生成新的进程,再对每个进程调用 start 和 join,但其实通过进程池可以使代码更简洁。上面多进程的例子中,使用进程池后可以去除 Queue 队列的使用。首先在 worker 中去除消息队列 result_mq,直接返回结果如下:

def worker(sub_range):

'''count prime numbers between start and end(exclusive)'''

start, end = sub_range

total = 0

for n in range(start, end):

if check_prime(n):

total += 1

return total

这时在 run 函数里就可以用进程池的 map 方法,修改后的 run:

def run():

params = divide_range(2, CHECK_NUMBERS + 1, 4) # [(2, 250001), (250001, 500000), (500000, 749999), (749999, 999998), (999998, 1000001)]

pool = multiprocessing.Pool(processes=NUM_PROCESSES)

result = pool.map(worker, params)

total = sum(result)

print(total)

return total

由于不用在代码里逐个生成子进程,同时 map 方法可以直接返回结果,run 函数从原来的 19 行缩短为了 8 行。进程池不仅有 map 方法,还有 map_async, apply, apply_async 等方法,带 async 后缀的方法能够实现非阻塞调用,主进程不必等到子进程运行完毕才往下运行。比如上面的 run 函数可以修改成如下:

def run():

params = divide_range(2, CHECK_NUMBERS + 1, 4) # [(2, 250001), (250001, 500000), (500000, 749999), (749999, 999998), (999998, 1000001)]

pool = multiprocessing.Pool(processes=NUM_PROCESSES)

result = pool.map_async(worker, params)

pool.close()

# do something else here

...

pool.join()

total = sum(result.get())

print(total)

return total

管理者

管理者 Manager 可以存储需要在进程间共享的对象,其支持的类型包括 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value, Array。相比于共用内存,Manager 可以让不同机器上的进程通过网络共享对象。Manager 的 register 方法还可以自定义新的类型或者可调用对象,具体使用见文档:Manager

sharedctypes

multiprocessing.sharedctypes 可以用来在共享内存中创建 c 类型数据,可以作为参数在创建子进程时传入。其中主要用到的有 Value, RawValue, Array, RawArray 。Value, Array 通过参数可以设置是否需要锁实现进程安全,如果对进程间同步没有要求使用 RawValue 和 RawArray 则有更高的运行效率。实例化使用如下:

n = Value('i', 7)

x = RawArray(‘h’, 7)

s = RawArray(‘i’, (9, 2, 8))

第一个参数指定类型,类型编码见array module,或者使用 ctypes 模块,如 ctypes.c_double,第二个参数为值,对于 Array 和 RawArray,当第二个参数为整型时则为数组的长度,数组元素初始化为0。对于 linux 操作系统,只要把 n, x, s 设置成全局变量,即可在子进程中使用,无需显式传参,但 window 操作系统则需要在实例化 Process 时传入参数。不可使用 Pool.map 把 n, x, s 作为参数传递,因为 map 使用的序列化而 n,x,s 不可序列化。具体使用见下一小节。

共享 numpy 数组

需要用到 numpy 时往往是数据量较大的场景,如果直接复制会造成大量内存浪费。共享 numpy 数组则是通过上面一节的 Array 实现,再用 numpy.frombuffer 以及 reshape 对共享的内存封装成 numpy 数组,代码如下:

# encoding:utf8

import ctypes

import os

import multiprocessing

import numpy as np

NUM_PROCESS = multiprocessing.cpu_count()

def worker(index):

main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double)

main_nparray = main_nparray.reshape(NUM_PROCESS, 10)

pid = os.getpid()

main_nparray[index, :] = pid

return pid

if __name__ == "__main__":

shared_array_base = multiprocessing.Array(

ctypes.c_double, NUM_PROCESS * 10, lock=False)

pool = multiprocessing.Pool(processes=NUM_PROCESS)

result = pool.map(worker, range(NUM_PROCESS))

main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double)

main_nparray = main_nparray.reshape(NUM_PROCESS, 10)

print main_nparray

mmap

mmap 把文件映射到内存,也可以用于进程间通信,可以像字符串或文件一样对其进行操作,操作较为简单。

import mmap

# write a simple example file

with open("hello.txt", "wb") as f:

f.write("Hello Python!\n")

with open("hello.txt", "r+b") as f:

# memory-map the file, size 0 means whole file

mm = mmap.mmap(f.fileno(), 0)

# read content via standard file methods

print mm.readline() # prints "Hello Python!"

# read content via slice notation

print mm[:5] # prints "Hello"

# update content using slice notation;

# note that new content must have same size

mm[6:] = " world!\n"

# ... and read again using standard file methods

mm.seek(0)

print mm.readline() # prints "Hello world!"

# close the map

mm.close()

以上例子(转载自文档)为映射文件,对于进程间共享内存在 mmap 时第一个参数设置为 -1 实现匿名映射,只创建共享内存不映射到磁盘,见官网mmap。

python多进程存储数据_Python 多进程及进程间通信相关推荐

  1. python多进程存储数据_python – 多进程还是多线程? – 并行化数百万次迭代的简单计算并将结果存储在单个数据结构中...

    第一个选项 – 服务器进程 创建服务器进程.它是Multiprocessing包的一部分,允许并行访问数据结构.这样,每个进程都将直接访问数据结构,锁定其他进程. Server process A m ...

  2. python怎么存储数据_Python:如何在类中存储数据并继承

    我对课程很陌生,我已经写了一个班来做熊猫的操作. @分类方法 ,但我不知道如何使用它们. 我的班级目标: 我想写一个权限类.它应该加载/写入一个json文件,并且应该能够添加更多的权限. 编辑: 读了 ...

  3. python归档excel数据_python 数据存储excel

    Python的小数据存储,用什么格式更有逼格? 小数据存储 我们在编写代码的时候,经常会涉及到数据存储的情况,如果是爬虫得到的大数据,我们会选择使用数据库,或者excel存储.但如果只是一些小数据,或 ...

  4. Python如何存储数据到json文件

    1 前言 很多程序都要求用户输入某种信息,程序一般将信息存储在列表和字典等数据结构中. 用户关闭程序时,就需要将信息进行保存,一种简单的方式是使用模块json来存储数据. 模块json让你能够将简单的 ...

  5. python永久保存数据_python如何保存数据

    在Python开发中,数据存储.读取是必不可少的环节,而且可以采用的存储方式也很多,常用的方法有json文件.csv文件.MySQL数据库.Redis数据库以及Mongdb数据库等. 1. json文 ...

  6. python读取grib2数据_python读取grib grib2气象数据

    GRIB是WMO开发的一种用于交换和存储规则分布数据的二进制文件格式,主要用来表示数值天气预报的产品资料.现行的GRIB 码版本有GRIB1 和GRIB2 两种格式.由于小编只接触过GRIB1数据,所 ...

  7. python pdf提取数据_python从PDF中提取数据的示例

    01 前言 数据是数据科学中任何分析的关键,大多数分析中最常用的数据集类型是存储在逗号分隔值(csv)表中的干净数据.然而,由于可移植文档格式(pdf)文件是最常用的文件格式之一,因此每个数据科学家都 ...

  8. python通信达数据_Python读取通达信数据

    Python读取通达信数据 一.介绍 python获取股票数据的方法很多,其中Tushare 财经数据接口包很好用,当然,也可以通过通达信本地的数据获取,这样更为方便. 日线数据存在这路径下 D:\通 ...

  9. python 通达信数据_Python读取通达信本地数据

    一.介绍 python获取股票数据的方法很多,其中 Tushare 财经数据接口包很好用,当然,也可以通过通达信本地的数据获取,这样更为方便. 日线数据存在这路径下 D:\通达信\vipdoc\sh\ ...

最新文章

  1. 最喜欢随机森林?周志华团队DF21后,TensorFlow开源决策森林库TF-DF
  2. 计算灰度共生矩阵相关性时sigmax,y 为0的问题
  3. usb接口供电不足_电脑USB接口不够用?来试试ORICO条纹hub扩展器吧
  4. lsof 详解 (转载)
  5. java类创建顺序,Java基础----你真的了解java类创建顺序吗?
  6. Spring Boot——Redis安装配置与应用整合
  7. multisim秒信号发生器_频谱中射频干扰信号流化、分析与回放
  8. 初识python你应该知道的6个知识点
  9. 源文件与模块文件生成时的文件不同,仍要调试器使用它吗
  10. mysql创建存储过程意义_浅谈一下mySql中创建存储过程
  11. 包含对象的json格式_如何把JSON数据格式转换为Python的类对象?
  12. 洛谷P4593 [TJOI2018]教科书般的亵渎 【数学】
  13. java中如何访问类中的字段_java – 在子类中使用super关键字访问超类私有字段
  14. JDK 1.6 API 中文版
  15. 惠普笔记本苹果w10双系统_苹果电脑安装双系统教程:MacOS和Win10双系统安装教程...
  16. 2022-2027年(新版)中国石油化工行业发展建议及投资前景展望报告
  17. python使用numpy的np.mod函数计算numpy数组除以某一特定数值剩余的余数(remainder)、np.mod函数和np.fmod函数对负值的处理方式有差异
  18. 无限制翻译PDF文件——低调使用
  19. LeetCode 494. Target Sume
  20. storm 2.2.1 java + idea实现wordcount

热门文章

  1. Entrust - Laravel 用户权限系统解决方案
  2. IDEA解决SSM项目的静态资源路径问题:HTML,CSS,JS--详解
  3. 不关闭seLinux解决vsftpd服务本地用户不能登录问题(500 OOPS: cannot change directory:/home/***
  4. jquery动态添加元素无法触发绑定事件的解决方案。
  5. Nginx 出现504 Gateway Time-out的解决方法
  6. C#调用COM组件遇到的问题及解决办法
  7. RabbitMQ脑裂问题解决方案调查
  8. springboot获取getBean方法以及ApplicationContext空指针问题解决
  9. 禁用/启用按钮和链接的最简单方法是什么(jQuery + Bootstrap)
  10. 批量过滤删除AD中的对象