参考:http://blog.csdn.net/qdx411324962/article/details/46810421
参考:http://www.lxway.com/4488626156.htm

廖雪峰官网 进程和线程、多进程、多线程、ThreadLocal、进程 vs. 线程、分布式进程

multiprocessing 文档: https://docs.python.org/2/library/multiprocessing.html#managers
Python 中的进程、线程、协程、同步、异步、回调:https://segmentfault.com/a/1190000001813992
multiprocessing 多进程的用法 :https://cuiqingcai.com/3335.html

由于要做把一个多线程改成多进程,看一下相关方面的东西,总结一下,主要是以下几个相关的标准库

  1. subprocess
  2. signal
  3. threading
  4. multiprocessing

从 Python3.2 开始,标准库提供了concurrent.futures 模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对 threading 和 multiprocessing 的更高级的抽象,对编写 线程池/进程池 提供了直接的支持。 
concurrent.futures 基础模块是 executor 和 future。

concurrent.futures 官方文档:https://docs.python.org/3/library/concurrent.futures.html

Python3 模块 - Concurrent.futures 教程:
https://www.yiibai.com/concurrency_in_python/concurrency_in_python_pool_of_processes.html

使用示例代码:

# -*- coding: utf-8 -*-import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutorr = redis.Redis(host='127.0.0.1', port=6379)# 减库存函数, 循环直到减库存完成
# 库存充足, 减库存成功, 返回True
# 库存不足, 减库存失败, 返回Falsedef reduce_stock():# python中redis事务是通过pipeline的封装实现的with r.pipeline() as pipe:while True:try:# watch库存键, multi后如果该key被其他客户端改变, # 事务操作会抛出WatchError异常pipe.watch('stock:count')count = int(pipe.get('stock:count'))if count > 0:  # 有库存# 事务开始pipe.multi()pipe.decr('stock:count')# 把命令推送过去# execute返回命令执行结果列表, 这里只有一个decr返回当前值print(pipe.execute()[0])return Trueelse:return Falseexcept WatchError as ex:# 打印WatchError异常, 观察被watch锁住的情况print(ex)pipe.unwatch()def worker():while True:# 没有库存就退出if not reduce_stock():breakif __name__ == "__main__":# 设置库存为100r.set("stock:count", 100)# 多进程模拟多个客户端提交with ProcessPoolExecutor() as pool:for _ in range(10):pool.submit(worker)

python 单线程 和 多线程

python 单线程

# -*- coding:utf-8 -*-import time
import datetimedef music(argv):for i in range(2):print("听音乐  %s. %s" % (argv, datetime.datetime.now()))time.sleep(1)def movie(argv):for i in range(2):print("看电影  {}. {}".format(argv, datetime.datetime.now()))time.sleep(5)if __name__ == '__main__':music('trouble is a friend')movie('变形金刚')print(f"all over {datetime.datetime.now()}")

python 多线程

Python 中使用线程有两种方式:函数 或者 用类来包装线程对象

  • 创建多线程:使用 函数 方法。
            函数式  :调用 thread 模块中的 start_new_thread() 函数来产生新线程。
            语法如下: thread.start_new_thread(function, args[, kwargs])
            参数说明:
                function  线程函数。
                args      传递给线程函数的参数,必须是个 tuple 类型。
                kwargs    可选参数。

使用示例:( Python2 代码 )

import thread
import timedef print_time(thread_name, delay):count = 0while count < 5:time.sleep(delay)count += 1print "%s: %s" % (thread_name, time.ctime(time.time()))if __name__ == "__main__":try:thread.start_new_thread(print_time, ("Thread-1", 2))thread.start_new_thread(print_time, ("Thread-2", 4))except BaseException as e:print eprint "Error: unable to start thread"while 1:pass
  • 创建多线程:通过 类继承 。使用 Threading 模块创建线程,直接从 threading.Thread 继承,然后重写 __init__ 方法和 run 方法:
import threading
import time
import _threadexitFlag = 0  # 是否每个线程要进行工作后再退出,设定1则所有线程启动后直接退出class MyThread(threading.Thread):  # 继承父类 threading.Threaddef __init__(self, thread_id, name, counter):super().__init__()  #self.thread_id = thread_idself.name = nameself.counter = counterdef run(self):# 把要执行的代码写到 run 函数里面线程在创建后会直接运行 run 函数print("Starting " + self.name)print_time(self.name, 5, self.counter)print("Exiting " + self.name)def print_time(thread_name, delay, counter):while counter:if exitFlag:_thread.exit()  # 这个是让线程主动退出time.sleep(delay)print("%s: %s" % (thread_name, time.ctime(time.time())))counter -= 1# 创建新线程
thread_1 = MyThread(1, "Thread-1", 1)
thread_2 = MyThread(2, "Thread-2", 2)
# 开启线程
thread_1.start()
thread_2.start()
print("Exiting Main Thread")
  • thread 和 threading 模块( 强烈建议直接使用 threading )。python 提供了两个模块来实现多线程 thread 和 threading 。 thread 有一些缺点,在 threading 得到了弥补,但是还是强烈建议直接使用 threading。
# -*- coding: utf-8 -*-import threading
from time import ctime, sleepdef music(argv):for i in range(2):print("listen music  %s. %s" % (argv, ctime()))sleep(1)def movie(argv):for i in range(2):print("watch movie  %s! %s" % (argv, ctime()))sleep(5)threads = []
t1 = threading.Thread(target=music, args=('trouble is a friend',))
threads.append(t1)
t2 = threading.Thread(target=movie, args=('变形金刚',))
threads.append(t2)if __name__ == '__main__':for t in threads:t.setDaemon(True)t.start()print("all over %s" % ctime())pass
  • 设置 精灵进程  setDaemon(True) 将线程声明为守护线程,必须在 start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句 print "all over %s" %ctime()后,没有等待子线程,直接就退出了,同时子线程也一同结束。
if __name__ == '__main__':for t in threads:t.setDaemon(True)t.start()   # start()开始线程活动。t.join()print "all over %s" %ctime()
  • join() 方法,用于等待线程终止。join() 的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。注意:上面程序中 join() 方法的位置是在 for 循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。
import threading
import timedef worker(num):time.sleep(1)print("The num is  %d" % num)print t.getName()returnfor i in range(20):t = threading.Thread(target=worker, args=(i,), name="testThread")t.start()

Thread 方法说明

t.start()       激活线程,
t.getName()     获取线程的名称
t.setName()     设置线程的名称
t.name          获取或设置线程的名称
t.is_alive()    判断线程是否为激活状态
t.isAlive()     判断线程是否为激活状态
t.setDaemon()   设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon()    判断是否为守护线程
t.ident         获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join()        逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run()         线程被cpu调度后自动执行线程对象的run方法

