本篇转载自 灰寨小学–python小陈 若有侵权,立即删除。

Python多进程编程

序. multiprocessing
python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

1. Process

创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

例1.1:创建函数并将其作为单个进程

  1. import multiprocessing
  2. import time
  3. def worker(interval):
  4. n = 5
  5. while n > 0:
  6. print("The time is {0}".format(time.ctime()))
  7. time.sleep(interval)
  8. n -= 1
  9. if __name__ == "__main__":
  10. p = multiprocessing.Process(target = worker, args = (3,))
  11. p.start()
  12. print "p.pid:", p.pid
  13. print "p.name:", p.name
  14. print "p.is_alive:", p.is_alive()

结果

p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

例1.2:创建函数并将其作为多个进程

  1. import multiprocessing
  2. import time
  3. def worker_1(interval):
  4. print "worker_1"
  5. time.sleep(interval)
  6. print "end worker_1"
  7. def worker_2(interval):
  8. print "worker_2"
  9. time.sleep(interval)
  10. print "end worker_2"
  11. def worker_3(interval):
  12. print "worker_3"
  13. time.sleep(interval)
  14. print "end worker_3"
  15. if __name__ == "__main__":
  16. p1 = multiprocessing.Process(target = worker_1, args = (2,))
  17. p2 = multiprocessing.Process(target = worker_2, args = (3,))
  18. p3 = multiprocessing.Process(target = worker_3, args = (4,))
  19. p1.start()
  20. p2.start()
  21. p3.start()
  22. print("The number of CPU is:" + str(multiprocessing.cpu_count()))
  23. for p in multiprocessing.active_children():
  24. print("child p.name:" + p.name + "\tp.id" + str(p.pid))
  25. print "END!!!!!!!!!!!!!!!!!"

结果

The number of CPU is:4
child   p.name:Process-3    p.id7992
child   p.name:Process-2    p.id4204
child   p.name:Process-1    p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

例1.3:将进程定义为类

  1. import multiprocessing
  2. import time
  3. class ClockProcess(multiprocessing.Process):
  4. def __init__(self, interval):
  5. multiprocessing.Process.__init__(self)
  6. self.interval = interval
  7. def run(self):
  8. n = 5
  9. while n > 0:
  10. print("the time is {0}".format(time.ctime()))
  11. time.sleep(self.interval)
  12. n -= 1
  13. if __name__ == '__main__':
  14. p = ClockProcess(3)
  15. p.start()

注:进程p调用start()时,自动调用run()

结果

the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

例1.4:daemon程序对比结果

