Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。

传统的例子

简单搜索下“Python 多线程教程”,不难发现几乎所有的教程都给出涉及类和队列的例子:

#Example.py

'''

Standard Producer/Consumer Threading Pattern

'''

import time

import threading

import Queue

class Consumer(threading.Thread):

def __init__(self, queue):

threading.Thread.__init__(self)

self._queue = queue

def run(self):

while True:

# queue.get() blocks the current thread until

# an item is retrieved.

msg = self._queue.get()

# Checks if the current message is

# the "Poison Pill"

if isinstance(msg, str) and msg == 'quit':

# if so, exists the loop

break

# "Processes" (or in our case, prints) the queue item

print "I'm a thread, and I received %s!!" % msg

# Always be friendly!

print 'Bye byes!'

def Producer():

# Queue is used to share items between

# the threads.

queue = Queue.Queue()

# Create an instance of the worker

worker = Consumer(queue)

# start calls the internal run() method to

# kick off the thread

worker.start()

# variable to keep track of when we started

start_time = time.time()

# While under 5 seconds..

while time.time() - start_time < 5:

# "Produce" a piece of work and stick it in

# the queue for the Consumer to process

queue.put('something at %s' % time.time())

# Sleep a bit just to avoid an absurd number of messages

time.sleep(1)

# This the "poison pill" method of killing a thread.

queue.put('quit')

# wait for the thread to close down

worker.join()

if __name__ == '__main__':

Producer()

哈,看起来有些像 Java 不是吗?

我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。只是,处理日常脚本任务时我们可以使用更有效率的模型。

问题在于…

首先,你需要一个样板类;

其次,你需要一个队列来传递对象;

而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。

worker 越多,问题越多

按照这一思路,你现在需要一个 worker 线程的线程池。下面是 一篇 IBM 经典教程 中的例子——在进行网页检索时通过多线程进行加速。

#Example2.py

'''

A more realistic thread pool example

'''

import time

import threading

import Queue

import urllib2

class Consumer(threading.Thread):

def __init__(self, queue):

threading.Thread.__init__(self)

self._queue = queue

def run(self):

while True:

content = self._queue.get()

if isinstance(content, str) and content == 'quit':

break

response = urllib2.urlopen(content)

print 'Bye byes!'

def Producer():

urls = [

'http://www.python.org', 'http://www.yahoo.com'

'http://www.scala.org', 'http://www.google.com'

# etc..

]

queue = Queue.Queue()

worker_threads = build_worker_pool(queue, 4)

start_time = time.time()

# Add the urls to process

for url in urls:

queue.put(url)

# Add the poison pillv

for worker in worker_threads:

queue.put('quit')

for worker in worker_threads:

worker.join()

print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):

workers = []

for _ in range(size):

worker = Consumer(queue)

worker.start()

workers.append(worker)

return workers

if __name__ == '__main__':

Producer()

这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程,还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始……

至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错,这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。

何不试试 map

map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。

urls = ['http://www.yahoo.com', 'http://www.reddit.com']

results = map(urllib2.urlopen, urls)

上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中,并将所有结果保存到 results 这一列表中。其结果大致相当于:

results = []

for url in urls:

results.append(urllib2.urlopen(url))

map 函数一手包办了序列操作、参数传递和结果保存等一系列的操作。

为什么这很重要呢?这是因为借助正确的库,map 可以轻松实现并行化操作。

在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.dummy.

这里多扯两句: multiprocessing.dummy? mltiprocessing 库的线程版克隆?这是虾米?即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。而这句描述译成人话基本就是说:”嘛,有这么个东西,你知道就成.”相信我,这个库被严重低估了!

dummy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程,而 dummy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。

所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。

动手尝试

使用下面的两行代码来引用包含并行化 map 函数的库:

from multiprocessing import Pool

from multiprocessing.dummy import Pool as ThreadPool

实例化 Pool 对象:

pool = ThreadPool()

这条简单的语句替代了 example2.py 中 build_worker_pool 函数 7 行代码的工作。它生成了一系列的 worker 线程并完成初始化工作、将它们储存在变量中以方便访问。

Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。

一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。

pool = ThreadPool(4) # Sets the pool size to 4

线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。