线程同步

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

线程锁

        多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。考虑这样一种情况:一个列表里所有元素都是 0,线程 "set" 从后向前把所有元素改成1,而线程 "print" 负责从前往后读取列表并打印。那么,可能线程 "set" 开始改的时候,线程 "print" 便来打印列表了,输出就成了一半 0 一半 1,这就是 数据的不同步。
为了避免这种情况,引入了
的概念。锁有两种状态:锁定 未锁定
每当一个线程比如 "set" 要访问共享数据时,必须先获得锁定;如果已经有别的线程比如 "print" 获得锁定了,那么就让线程 "set" 暂停,也就是同步阻塞;等到线程 "print" 访问完毕,释放锁以后,再让线程 "set" 继续。经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。

使用 Thread对象LockRlock 可以实现简单的线程同步,这两个对象都有 acquire方法release方法。对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和release 方法之间。

threading.RLockthreading.Lock 的区别:

  • RLock 允许在同一线程中被多次 acquire。而 Lock 却不允许这种情况。使用 RLock 时 acquire 和 release 必须成对出现,即调用了 n 次 acquire,必须调用 n 次的release 才能真正释放所占用的琐。
# -*- coding: utf-8 -*-import threadinglock = threading.Lock()    # Lock对象
rLock = threading.RLock()  # RLock对象 def main_1():lock.acquire()lock.acquire()  # 产生了死琐。lock.release()lock.release()def main_2():    rLock.acquire()rLock.acquire()  # 在同一线程内,程序不会堵塞。 rLock.release()rLock.release()

示例:( 线程锁

# -*- coding: utf-8 -*-import time
import threading# 定义一个 "线程锁"
threadLock = threading.Lock()class MyThread(threading.Thread):def __init__(self, thread_id, name, counter):threading.Thread.__init__(self)self.thread_id = thread_idself.name = nameself.counter = counterdef run(self):print("Starting " + self.name)# 获得锁,成功获得锁定后返回 True# 可选的 timeout 参数不填时将一直阻塞直到获得锁定# 否则超时后将返回 FalsethreadLock.acquire()print_time(self.name, self.counter, 3)# 释放锁threadLock.release()def print_time(thread_name, delay, counter):while counter:time.sleep(delay)print("%s: %s" % (thread_name, time.ctime(time.time())))counter -= 1threads = []
# 创建新线程
thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表中
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:t.join()
print("Exiting Main Thread")

示例:

import threading
import time
globals_num = 0
lock = threading.RLock()def func():lock.acquire()  # 获得锁global globals_numglobals_num += 1time.sleep(1)print(globals_num)lock.release()  # 释放锁for i in range(10):t = threading.Thread(target=func)t.start()pass

Python 的 queue ( 线程安全 )

Python 的 queue 模块中提供了同步的、线程安全的队列类。包括

  • FIFO(先入先出) 队列
  • LIFO(后入先出)队列
  • 优先级队列 PriorityQueue

这些队列都实现了 锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

Queue 模块中的常用方法:

Queue.qsize()    返回队列的大小
Queue.empty()    如果队列为空,返回True,反之False
Queue.full()     如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]]) 获取队列,timeout是等待时间
Queue.get_nowait()            相当Queue.get(False)
Queue.put(item)               写入队列,timeout是等待时间
Queue.put_nowait(item)        相当Queue.put(item, False)
Queue.task_done()             在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join()                  实际上意味着等到队列为空,再执行别的操作

示例:

# -*- coding: utf-8 -*-import time
import queue
import threadingtask_queue = queue.Queue()def produce():while True:for num in range(100):task_queue.put(num)time.sleep(0.1)def consume():while True:if task_queue.empty():print('队列为空')continuenum = task_queue.get()print(num)time.sleep(1)if __name__ == '__main__':thread_list = []t1 = threading.Thread(target=produce)thread_list.append(t1)for i in range(3):t_id = threading.Thread(target=consume)thread_list.append(t_id)for index in thread_list:index.start()for index in thread_list:index.join()

queue 是线程安全的,这是为了演示,给 queue 加锁

# -*- coding: utf-8 -*-import queue
import threading
import timeexitFlag = 0
threading_lock = threading.Lock()
workQueue = queue.Queue(10)class MyThread(threading.Thread):def __init__(self, thread_id, name, q):threading.Thread.__init__(self)self.thread_id = thread_idself.name = nameself.q = qdef run(self):print("Starting " + self.name)process_data(self.name, self.q)print("Exiting " + self.name)def process_data(thread_name, q):while not exitFlag:threading_lock.acquire()if not workQueue.empty():data = q.get()threading_lock.release()print("%s processing %s" % (thread_name, data))else:threading_lock.release()time.sleep(1)def main():thread_list = ["Thread-1", "Thread-2", "Thread-3"]name_list = ["One", "Two", "Three", "Four", "Five"]threads = []thread_id = 1# 创建线程for tName in thread_list:thread = MyThread(thread_id, tName, workQueue)thread.start()threads.append(thread)thread_id += 1# 填充队列threading_lock.acquire()for word in name_list:workQueue.put(word)threading_lock.release()# 等待队列清空while not workQueue.empty():pass# 通知线程退出exitFlag = 1# 等待所有线程完成for t in threads:t.join()print("Exiting Main Thread")if __name__ == '__main__':main()pass

threading.Condition

一个 condition 变量总是与某些类型的锁相联系,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁 是 conditon 对象的一部分:没有必要分别跟踪。

Condition 类实现了一个 conditon 变量。这个 conditiaon 变量允许一个或多个线程等待,直到他们被另一个线程通知。

  • 如果 lock 参数非空,那么他必须是一个 lock 或者 Rlock 对象,它用来做底层锁。
  • 如果 lock 参数为空,则会创建一个新的 Rlock 对象,用来做底层锁。

condition 变量服从上下文管理协议:with 语句块封闭之前可以获取与锁的联系。
acquire() 和 release() 会调用与锁相关联的相应的方法。
其他和锁关联的方法必须被调用,wait()方法会释放锁,
当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,
wait(timeout=None) :等待通知,或者等到设定的超时时间。
当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError异常。
wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞。
wait()还可以指定一个超时时间。 如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。
除非线程调用notify()和notify_all()之后放弃了锁的所有权。
在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。
修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。

例子:生产者-消费者模型