#1.4-1 不加daemon属性

  1. import multiprocessing
  2. import time
  3. def worker(interval):
  4. print("work start:{0}".format(time.ctime()));
  5. time.sleep(interval)
  6. print("work end:{0}".format(time.ctime()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target = worker, args = (3,))
  9. p.start()
  10. print "end!"

结果

end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015

#1.4-2 加上daemon属性

  1. import multiprocessing
  2. import time
  3. def worker(interval):
  4. print("work start:{0}".format(time.ctime()));
  5. time.sleep(interval)
  6. print("work end:{0}".format(time.ctime()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target = worker, args = (3,))
  9. p.daemon = True
  10. p.start()
  11. print "end!"

结果

end!

注:因子进程设置了daemon属性,主进程结束,它们就随着结束了。

#1.4-3 设置daemon执行完结束的方法

  1. import multiprocessing
  2. import time
  3. def worker(interval):
  4. print("work start:{0}".format(time.ctime()));
  5. time.sleep(interval)
  6. print("work end:{0}".format(time.ctime()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target = worker, args = (3,))
  9. p.daemon = True
  10. p.start()
  11. p.join()
  12. print "end!"

结果

work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!

2. Lock

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

  1. import multiprocessing
  2. import sys
  3. def worker_with(lock, f):
  4. with lock:
  5. fs = open(f, 'a+')
  6. n = 10
  7. while n > 1:
  8. fs.write("Lockd acquired via with\n")
  9. n -= 1
  10. fs.close()
  11. def worker_no_with(lock, f):
  12. lock.acquire()
  13. try:
  14. fs = open(f, 'a+')
  15. n = 10
  16. while n > 1:
  17. fs.write("Lock acquired directly\n")
  18. n -= 1
  19. fs.close()
  20. finally:
  21. lock.release()
  22. if __name__ == "__main__":
  23. lock = multiprocessing.Lock()
  24. f = "file.txt"
  25. w = multiprocessing.Process(target = worker_with, args=(lock, f))
  26. nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
  27. w.start()
  28. nw.start()
  29. print "end"

结果(输出文件)

Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

3. Semaphore

Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

  1. import multiprocessing
  2. import time
  3. def worker(s, i):
  4. s.acquire()
  5. print(multiprocessing.current_process().name + "acquire");
  6. time.sleep(i)
  7. print(multiprocessing.current_process().name + "release\n");
  8. s.release()
  9. if __name__ == "__main__":
  10. s = multiprocessing.Semaphore(2)
  11. for i in range(5):
  12. p = multiprocessing.Process(target = worker, args=(s, i*2))
  13. p.start()

结果

Process-1acquire
Process-1releaseProcess-2acquire
Process-3acquire
Process-2releaseProcess-5acquire
Process-3releaseProcess-4acquire
Process-5releaseProcess-4release

4. Event

Event用来实现进程间同步通信。

  1. import multiprocessing
  2. import time
  3. def wait_for_event(e):
  4. print("wait_for_event: starting")
  5. e.wait()
  6. print("wairt_for_event: e.is_set()->" + str(e.is_set()))
  7. def wait_for_event_timeout(e, t):
  8. print("wait_for_event_timeout:starting")
  9. e.wait(t)
  10. print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
  11. if __name__ == "__main__":
  12. e = multiprocessing.Event()
  13. w1 = multiprocessing.Process(name = "block",
  14. target = wait_for_event,
  15. args = (e,))
  16. w2 = multiprocessing.Process(name = "non-block",
  17. target = wait_for_event_timeout,
  18. args = (e, 2))
  19. w1.start()
  20. w2.start()
  21. time.sleep(3)
  22. e.set()
  23. print("main: event is set")

结果

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

5. Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:

  1. import multiprocessing
  2. def writer_proc(q):
  3. try:
  4. q.put(1, block = False)
  5. except:
  6. pass
  7. def reader_proc(q):
  8. try:
  9. print q.get(block = False)
  10. except:
  11. pass
  12. if __name__ == "__main__":
  13. q = multiprocessing.Queue()
  14. writer = multiprocessing.Process(target=writer_proc, args=(q,))
  15. writer.start()
  16. reader = multiprocessing.Process(target=reader_proc, args=(q,))
  17. reader.start()
  18. reader.join()
  19. writer.join()

结果

1

6. Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

  1. import multiprocessing
  2. import time
  3. def proc1(pipe):
  4. while True:
  5. for i in xrange(10000):
  6. print "send: %s" %(i)
  7. pipe.send(i)
  8. time.sleep(1)
  9. def proc2(pipe):
  10. while True:
  11. print "proc2 rev:", pipe.recv()
  12. time.sleep(1)
  13. def proc3(pipe):
  14. while True:
  15. print "PROC3 rev:", pipe.recv()
  16. time.sleep(1)
  17. if __name__ == "__main__":
  18. pipe = multiprocessing.Pipe()
  19. p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
  20. p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
  21. #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
  22. p1.start()
  23. p2.start()
  24. #p3.start()
  25. p1.join()
  26. p2.join()
  27. #p3.join()

结果

7. Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

例7.1:使用进程池(非阻塞)

  1. #coding: utf-8
  2. import multiprocessing
  3. import time
  4. def func(msg):
  5. print "msg:", msg
  6. time.sleep(3)
  7. print "end"
  8. if __name__ == "__main__":
  9. pool = multiprocessing.Pool(processes = 3)
  10. for i in xrange(4):
  11. msg = "hello %d" %(i)
  12. pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
  13. print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
  14. pool.close()
  15. pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
  16. print "Sub-process(es) done."

一次执行结果

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

例7.2:使用进程池(阻塞)

  1. #coding: utf-8
  2. import multiprocessing
  3. import time
  4. def func(msg):
  5. print "msg:", msg
  6. time.sleep(3)
  7. print "end"
  8. if __name__ == "__main__":
  9. pool = multiprocessing.Pool(processes = 3)
  10. for i in xrange(4):
  11. msg = "hello %d" %(i)
  12. pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
  13. print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
  14. pool.close()
  15. pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
  16. print "Sub-process(es) done."

一次执行的结果

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

例7.3:使用进程池,并关注结果

  1. import multiprocessing
  2. import time
  3. def func(msg):
  4. print "msg:", msg
  5. time.sleep(3)
  6. print "end"
  7. return "done" + msg
  8. if __name__ == "__main__":
  9. pool = multiprocessing.Pool(processes=4)
  10. result = []
  11. for i in xrange(3):
  12. msg = "hello %d" %(i)
  13. result.append(pool.apply_async(func, (msg, )))
  14. pool.close()
  15. pool.join()
  16. for res in result:
  17. print ":::", res.get()
  18. print "Sub-process(es) done."

一次执行结果

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

例7.4:使用多个进程池

  1. #coding: utf-8
  2. import multiprocessing
  3. import os, time, random
  4. def Lee():
  5. print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
  6. start = time.time()
  7. time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
  8. end = time.time()
  9. print 'Task Lee, runs %0.2f seconds.' %(end - start)
  10. def Marlon():
  11. print "\nRun task Marlon-%s" %(os.getpid())
  12. start = time.time()
  13. time.sleep(random.random() * 40)
  14. end=time.time()
  15. print 'Task Marlon runs %0.2f seconds.' %(end - start)
  16. def Allen():
  17. print "\nRun task Allen-%s" %(os.getpid())
  18. start = time.time()
  19. time.sleep(random.random() * 30)
  20. end = time.time()
  21. print 'Task Allen runs %0.2f seconds.' %(end - start)
  22. def Frank():
  23. print "\nRun task Frank-%s" %(os.getpid())
  24. start = time.time()
  25. time.sleep(random.random() * 20)
  26. end = time.time()
  27. print 'Task Frank runs %0.2f seconds.' %(end - start)
  28. if __name__=='__main__':
  29. function_list= [Lee, Marlon, Allen, Frank]
  30. print "parent process %s" %(os.getpid())
  31. pool=multiprocessing.Pool(4)
  32. for func in function_list:
  33. pool.apply_async(func) #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
  34. print 'Waiting for all subprocesses done...'
  35. pool.close()
  36. pool.join() #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
  37. print 'All subprocesses done.'

一次执行结果

parent process 7704Waiting for all subprocesses done...
Run task Lee-6948Run task Marlon-2896Run task Allen-7304Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.

Python 多进程相关推荐

  1. python多进程详解

    目录 python多进程 序.multiprocessing 一.Process process介绍 例1.1:创建函数并将其作为单个进程 例1.2:创建函数并将其作为多个进程 例1.3:将进程定义为 ...

  2. Python 多进程开发与多线程开发

    我们先来了解什么是进程? 程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程.程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本:进程 ...

  3. python多进程_python语法:多进程

    之前在一篇文章中写过,有关于Python线程的问题: 是枝裕和:内:用python多线程同时处理大量文件​zhuanlan.zhihu.com 当然我的写的文章都是面对于一些初学者和python代码实 ...

  4. 取代Python多进程!伯克利开源分布式框架Ray

    Ray由伯克利开源,是一个用于并行计算和分布式Python开发的开源项目.本文将介绍如何使用Ray轻松构建可从笔记本电脑扩展到大型集群的应用程序. 并行和分布式计算是现代应用程序的主要内容.我们需要利 ...

  5. python 多进程_说说Python多线程与多进程的区别?

    公众号新增加了一个栏目,就是每天给大家解答一道Python常见的面试题,反正每天不贪多,一天一题,正好合适,只希望这个面试栏目,给那些正在准备面试的同学,提供一点点帮助! 小猿会从最基础的面试题开始, ...

  6. python多进程存储数据_Python 多进程及进程间通信

    python 因 GIL 的存在,处理计算密集型的任务时无法高效利用多核 CPU 的计算资源,这时就需要使用多进程来提高对 CPU 的资源利用.Python 多进程主要用 multiprocessin ...

  7. python 多进程异常处理

    前言 最近项目用到了Python作为网站的前端,使用的框架是基于线程池的Cherrypy,但是前端依然有一些比较'重'的模块.由于python的多线程无法很好的利用多核的性质,所以觉得把这些比较'重' ...

  8. python多进程断点续传分片下载器

    python多进程断点续传分片下载器 标签:python 下载器 多进程 因为爬虫要用到下载器,但是直接用urllib下载很慢,所以找了很久终于找到一个让我欣喜的下载器.他能够断点续传分片下载,极大提 ...

  9. Python 多进程、多线程启动

    本文仅供学习交流使用,如侵立删!demo下载见文末 Python 多进程启动 def main(self, num):"""多进程启动ValueError: Pool n ...

  10. Python 多进程的进程池pool运行时报错:ValueError: Pool not running

    本文仅供学习交流使用,如侵立删!demo下载见文末 Python 多进程的进程池pool运行时报错:ValueError: Pool not running def main(self, num):& ...

最新文章

  1. c语言指着与数组,C语言指针与数组
  2. POJ2891 Strange Way to Express Integers (扩展欧几里德)
  3. 软件测试准备(摘要)
  4. 说说设计模式~桥梁模式(Bridge)
  5. 【优化预测】基于matlab遗传算法优化BP神经网络预测【含Matlab源码 1376期】
  6. ArcGIS地理数据处理高级教程_004_1遇到没有地理参照系信息的数据怎么办
  7. SocksCap64全局代理工具使用+Clash使用命令行
  8. 华为手机卡在升级界面_华为安装升级包卡在5 华为手机如何刷机?
  9. UE4 蓝图实现AI随机移动
  10. dht11传感器c语言程序,树莓派 DHT11 温湿度传感器读取 C 语言版
  11. win10计算机内存,win10系统电脑怎么升级内存?win10系统升级内存的方法
  12. fprintf()函数相关说明
  13. 2020ICPC上海 E.The Journey of Geor Autumn
  14. 关于H5版本及说明-为什么优雅草YYC蜻蜓系统H5版本打包不成功以及相关问题
  15. win10无法启动_常见的电脑故障分析:win10系统无法启动,造成的原因及解决教程...
  16. SSM框架项目:米米商城后台管理系统
  17. 华友高科激光SLAM导航系统整车调试方法(一)
  18. Android 10 系统屏蔽底部按键 禁止锁屏 禁用横屏
  19. Magpie免费开源、支持多平台的的滚动抽奖系统
  20. 6大B2C购物系统比较

热门文章

  1. 智能媒体管理产品文档转换/预览功能介绍(1)---Cloud Native架构
  2. 遗传算法(确定性排挤)
  3. 大数据东风下,Clickhouse这坨屎是怎么上天的
  4. 计算机主机usb插口松,usb插口(电脑usb接口松动小妙招)
  5. python word保存图_Python 将本地图片存储到 Word 文档
  6. Deep Knowledge Tracking based on Attention Mechanism for Student Performance Prediction
  7. 王者荣耀进阶教学攻速/移速/减伤机制/视野/意识
  8. 告诉你四川火锅的秘密配方
  9. Python统计模型探索性数据分析(EDA)系统(单变量-双变量-相关性-缺失值)
  10. java json去掉反斜杠,如何去掉Json字符串中反斜杠