创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py

import urllib2

from multiprocessing.dummy import Pool as ThreadPool

urls = [

'http://www.python.org',

'http://www.python.org/about/',

'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',

'http://www.python.org/doc/',

'http://www.python.org/download/',

'http://www.python.org/getit/',

'http://www.python.org/community/',

'https://wiki.python.org/moin/',

'http://planet.python.org/',

'https://wiki.python.org/moin/LocalUserGroups',

'http://www.python.org/psf/',

'http://docs.python.org/devguide/',

'http://www.python.org/community/awards/'

# etc..

]

# Make the Pool of workers

pool = ThreadPool(4)

# Open the urls in their own threads

# and return the results

results = pool.map(urllib2.urlopen, urls)

#close the pool and wait for the work to finish

pool.close()

pool.join()

实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。

# results = []

# for url in urls:

#   result = urllib2.urlopen(url)

#   results.append(result)

# # ------- VERSUS ------- #

# # ------- 4 Pool ------- #

# pool = ThreadPool(4)

# results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- #

# pool = ThreadPool(8)

# results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- #

# pool = ThreadPool(13)

# results = pool.map(urllib2.urlopen, urls)

结果:

#        Single thread:  14.4 Seconds

#               4 Pool:   3.1 Seconds

#               8 Pool:   1.4 Seconds

#              13 Pool:   1.3 Seconds

很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

另一个真实的例子

生成上千张图片的缩略图

这是一个 CPU 密集型的任务,并且十分适合进行并行化。

基础单进程版本

import os

import PIL

from multiprocessing import Pool

from PIL import Image

SIZE = (75,75)

SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):

return (os.path.join(folder, f)

for f in os.listdir(folder)

if 'jpeg' in f)

def create_thumbnail(filename):

im = Image.open(filename)

im.thumbnail(SIZE, Image.ANTIALIAS)

base, fname = os.path.split(filename)

save_path = os.path.join(base, SAVE_DIRECTORY, fname)

im.save(save_path)

if __name__ == '__main__':

folder = os.path.abspath(

'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')

os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

images = get_image_paths(folder)

for image in images:

create_thumbnail(Image)

上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。

这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。

如果我们使用 map 函数来代替 for 循环:

import os

import PIL

from multiprocessing import Pool

from PIL import Image

SIZE = (75,75)

SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):

return (os.path.join(folder, f)

for f in os.listdir(folder)

if 'jpeg' in f)

def create_thumbnail(filename):

im = Image.open(filename)

im.thumbnail(SIZE, Image.ANTIALIAS)

base, fname = os.path.split(filename)

save_path = os.path.join(base, SAVE_DIRECTORY, fname)

im.save(save_path)

if __name__ == '__main__':

folder = os.path.abspath(

'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')

os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

images = get_image_paths(folder)

pool = Pool()

pool.map(creat_thumbnail, images)

pool.close()

pool.join()

5.6 秒!

虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

到这里,我们就实现了(基本)通过一行 Python 实现并行化。

原文链接

转载于:https://www.cnblogs.com/276815076/p/5530420.html