import threading
import timedef consumer(cond):with cond:print("consumer before wait")cond.wait()print("consumer after wait")def producer(cond):with cond:print("producer before notifyAll")cond.notifyAll()print("producer after notifyAll")condition = threading.Condition()
consumer_1 = threading.Thread(name="c1", target=consumer, args=(condition,))
consumer_2 = threading.Thread(name="c2", target=consumer, args=(condition,))
producer = threading.Thread(name="p", target=producer, args=(condition,))consumer_1.start()
time.sleep(2)
consumer_2.start()
time.sleep(2)
producer.start()

python 多进程共享变量

https://my.oschina.net/leejun2005/blog/203148

共享内存 (Shared memory)

Data can be stored in a shared memory map using Value or Array.

For example, the following code.   https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes

在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。如果你真有需要要共享数据, multiprocessing提供了两种方式。

multiprocessing 中的 Array 和 Value。数据可以用 Value 或 Array 存储在一个共享内存地图里,如下:

from multiprocessing import Array, Value, Processdef func(a, b):a.value = 3.333333333333333for j in range(len(b)):b[j] = -b[j]if __name__ == "__main__":num = Value('d', 0.0)arr = Array('i', range(11))if 0:t = Process(target=func, args=(num, arr))t.start()t.join()else:c = Process(target=func, args=(num, arr))d = Process(target=func, args=(num, arr))c.start()d.start()c.join()d.join()print(num.value)print(arr[:])for i in arr:print i,

输出

3.33333333333
0 1 2 3 4 5 6 7 8 9 10

创建 num 和 arr 时,“d”和“i”参数 由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。

Array(‘i’, range(10))中的‘i’参数:
‘c’: ctypes.c_char
‘u’: ctypes.c_wchar
‘b’: ctypes.c_byte
‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short
‘H’: ctypes.c_ushort
‘i’: ctypes.c_int
‘I’: ctypes.c_uint
‘l’: ctypes.c_long,
‘L’: ctypes.c_ulong
‘f’: ctypes.c_float
‘d’: ctypes.c_double

Server process

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.

https://docs.python.org/2/library/multiprocessing.html#managers

multiprocessing 中的 Manager()

Python中进程间共享数据,除了基本的queue,pipe和value+array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

from multiprocessing import Process, Managerdef f(d, l):d["name"] = "king"d["age"] = 100d["Job"] = "python"l.reverse()if __name__ == "__main__":with Manager() as man:d_temp = man.dict()l_temp = man.list(range(10))p = Process(target=f, args=(d_temp, l_temp))p.start()p.join()print(d_temp)print(l_temp) 

Server process manager 比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。

threading.Event

python 线程的事件用于主线程控制其他线程的执行。事件主要提供了三个方法

  • set 。将 “Flag” 设置为 False
  • wait 。将 “Flag” 设置为 True
  • clear 。判断标识位是否为Ture。

事件处理的机制:全局定义了一个 “Flag”,如果 “Flag” 值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果 “Flag” 值为True,那么 event.wait 方法时便不再阻塞。

import threadingdef do(event):print('start')event.wait()print('execute')event_obj = threading.Event()
for i in range(10):t = threading.Thread(target=do, args=(event_obj,))t.start()event_obj.clear()
# inp = input('input:')
inp = raw_input('input:')
if inp == 'true':event_obj.set()

当线程执行的时候,如果 flag 为False,则线程会阻塞,当 flag 为True 的时候,线程不会阻塞。它提供了 本地 和 远程 的并发性。

python 协程

关于协程,可以参考 greenlet、stackless、gevent、eventlet 等的实现。

我们知道并发(不是并行)编程目前有四种方式,多进程,多线程,异步,和协程。

多进程编程在 python 中有类似 C 的 os.fork,当然还有更高层封装的 multiprocessing 标准库,在之前写过的python高可用程序设计方法  http://www.cnblogs.com/hymenz/p/3488837.html  中提供了类似nginx中master process和worker process间信号处理的方式,保证了业务进程的退出可以被主进程感知。

多线程编程 Python 中有 Thread 和 threading,在 linux 下所谓的线程,实际上是 LWP 轻量级进程,其在内核中具有和进程相同的调度方式,有关 LWP,COW(写时拷贝),fork,vfork,clone等的资料较多,这里不再赘述。异步在 linux 下主要有三种实现 select,poll,epoll 。

协程 又称 微线程 。英文名 Coroutine。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将单个 CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

"函数 ( 又叫 子程序 ) " 在所有语言中都是层级调用,比如 A 调用 B,B 在执行过程中又调用了 C,C 执行完毕返回,B 执行完毕返回,最后是 A 执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。

  • 子程序调用总是一个入口,一次返回,调用顺序是明确的。
  • 协程的调用 和 子程序不同。协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。注意:是在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。

比如:子程序 A、B:

def A():print '1'print '2'print '3'def B():print 'x'print 'y'print 'z'

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B 也可能在执行过程中中断再去执行A,结果可能是:

1
2
x
y
3
z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。看起来 A、B 的执行有点像多线程,但协程的特点在于是一个线程执行,

协程和多线程比,协程有何优势?

  • 协程最大的优势就是协程极高的执行效率。因为是子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  • 第二大优势就是不需要多线程的锁机制。因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢 ?

  • 多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

一个例子:

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

import timedef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)time.sleep(1)r = '200 OK'def produce(c):c.next()n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()if __name__=='__main__':c = consumer()produce(c)

执行结果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:
        1. 首先调用c.next()启动生成器;
        2. 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
        3. consumer通过yield拿到消息,处理,又通过yield把结果传回;
        4. produce拿到consumer处理的结果,继续生产下一条消息;
        5. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。

整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

最后套用Donald Knuth的一句话总结协程的特点:“子程序就是协程的一种特例

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。
协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),
event loop是协程执行的控制点,如果你希望执行协程,就需要用到它们。
event loop提供了如下的特性:

注册、执行、取消延时调用(异步函数)、创建用于通信的client和server协议(工具)、创建和别的程序通信的子进程和协议(工具) 把函数调用送入线程池中

协程示例:

#---------python3_start---------------
import asyncio
async def cor1():print("COR1 start")await cor2()print("COR1 end")async def cor2():print("COR2")loop = asyncio.get_event_loop()
loop.run_until_complete(cor1())
loop.close()
#---------python3_end---------------

最后三行是重点。
        asyncio.get_event_loop() : asyncio启动默认的event loop
        run_until_complete() :         这个函数是阻塞执行的,知道所有的异步函数执行完成,
        close() :                                 关闭 event loop。

python 的 greenlet 模块

import greenlet
def fun1():print("12")gr2.switch()print("56")gr2.switch()def fun2():print("34")gr1.switch()print("78")gr1 = greenlet.greenlet(fun1)
gr2 = greenlet.greenlet(fun2)
gr1.switch()

gevent

gevent 属于第三方模块需要下载安装包
pip3 install --upgrade pip3
pip3 install gevent

import gevent
def fun1():print("www.baidu.com")  # 第一步gevent.sleep(0)print("end the baidu.com")  # 第三步def fun2():print("www.zhihu.com")  # 第二步gevent.sleep(0)print("end th zhihu.com")  # 第四步gevent.joinall([gevent.spawn(fun1),gevent.spawn(fun2),
])

遇到 IO 操作自动切换:

import gevent
import requestsdef func(url):print("get: %s" % url)gevent.sleep(0)proxies = {"http": "http://172.17.18.80:8080","https": "http://172.17.18.80:8080",}date = requests.get(url, proxies=proxies)ret = date.textprint(url, len(ret))gevent.joinall([gevent.spawn(func, 'https://www.baidu.com/'),gevent.spawn(func, 'http://www.sina.com.cn/'),gevent.spawn(func, 'http://www.qq.com/'),
])

http://www.cnblogs.com/zingp/p/5911537.html

http://python.jobbole.com/87310/

http://www.cnblogs.com/gide/p/6187080.html

python中多进程+协程的使用以及为什么要用它: http://blog.csdn.net/lambert310/article/details/51162634

从两个简单例子窥视协程的惊人性能(Python):http://walkerqt.blog.51cto.com/1310630/1439034

greenlet:http://greenlet.readthedocs.org/en/latest/
eventlet: http://eventlet.net/
http://gashero.iteye.com/blog/442177

示例代码:

"""
对于有些人来说Gevent和multiprocessing组合在一起使用算是个又高大上又奇葩的工作模式.
Python的多线程受制于GIL全局锁的特性,Gevent身为协程也是线程的一种,只是io调度上自己说了算而已。那么如何使用多个cpu核心?
可以利用多进程 mutliprocessing 来进行多核并行工作,
在多进程里面使用gevent协程框架可以更好的做io调度,相比线程来说减少了无谓的上下文切换.废话少说,直接上个例子.
下面是多进程下生产者消费者的工作模式
"""from multiprocessing import Process, cpu_count, Queue, JoinableQueue
from gevent import monkeymonkey.patch_all()
import gevent
import datetimeclass Consumer(object):def __init__(self, q, no_tasks, name):self._no_tasks = no_tasksself._queue = qself.name = nameself._rungevent(self._queue, self._no_tasks)def _rungevent(self, q, no_tasks):jobs = [gevent.spawn(self._printq) for x in range(no_tasks)]gevent.joinall(jobs)def _printq(self):while 1:value = self._queue.get()if value is None:self._queue.task_done()breakelse:print("{0} time: {1}, value: {2}".format(self.name, datetime.datetime.now(), value))returnclass Producer(object):def __init__(self, q, no_tasks, name, consumers_tasks):print(name)self._q = qself._no_tasks = no_tasksself.name = nameself.consumer_tasks = consumers_tasksself._rungevent()def _rungevent(self):jobs = [gevent.spawn(self.produce) for x in range(self._no_tasks)]gevent.joinall(jobs)for x in range(self.consumer_tasks):self._q.put_nowait(None)self._q.close()def produce(self):for no in range(10000):print(no)self._q.put(no, block=False)returndef main():total_cores = cpu_count()total_processes = total_cores * 2q = JoinableQueue()print("Gevent on top multiprocessing with 17 gevent coroutines ""\n 10 producers gevent and 7 consumers gevent")producer_gevents = 10consumer_gevents = 7jobs = []start = datetime.datetime.now()for x in range(total_cores):if not x % 2:p = Process(target=Producer, args=(q, producer_gevents, "producer %d" % 1, consumer_gevents))p.start()jobs.append(p)else:p = Process(target=Consumer, args=(q, consumer_gevents, "consumer %d" % x))p.start()jobs.append(p)for job in jobs:job.join()print("{0} process with {1} producer gevents and {2} consumer gevents took{3}\seconds to produce {4} numbers and consume".format(total_processes,producer_gevents * total_cores,consumer_gevents * total_cores,datetime.datetime.now() - start,producer_gevents * total_cores * 10000))if __name__ == '__main__':main()

mutilprocess 简介

由于 Python 设计的限制 ( 这里指 CPython,GLI )。最多只能用满1个CPU核心。但是 Python 的多进程包 multiprocessing 可以轻松完成从单进程到并发执行的转换。像 线程一样管理进程,这个是 mutilprocess 的核心,他与 threading 很是相像,对多核CPU的利用率会比 threading 好的多。

简单的创建进程

import multiprocessingdef worker(num):"""thread worker function"""print 'Worker:', numreturnif __name__ == '__main__':jobs = []for i in range(5):p = multiprocessing.Process(target=worker, args=(i,))jobs.append(p)p.start()

示例:

# -*- coding: utf-8 -*-import time
import multiprocessingdef func(msg):for i in range(3):print(msg)time.sleep(1)if __name__ == "__main__":p = multiprocessing.Process(target=func, args=("hello",))p.start()p.join()print("Sub-process done.")

确定当前的进程,即是给进程命名,方便标识区分,跟踪

import multiprocessing
import timedef worker():name = multiprocessing.current_process().nameprint(name, 'Starting')time.sleep(2)print(name, 'Exiting')def my_service():name = multiprocessing.current_process().nameprint(name, 'Starting')time.sleep(3)print(name, 'Exiting')if __name__ == '__main__':service = multiprocessing.Process(name='my_service', target=my_service)worker_1 = multiprocessing.Process(name='worker 1', target=worker)worker_2 = multiprocessing.Process(target=worker)  # default nameworker_1.start()worker_2.start()service.start()

使用 进程池(非阻塞、阻塞)

是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。

注意要用 apply_async,如果落下 async,就变成阻塞版本了。

使用 进程池( 非阻塞 版本)

# -*- coding: utf-8 -*-import multiprocessing
import timedef func(msg):for i in range(3):print(msg)time.sleep(1)if __name__ == "__main__":# processes=4 是最多并发进程数量。pool = multiprocessing.Pool(processes=4)for index in range(10):msg = f"hello {index}"pool.apply_async(func, (msg,))pool.close()pool.join()print("Sub-process(es) done.")

函数解释

  • apply_async(func[, args[, kwds[, callback]]]) 是非阻塞,apply(func[, args[, kwds]])是阻塞
  • close()    关闭 pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join 方法要在 close 或 terminate 之后使用。

示例:

# -*- coding: utf-8 -*-import os
import time
import random
import multiprocessingdef Lee():print("\nRun task Lee-%s" % (os.getpid()))  # os.getpid()获取当前的进程的IDstart = time.time()time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数end = time.time()print('Task Lee, runs %0.2f seconds.' % (end - start))def Marlon():print("\nRun task Marlon-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 40)end = time.time()print('Task Marlon runs %0.2f seconds.' % (end - start))def Allen():print("\nRun task Allen-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 30)end = time.time()print('Task Allen runs %0.2f seconds.' % (end - start))def Frank():print("\nRun task Frank-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 20)end = time.time()print('Task Frank runs %0.2f seconds.' % (end - start))if __name__ == '__main__':function_list = [Lee, Marlon, Allen, Frank]print("parent process %s" % (os.getpid()))pool = multiprocessing.Pool(4)for func in function_list:# Pool执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中pool.apply_async(func)print('Waiting for all subprocesses done...')pool.close()# 调用join之前,一定要先调用close() 函数,否则会出错# close()执行后不会有新的进程加入到 pool, join 函数等待素有子进程结束pool.join()print('All subprocesses done.')pass

使用 进程池( 阻塞 版本)

#coding: utf-8
import multiprocessing
import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束print "Sub-process(es) done."

使用 Pool,并需要关注结果

更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:

# -*- coding: utf-8 -*-import multiprocessing
import timedef func(msg):for i in range(3):print(msg)time.sleep(1)return "done " + msgif __name__ == "__main__":pool = multiprocessing.Pool(processes=4)result = []for index in range(10):msg = f"hello {index}"result.append(pool.apply_async(func, (msg,)))pool.close()pool.join()for res in result:print(res.get())print("Sub-process(es) done.")

示例

import multiprocessingdef do_calculation(data):return data * 2def start_process():print('Starting', multiprocessing.current_process().name)if __name__ == '__main__':inputs = list(range(10))print('Inputs  :', inputs)builtin_output = list(map(do_calculation, inputs))print('Build-In :', builtin_output)pool_size = multiprocessing.cpu_count() * 2pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, )# 默认情况下,Pool会创建固定数目的工作进程,并向这些工作进程传递作业,直到再没有更多作业为止。# maxtasksperchild 参数为每个进程执行 task 的最大数目,# 设置 maxtasksperchild参数可以告诉池在完成一定数量任务之后重新启动一个工作进程,# 来避免运行时间很长的工作进程消耗太多的系统资源。# pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, maxtasksperchild=2)print('-' * 20)pool_outputs = pool.map(do_calculation, inputs)pool.close()pool.join()print('Pool  :', pool_outputs)

multiprocessing 的 pool.map 使用

#coding: utf-8
import multiprocessing def m1(x): print x * x if __name__ == '__main__': pool = multiprocessing.Pool(multiprocessing.cpu_count()) i_list = range(8)pool.map(m1, i_list)

示例:

import numpy as np
from time import time
from multiprocessing import Process, Queue
import multiprocessing as mp
import randomdef my_func(x):s0 = time()res = 0for _ in range(x*1000000):res += 1print(mp.current_process(),'run time:%.3f s, result:%.1f'%(time()-s0,res))return res'''
multiprocessing.Pool 只是用来启动多个进程而不是在每个core上启动一个进程。
换句话说Python解释器本身不会去在每个core或者processor去做负载均衡。
这个是由操作系统决定的。如果你的工作特别的计算密集型的话,操作系统确实会分配更多的core,但这也不是Python或者代码所能控制的或指定的。
multiprocessing.Pool(num)中的num可以很小也可以很大,比如I/O密集型的操作,这个值完全可以大于cpu的个数。
硬件系统的资源分配是由操作系统决定的,如果你希望每个core都在工作,就需要更多的从操作系统出发了~
这段话转自https://segmentfault.com/q/1010000011117956
'''
def main():pool = mp.Pool(processes=mp.cpu_count())st = time()result = pool.map(my_func, [30]*8)print('total run time: %.3f s'%(time()-st))print(result)if __name__ == "__main__":main()

守护进程

守护进程就是不阻挡主程序退出,自己干自己的。 mutilprocess.setDaemon(True)就这句。

等待守护进程退出,要加上 join,join 可以传入浮点数值,等待n久就不等了

import multiprocessing
import time
import sysdef daemon():name = multiprocessing.current_process().nameprint('Starting:', name)time.sleep(2)print('Exiting :', name)def non_daemon():name = multiprocessing.current_process().nameprint('Starting:', name)print('Exiting :', name)if __name__ == '__main__':d = multiprocessing.Process(name='daemon',target=daemon)d.daemon = Truen = multiprocessing.Process(name='non-daemon',target=non_daemon)n.daemon = Falsed.start()n.start()d.join(1)print 'd.is_alive()', d.is_alive()n.join()

终止进程

最好使用 poison pill,强制的使用 terminate()。注意 terminate 之后要 join,使其可以更新状态

import multiprocessing
import timedef slow_worker():print('Starting worker')time.sleep(0.1)print('Finished worker')if __name__ == '__main__':p = multiprocessing.Process(target=slow_worker)print('BEFORE:', p, p.is_alive())p.start()print('DURING:', p, p.is_alive())p.terminate()print('TERMINATED:', p, p.is_alive())p.join()print('JOINED:', p, p.is_alive())

进程的退出状态

  • == 0     未生成任何错误
  • 0           进程有一个错误,并以该错误码退出
  • < 0       进程由一个-1 * exitcode信号结束
import multiprocessing
import sys
import timedef exit_error():sys.exit(1)def exit_ok():returndef return_value():return 1def raises():raise RuntimeError('There was an error!')def terminated():time.sleep(3)if __name__ == '__main__':jobs = []for f in [exit_error, exit_ok, return_value, raises, terminated]:print('Starting process for', f.func_name)j = multiprocessing.Process(target=f, name=f.func_name)jobs.append(j)j.start()jobs[-1].terminate()for j in jobs:j.join()print('%15s.exitcode = %s' % (j.name, j.exitcode))

日志

方便的调试,可以用logging

import multiprocessing
import logging
import sysdef worker():print 'Doing some work'sys.stdout.flush()if __name__ == '__main__':multiprocessing.log_to_stderr()logger = multiprocessing.get_logger()logger.setLevel(logging.INFO)p = multiprocessing.Process(target=worker)p.start()p.join()

派生进程

利用 class 来创建进程,定制子类

import multiprocessingclass Worker(multiprocessing.Process):def run(self):print('In %s' % self.name)returnif __name__ == '__main__':jobs = []for i in range(5):p = Worker()jobs.append(p)p.start()for j in jobs:j.join()

python 进程间传递消息

一般的情况是 Queue 来传递。