Python黑魔法,一行实现并行化相关推荐

  1. 我发现了个Python黑魔法,执行任意代码都会自动念上一段「平安经」

    来源 | Python编程时光 最近的"平安经"可谓是引起了不小的风波啊. 作为一个正儿八经的程序员,最害怕的就是自己的代码上线出现各种各样的 BUG. 为此,明哥今天分享一个 P ...

  2. Python 黑魔法 --- 描述器(descriptor)

    Python 黑魔法-描述器(descriptor) Python黑魔法,前面已经介绍了两个魔法,装饰器和迭代器,通常还有个生成器.生成器固然也是一个很优雅的魔法.生成器更像是函数的行为.而连接类行为 ...

  3. Python:一行代码将以e为结尾的科学计算法类型的数值转为小数点类型数值

    Python:一行代码将以e为结尾的科学计算法类型的数值转为小数点类型数值 目录 一行代码将以e为结尾的科学计算法类型的数值转为小数点类型数值 一行代码将以e为结尾的科学计算法类型的数值转为小数点类型 ...

  4. python多线程为啥是假的?(GIL 全局解释器锁)(python多线程不适合并行化的计算密集型代码)

    1. 问: 答: 2. from threading import Thread ​ def loop(): ​while True: ​print("亲爱的,我错了,我能吃饭了吗?&quo ...

  5. 教大家python读取一行一行文件内容的方法

    Python中readline()每次读取文件中的一行,需要使用永真表达式循环读取文件.但当文件指针移动到文件的末尾时,依然使用readline()读取文件将出现错误.因此程序中需要添加1个判断语句, ...

  6. 分行打印列表python_#python版一行内容分行输出

    python版一行内容分行输出 1.[代码][Python]代码236091543 #python版一行内容分行输出 #依山居 18:14 2015/11/4 #题目来源 http://www.bat ...

  7. python任意代码都可以缩进去_我发现了个 Python 黑魔法,执行任意代码都会自动念上一段 『平安经』...

    最近的"平安经"可谓是引起了不小的风波啊. 作为一个正儿八经的程序员,最害怕的就是自己的代码上线出现各种各样的 BUG. 为此,明哥今天分享一个 Python 的黑魔法,教你如何在 ...

  8. 我发现了个 Python 黑魔法,执行任意代码都会自动念上一段「平安经」

    来源 | Python编程时光 最近的"平安经"可谓是引起了不小的风波啊. 作为一个正儿八经的程序员,最害怕的就是自己的代码上线出现各种各样的 BUG. 为此,明哥今天分享一个 P ...

  9. [转载] 如何用python实现一行两个输入

    参考链接: 如何在Python的一行中从用户输入多个值 前几天突发奇想要用python做SDNU OJ的题,就是最简单的那道1001,然而错了==原因就在于命题要求在同一行中输入,而我的代码用了两个i ...

  10. python中有这样一条语句_在Python中一行书写两条语句时,语句之间可以使用__________作为分隔符。_学小易找答案...

    [填空题]Python表达式4.5/2.4.5//2和4.5%2的值分别为__________________________. [填空题]我国古代文献中所记载的最早的学校类型有两种,分别是 和 . ...

最新文章

  1. 外包三年,我废了..…
  2. 从博客园博问站点迁移ASP.NET Core展望.NET Core
  3. Linux设置SSH链接
  4. 如何将CSV数据存储到Hive
  5. 背包问题 小灰_小背包问题
  6. 学会使用ant design封装一个锚点组件
  7. 在Saas发展的黄金时代里带你理解SaaS设计
  8. python和nltk自然语言处理书评_Python和NLTK自然语言处理
  9. 那时我大约5岁的飞鸽传书
  10. Siri为什么越来越蠢?
  11. 使用联想恢复盘安装正版Win7 Professional
  12. Matlab无法打开
  13. Windows10电脑进行拨号连接时,无法共享热点?如何解决?多次尝试后,终于解决了。
  14. oracle rac定时清理归档日志,Rman 定时删除归档日志
  15. 怎么在计算机网络上添加文件,教你win7如何设置网络共享文件夹
  16. QPushButton如何设置按下效果
  17. 下载神器annie的安装及使用
  18. Wirecast Pro 11 Mac(直播软件) v11.0.0中文破解版
  19. CAD框选对象的两种方式、AUTOCAD——删除重复线段
  20. 使用ffmpeg处理 视频文件中帧间时间戳异常、关键帧间隔异常

热门文章

  1. windows功能_你的Windows杀毒软件有这个功能吗?
  2. html标签转换日期格式,input标签的type为date,显示的日期格式样式更改
  3. yolov3安卓实现_YOLOv3 的 TensorFlow 实现,GitHub 完整源码解析
  4. 北科大计算机技术研招考纲,北京科技大学2021年全国硕士研究生招生考试自命题科目考试大纲...
  5. 用c语言产生大素数,C语言实现寻找大素数
  6. php登陆页面修改密码的功能,使用bootstrap创建登录注册页面并实现表单验证功能...
  7. java递归api_javaAPI_IO流基础_递归使用
  8. 2台电脑一根网线传文件_Iphone 和PC如何共享文件
  9. pat 乙级 1032 挖掘机技术哪家强(C++)
  10. 【渝粤题库】陕西师范大学200701 数字逻辑