import multiprocessingclass MyFancyClass(object):def __init__(self, name):self.name = namedef do_something(self):proc_name = multiprocessing.current_process().nameprint 'Doing something fancy in %s for %s!' % \(proc_name, self.name)def worker(q):obj = q.get()obj.do_something()if __name__ == '__main__':queue = multiprocessing.Queue()p = multiprocessing.Process(target=worker, args=(queue,))p.start()queue.put(MyFancyClass('Fancy Dan'))# Wait for the worker to finishqueue.close()queue.join_thread()p.join()import multiprocessing
import timeclass Consumer(multiprocessing.Process):def __init__(self, task_queue, result_queue):multiprocessing.Process.__init__(self)self.task_queue = task_queueself.result_queue = result_queuedef run(self):proc_name = self.namewhile True:next_task = self.task_queue.get()if next_task is None:# Poison pill means shutdownprint '%s: Exiting' % proc_nameself.task_queue.task_done()breakprint '%s: %s' % (proc_name, next_task)answer = next_task()self.task_queue.task_done()self.result_queue.put(answer)returnclass Task(object):def __init__(self, a, b):self.a = aself.b = bdef __call__(self):time.sleep(0.1) # pretend to take some time to do the workreturn '%s * %s = %s' % (self.a, self.b, self.a * self.b)def __str__(self):return '%s * %s' % (self.a, self.b)if __name__ == '__main__':# Establish communication queuestasks = multiprocessing.JoinableQueue()results = multiprocessing.Queue()# Start consumersnum_consumers = multiprocessing.cpu_count() * 2print 'Creating %d consumers' % num_consumersconsumers = [ Consumer(tasks, results)for i in xrange(num_consumers) ]for w in consumers:w.start()# Enqueue jobsnum_jobs = 10for i in xrange(num_jobs):tasks.put(Task(i, i))# Add a poison pill for each consumerfor i in xrange(num_consumers):tasks.put(None)# Wait for all of the tasks to finishtasks.join()# Start printing resultswhile num_jobs:result = results.get()print 'Result:', resultnum_jobs -= 1

进程间信号传递

Event 提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

import multiprocessing
import timedef wait_for_event(e):"""Wait for the event to be set before doing anything"""print('wait_for_event: starting')e.wait()print('wait_for_event: e.is_set()->', e.is_set())def wait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""print('wait_for_event_timeout: starting')e.wait(t)print('wait_for_event_timeout: e.is_set()->', e.is_set())if __name__ == '__main__':e = multiprocessing.Event()w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))w1.start()w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))w2.start()print('main: waiting before calling Event.set()')time.sleep(3)e.set()print('main: event is set')

Python 多进程 multiprocessing.Pool类详解

multiprocessing 模块

multiprocessing 包是 Python 中的 多进程 管理包。它与 threading.Thread 类似,可以利用multiprocessing.Process 对象来创建一个进程。该进程可以允许放在 Python程序内部编写的函数中。该 Process对象与Thread对象的用法相同,拥有 is_alive()、join([timeout])、run()、start()、terminate() 等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外 multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与 threading 包中的同名类一样。multiprocessing 的很大一部份与 threading 使用同一套 API,只不过换到了多进程的情境。

这个模块表示像线程一样管理进程,这个是 multiprocessing 的核心,它与 threading 很相似,对多核 CPU 的利用率会比 threading 好的多。

看一下 Process 类的构造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

参数说明:

  • group:进程所属组。基本不用
  • target:表示调用对象。
  • args:表示调用对象的位置参数元组。
  • name:别名
  • kwargs:表示调用对象的字典。

创建进程的简单实例:

# -*- coding: utf-8 -*-import multiprocessingdef do(n):# 获取当前线程的名字name = multiprocessing.current_process().nameprint(name, 'starting')print("worker ", n)returnif __name__ == '__main__':numList = []for i in range(5):p = multiprocessing.Process(target=do, args=(i,))numList.append(p)p.start()p.join()print("Process end.")

执行结果:

Process-1 starting
worker  0
Process end.
Process-2 starting
worker  1
Process end.
Process-3 starting
worker  2
Process end.
Process-4 starting
worker  3
Process end.
Process-5 starting
worker  4
Process end.

创建子进程时,只需要传入一个 执行函数函数的参数,然后用 start() 方法启动。
join() 方法表示等待子进程结束以后再继续往下运行,通常用于进程间的同步。

注意:在 Windows 上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的 if __name__ == ‘__main__’ :语句的下面,才能正常使用 Windows 下的进程模块。Unix/Linux下则不需要。

Pool 类 ( 进程池 )

在使用 Python 进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用 Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时 进程池 就派上用场了。进程池 (Process Pool) 可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的士兵。比如下面的程序:

import multiprocessing as muldef func_test(x):return x ** 2if __name__ == '__main__':pool = mul.Pool(5)rel = pool.map(func_test, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])print(rel)pass

Pool 类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

Pool 类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。我们可以用 Pool 类创建一个进程池,展开提交的任务给进程池。

一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持 超时 和 回调的异步结果,有一个类似 map 的实现。

参数

  • processes :进程的数量,如果 processes 是 None 那么使用 os.cpu_count() 返回的数量。
  • initializer:如果是 None,那么每一个工作进程在开始的时候会调 initializer(*initargs)。
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild 默认是 None,意味着只要 Pool 存在工作进程就会一直存活。
  • context:用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者 一个context 对象的 Pool() 方法来创建一个池,两种方法都适当的设置了context

注意:Pool 对象的方法只可以被创建 pool 的进程所调用。

下面介绍一下 multiprocessing 模块下的 Pool 类下的几个方法

进程池的方法 

  • apply(func[, args[, kwds]]) :调用 func 函数并传递 args 和 kwds,结果返回前会一直阻塞,由于这个原因,apply_async() 更适合并发执行,另外,func函数仅被 pool 中的一个进程运行。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply() 方法的一个变体,会返回一个结果对象。 如果 callback 被指定,那么 callback 可以接收一个参数然后被调用,当结果准备好回调时会调用 callback,调用失败时,则用 error_callback 替换 callback。 Callbacks 应被立即完成,否则处理结果的线程会被阻塞。
  • close() :阻止更多的任务提交到 pool,待任务完成后,工作进程会退出。
  • terminate() :不管任务是否完成,立即停止工作进程。在对 pool 对象进程垃圾回收的时候,会立即调用 terminate()。
  • join():wait 工作线程的退出,在调用 join() 前,必须调用 close() 或者 terminate()。这样是因为被终止的进程需要被父进程调用 wait(join等价与wait),否则进程会成为僵尸进程。
  • map(func, iterable[, chunksize])
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
  • imap(func, iterable[, chunksize])
  • imap_unordered(func, iterable[, chunksize])
  • starmap(func, iterable[, chunksize])
  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

apply()

函数原型:apply(func[, args=()[, kwds={}]])    该函数用于传递不定参数,主进程会被阻塞直到函数执行结束(不建议使用,并且 3.x 以后不在出现)。apply 方法 示例:

import time
from multiprocessing import Pooldef f1(arg):time.sleep(0.5)print(arg)return arg + 100if __name__ == "__main__":pool = Pool(5)for i in range(1, 10):pool.apply(func=f1, args=(i,))pass

apply_async()

函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

与 apply 用法一样,但它是非阻塞且支持结果返回进行回调。apply_async 方法 示例:

import time
from multiprocessing import Pooldef f1(i):time.sleep(1)print(i)return i + 100def f2(arg):print(arg)if __name__ == "__main__":pool = Pool(5)for i in range(1, 10):pool.apply_async(func=f1, args=(i,), callback=f2)print('主进程等待')pool.close()pool.join()

  

map()

函数原型:map(func, iterable[, chunksize=None])

Pool 类中的 map 方法,与内置的 map 函数用法行为基本一致,它会使进程阻塞直到返回结果。
注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

import time
from multiprocessing import Pooldef run(arg=None):time.sleep(1)print('arg * arg')return arg * argif __name__ == "__main__":temp_list = [1, 2, 3, 4, 5, 6]start_time = time.time()for item in temp_list:run(item)end_time = time.time()print("顺序执行时间:", int(end_time - start_time))pool = Pool(5)  # 创建拥有5个进程数量的进程池start_time = time.time()result = pool.map(run, temp_list)  # 使进程阻塞直到返回结果pool.close()  # 关闭进程池,不再接受新的进程pool.join()   # 主进程阻塞等待子进程的退出end_time = time.time()print("并行执行时间:", int(end_time - start_time))print(f'map 的所有子进程返回的结果列表: {result}')

上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。程序中的 result 表示全部进程执行结束后全部的返回结果集run 函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中,也就是一个结果堆中,实际上是用了队列的原理,等待所有进程都执行完毕,就返回这个列表(列表的顺序不定)。

对 Pool对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close(),让其不再接受新的 Process。

结果中为什么还有 空行 和没有 换行 的数据呢?其实这跟进程调度有关,当有多个进程并行执行时,每个进程得到的时间片时间不一样,哪个进程接受哪个请求以及执行完成时间都是不定的,所以会出现输出乱序的情况。那为什么又会有没这行和空行的情况呢?因为有可能在执行第一个进程时,刚要打印换行符时,切换到另一个进程,这样就极有可能两个数字打印到同一行,并且再次切换回第一个进程时会打印一个换行符,所以就会出现空行的情况。

示例:

import time
from multiprocessing import Pooldef run(arg=None):time.sleep(2)print(arg)if __name__ == "__main__":startTime = time.time()temp_list = [1, 2, 3, 4, 5]pool = Pool(10)  # 可以同时跑10个进程pool.map(run, temp_list)pool.close()pool.join()endTime = time.time()print("time :", endTime - startTime)

close()

关闭进程池(pool),使其不在接受新的任务。

terminate()

结束工作进程,不在处理未处理的任务。

join()

主进程阻塞等待子进程的退出,join 方法必须在 close 或 terminate 之后使用。

threading 和 multiprocessing

(请尽量先阅读 Python多线程与同步 )

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

但在使用这些共享API的时候,我们要注意以下几点:

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing 提供了 threading 包中没有的 IPC ( 比如:Pipe 和 Queue ),效率上更高。应优先考虑 Pipe 和 Queue,避免使用 Lock/Event/Semaphore/Condition 等同步方式 (因为它们占据的不是用户进程的资源 )。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如 使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过 共享内存 和 Manager 的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

进程 同步

我们可以从下面的程序中看到 Thread 对象和 Process对象 在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用 Lock 同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。

import os
import threading
import multiprocessing# worker function
def worker(sign=None, t_lock=None):t_lock.acquire()print(sign, os.getpid())t_lock.release()# Main
print('Main:', os.getpid())# Multi-thread
record = []
threading_lock = threading.Lock()
for i in range(5):thread = threading.Thread(target=worker, args=('thread', threading_lock))thread.start()record.append(thread)for thread in record:thread.join()# Multi-process
record = []
process_lock = multiprocessing.Lock()
for i in range(5):process = multiprocessing.Process(target=worker, args=('process', process_lock))process.start()record.append(process)for process in record:process.join()

所有 Thread 的 PID 都与主程序相同,而每个 Process 都有一个不同的 PID。

Pipe ( 管道 ) 和  mutiprocessing.Queue( 队列 )

正如我们在 Linux多线程 中介绍的管道PIPE和消息队列 message queue,multiprocessing 包中有Pipe类 和 Queue类 来分别支持这两种 IPC 机制。Pipe 和 Queue 可以用来传送常见的对象。

Pipe 可以是单向(half-duplex),也可以是双向(duplex)。

通过mutiprocessing.Pipe(duplex=False) 创建单向管道 (默认为双向)。一个进程从 PIPE 一端输入对象,然后被 PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。下面的程序展示了 Pipe 的使用:( 这里的 Pipe 是双向的。 )

import multiprocessing as muldef proc1(pipe=None):pipe.send('hello')print('proc1 rec:', pipe.recv())def proc2(pipe=None):print('proc2 rec:', pipe.recv())pipe.send('hello, too')# Build a pipe
pipe = mul.Pipe()# Pass an end of the pipe to process 1
p1 = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2 = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()

Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表 Pipe 的一端(Connection对象)。对 Pipe 的某一端调用 send() 方法来传送对象,在另一端使用 recv() 来接收。

mutiprocessing.Queue

Queue 与 Pipe 相类似,都是先进先出的结构。但 Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用 mutiprocessing.Queue(maxsize) 创建,maxsize 表示队列中可以存放对象的最大数量。下面的程序展示了 Queue 的使用:

import os
import multiprocessing
import time# input worker
def input_queue(queue=None):info = str(os.getpid()) + '(put):' + str(time.time())queue.put(info)# output worker
def output_queue(queue=None, lock):info = queue.get()lock.acquire()print(str(os.getpid()) + '(get):' + info)lock.release()# ===================
# Main
record1 = []  # store input processes
record2 = []  # store output processes
lock = multiprocessing.Lock()  # To prevent messy print
queue = multiprocessing.Queue(3)# input processes
for i in range(10):process = multiprocessing.Process(target=input_queue, args=(queue,))process.start()record1.append(process)# output processes
for i in range(10):process = multiprocessing.Process(target=output_queue, args=(queue, lock))process.start()record2.append(process)for p in record1:p.join()queue.close()  # No more object will come, close the queuefor p in record2:p.join()

一些进程使用 put() 在 Queue 中放入字符串,这个字符串中包含 PID 和时间。另一些进程从Queue 中取出,并打印自己的 PID 以及 get() 的字符串

共享资源

在Python多进程初步已经提到,我们应该尽量避免多进程共享资源。多进程共享资源必然会带来进程间相互竞争。而这种竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。但如果需要,我们依然可以通过 共享内存Manager对象 这么做。

共享内存

在 Linux进程间通信 中,已经说过共享内存(shared memory) 的原理,这里给出用 Python 实现的例子:

import multiprocessingdef f(n, a):n.value = 3.14a[0] = 5num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()print(num.value)
print(arr[:])

这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。

Manager

Manager 对象类似于 服务器 与 客户 之间的通信 (server-client),与我们在 Internet 上的活动很类似。我们用一个进程作为服务器,建立 Manager 来真正存放资源。其它的进程可以通过参数传递或者根据地址来访问Manager,建立连接后,操作服务器上的资源。在防火墙允许的情况下,我们完全可以将Manager运用于多计算机,从而模仿了一个真实的网络情境。下面的例子中,我们对Manager的使用类似于shared memory,但可以共享更丰富的对象类型。

import multiprocessingdef f(x, arr, l):x.value = 3.14arr[0] = 5l.append('Hello')server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()proc = multiprocessing.Process(target=f, args=(x, arr, l))
proc.start()
proc.join()print(x.value)
print(arr)
print(l)

Manager 利用 list() 方法提供了表的共享方式。实际上你可以利用 dict() 来共享词典,Lock() 来共享 threading.Lock ( 注意,我们共享的是 threading.Lock,而不是进程的 mutiprocessing.Lock。后者本身已经实现了进程共享) 等。 这样 Manager 就允许我们共享更多样的对象。

  • Python多进程通信Queue、Pipe、Value、Array实例
  • Python中使用Queue和Condition进行线程同步的方法
  • Python Queue模块详解
  • python基于queue和threading实现多线程下载实例
  • 浅析Python中的多进程与多线程的使用
  • Python多进程同步Lock、Semaphore、Event实例
  • python 多进程通信模块的简单实现
  • 探究Python多进程编程下线程之间变量的共享问题
  • python多进程操作实例
  • 简单谈谈python中的Queue与多进程

Python 多进程 multiprocessing 使用示例相关推荐

  1. python PHP 多进程,python多进程的用法示例(代码)

    本篇文章给大家带来的内容是关于python多进程的用法示例(代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. python多线程适合IO密集型场景,而在CPU密集型场景,并不能充 ...

  2. python 多进程multiprocessing进程池pool tensorflow-yolov3 报错 MemoryError

    进程数设置为1-9个都能正常运行,设置成10个就开始报错,怪事! D:\20191031_tensorflow_yolov3\python\python.exe D:/20191031_tensorf ...

  3. python 多进程 multiprocessing.Queue()报错:The freeze_support() line can be omitted if the program

    运行以下多进程测试代码时报错: # -*- coding: utf-8 -*- """ @File : test_191205_测试多进程Multiprocessing_ ...

  4. python 多进程multiprocessing 如何获取子进程的返回值?进程池pool,apply_async(),get(),

    案例1 # -*- coding: utf-8 -*- """ @File : 20200318_摄像头多进程流传输.py @Time : 2020/3/18 14:58 ...

  5. python 多进程multiprocessing 队列queue报错:AttributeError: Can't pickle local object

    今天,test-191204-单个摄像头调用multiprocessing线程队列queue识别时,报错: D:\20191031_tensorflow_yolov3\python\python.ex ...

  6. python 多进程multiprocessing进程池pool tensorflow-yolov3 报错TypeError: 'ApplyResult' object is not iterable

    首先,代码结构它长这样: 可每次调用线程池进行识别时,就会报如下错误: D:\20191031_tensorflow_yolov3\python\python.exe D:/20191031_tens ...

  7. python multithreading_操作系统OS,Python - 多进程(multiprocessing)、多线程(multithreading)...

    多进程(multiprocessing) 参考: 1. 多进程概念 multiprocessing is a package that supports spawning processes usin ...

  8. 二十四、深入Python多进程multiprocessing模块

    @Author:Runsen multiprocessing multiprocessing包是Python中的多进程管理包.与之前的threading.Thread类似,它可以利用multiproc ...

  9. Python 多进程 multiprocessing.Pool类详解

    multiprocessing模块 multiprocessing包是Python中的多进程管理包.它与 threading.Thread类似,可以利用multiprocessing.Process对 ...

最新文章

  1. python怎么使用训练好的模型设计_tensorflow训练好的模型怎么调用?
  2. 实例方法-扩展器-生命zhou
  3. java 环境配置 maven 环境配置
  4. springboot启动不了_七款高Star的开源SpringBoot扩展,助你的代码水平更上一层楼
  5. 【图像超分辨率】基于ResNet或GAN的遥感图像超分辨率论文
  6. YBTOJ洛谷P1407:稳定婚姻(强连通分量)
  7. python怎么运行丘比特之箭_test
  8. java的remove iterator_Java集合 iterator.remove()方法详解
  9. ROS 可视化(一): 发布PointCloud2点云数据到Rviz
  10. HDU - 6191 Query on A Tree
  11. 设置crontab用vi打开编辑
  12. 完全卸载Oracle11
  13. vfp报表打印到PDF文件中不用输入文件名
  14. [分层最短路板子] 洛谷 P4568
  15. python程序自动运行_定时后台运行Python程序
  16. 函数的使用:两个数取最小值
  17. STL容器底层数据结构
  18. SOC设计之AMBA总线-AHB总线详解
  19. plcst语言编程教程_PLC ST语言编程之我的心得-专业自动化论坛-中国工控网论坛...
  20. 乐视网:截至2月已逾期金融机构借款类债务20.84亿元

热门文章

  1. 会议交流 | IJCKG 2021 日程表(北京时间)
  2. Android官方开发文档Training系列课程中文版:通过NFC共享文件之从其它设备接收文件
  3. TypeError: can't pickle _thread.RLock objects
  4. 企业咨询:常用分析和咨询方法列表
  5. HDU Integer's Power(容斥原理)
  6. hessian学习笔记
  7. js `` 手机不支持
  8. 50 days before NOI2017
  9. git Please move or remove them before you can merge. 错误解决方案
  10. 计算机网络学习笔记-目录(更新日期:2020.4.8)