python3 多进程 同步
目录
进程同步之:Queue和JoinableQueue
代码示例1:Queue
代码示例2:两个子进程间通信
代码示例3:子进程间通信Queue&JoinableQueue
进程同步之:Lock
进程同步之:event
进程同步之:pipe(管道)
进程同步之:Condition
进程同步之:共享变量(数字/字符串/列表/字典/实例对象)
代码示例1:未使用共享变量
代码示例2:共享数字变量
代码示例3:共享变量+锁
代码示例4:共享字符串
代码示例5:共享的字典和列表
代码示例6:共享实例对象
进程同步之:Queue和JoinableQueue
multiprocessing.Queue类似于queue.Queue,一般用来多个进程间交互信息。Queue是进程和线程安全的。它实现了queue.Queue的大部分方法,但task_done()和join()没有实现。
multiprocessing.JoinableQueue是multiprocessing.Queue的子类,增加了task_done()方法和join()方法。
task_done():一般在调用get()时获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
join():阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。
代码示例1:Queue
#encoding=utf-8
from multiprocessing import Process, Queue def offer(queue): # 入队列if queue.empty():queue.put("Hello World") else:print(queue.get())if __name__ == '__main__': # 创建一个队列实例q = Queue() p = Process(target = offer, args = (q,)) p.start() print(q.get()) # 主进程执行出队列操作q.put("哈哈哈哈哈")# 主进程执行入队列操作m = Process(target = offer, args = (q,)) # 子进程执行入队列操作m.start()m.join()p.join()
代码示例2:两个子进程间通信
#encoding=utf-8
from multiprocessing import Process, Queue
import os, time, random # 写数据进程执行的代码:
def write(q): for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码
def read(q):time.sleep(1)while not q.empty():# if not q.empty():print('Get %s from queue.' % q.get(True))time.sleep(1) # 目的是等待写队列完成if __name__=='__main__': # 父进程创建Queue,并传给各个子进程q = Queue()pw = Process(target = write, args = (q,)) pr = Process(target = read, args = (q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() pr.join()
print("Done!")
代码示例3:子进程间通信Queue&JoinableQueue
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):# 派生进程def __init__(self, task_queue, result_queue):multiprocessing.Process.__init__(self)self.task_queue = task_queue#任务队列self.result_queue = result_queue#结果队列# 重写原进程的run方法def run(self):proc_name = self.namewhile True:next_task = self.task_queue.get()if next_task is None:#为None,进程退出# Poison pill means shutdownprint(('%s: Exiting' % proc_name))self.task_queue.task_done()breakprint(('%s: %s' % (proc_name, next_task)))answer = next_task() # __call__()self.task_queue.task_done()#执行完get之后调用task_done()来通知Queue当前task完成self.result_queue.put(answer)returnclass Task(object):def __init__(self, a, b):self.a = aself.b = bdef __call__(self):time.sleep(0.1) # pretend to take some time to do the workreturn '%s * %s = %s' % (self.a, self.b, self.a * self.b)def __str__(self):return '%s * %s' % (self.a, self.b)if __name__ == '__main__':# Establish communication queuestasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue()# Start consumersnum_consumers = multiprocessing.cpu_count()print(('Creating %d consumers' % num_consumers))# 创建cup核数量个的子进程consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]# 依次启动子进程for w in consumers:w.start()# Enqueue jobsnum_jobs = 10for i in range(num_jobs):tasks.put(Task(i, i))for i in range(num_consumers):tasks.put(None)#有几个进程加几个None,以此结束死循环# Wait for all of the tasks to finishtasks.join()# Start printing resultswhile num_jobs:result = results.get()print ('Result: %s' %result)num_jobs -= 1
思路:
类consumer :进程对象,定义了进程怎么运行任务,怎么记录结果
类Task:定义了任务是什么:a*b=c
主程序:
声明2个队列:
1个是任务队列,类型:joinalbe queue(每get一个任务
执行完任务之后,都要调用一次self.task_queue.task_done()
表示这个任务完成)
2 结果队列,类型:Queue,存储任务的计算结果(10个数)
3 获取你机器的cpu个数
4 实例化10个Task类的实例,作为任务,放到任务队列
5 启动cpu个数的进程,依次start启动
6 多进程开始干这10个任务,每个任务的计算结果计入结果队列
7 任务队列名.join()这句话等待所有任务执行完毕
8 遍历结果队列,打印所有的结果
进程同步之:Lock
>#encoding=utf-8
from multiprocessing import Process, Lock
import time
def l(num,lock): lock.acquire() # 获得锁time.sleep(0.2)print("Hello Num: %s" % (num)) lock.release() # 释放锁if __name__ == '__main__': lock = Lock() # 创建一个共享锁实例for num in range(10): Process(target = l, args = (num,lock)).start()
#encoding=utf-8
import multiprocessing
import time
def worker(s, i):s.acquire()print(multiprocessing.current_process().name + " acquire")time.sleep(i)print(multiprocessing.current_process().name + " release")s.release()if __name__ == "__main__":# 设置限制最多3个进程同时访问共享资源s = multiprocessing.Semaphore(3)for i in range(5):p = multiprocessing.Process(target = worker, args = (s, i * 2))p.start()
进程同步之:event
Event提供一种简单的方法,可以在进程间传递状态信息,实现进程间同步通信。事
件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。
#encoding=utf-8
import multiprocessing
import timedef wait_for_event(e):"""Wait for the event to be set before doing anything"""print('wait_for_event: starting')e.wait() # 等待收到能执行信号,如果一直未收到将一直阻塞print('wait_for_event: e.is_set()->', e.is_set())def wait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""print('wait_for_event_timeout: starting')e.wait(t)# 等待t秒超时,此时Event的状态仍未未设置,继续执行print('wait_for_event_timeout: e.is_set()->', e.is_set())e.set()# 初始内部标志为真if __name__ == '__main__':e = multiprocessing.Event()print("begin,e.is_set()", e.is_set())w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))w1.start()w2 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))w2.start()#可将2改为5,看看执行结果w3 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2)) w3.start()print('main: waiting before calling Event.set()')time.sleep(3)# e.set() #可注释此句话看效果
print('main: event is set')
进程同步之:pipe(管道)
#encoding=utf-8
import multiprocessing as mpdef proc_1(pipe):pipe.send('hello')print('proc_1 received: %s' %pipe.recv())#pipe.recv()会死等pipe.send("what is your name?")print('proc_1 received: %s' %pipe.recv())
def proc_2(pipe):print('proc_2 received: %s' %pipe.recv())#pipe.recv()会死等pipe.send('hello, too')print('proc_2 received: %s' %pipe.recv())pipe.send("I don't tell you!")if __name__ == '__main__':# 创建一个管道对象pipepipe = mp.Pipe()print(len(pipe))print(type(pipe))# 将第一个pipe对象传给进程1p1 = mp.Process(target = proc_1, args = (pipe[0], ))# 将第二个pipe对象传给进程2p2 = mp.Process(target = proc_2, args = (pipe[1], ))p2.start()p1.start()p2.join()p1.join()
进程同步之:Condition
#encoding=utf-8
import multiprocessing as mp
import threading
import time
def consumer(cond):with cond:print("consumer before wait")cond.wait() # 等待消费print("consumer after wait")def producer(cond):with cond:print("producer before notifyAll")cond.notify_all() # 通知消费者可以消费了
## cond.notify() # 通知1个人,没有参数print("producer after notifyAll")if __name__ == '__main__': condition = mp.Condition()p1 = mp.Process(name = "p1", target = consumer, args=(condition,))p2 = mp.Process(name = "p2", target = consumer, args=(condition,))p3 = mp.Process(name = "p3", target = producer, args=(condition,))p1.start()time.sleep(2)p2.start()time.sleep(2)p3.start()
进程同步之:共享变量(数字/字符串/列表/字典/实例对象)
Array类:
构造方法:Array(typecode_or_type, size_or_initializer, *, lock=True)
函数作用:返回从共享内存中分配的一个数据内容为ctypes 类型的数组,ctypes 类型参见上表
参数说明:size_or_initializer可以是一个初始化好的列表也可以是列表的大小,如果是列表大小的话,默认值和c中对应的数据类型是一致的。Manager类:
Manager对象的使用方法同multiprocessing中的类似,共享的字符串只能用manager.Value()来实现,因为multiprocessing.Value()的参数不支持字符串
代码示例1:未使用共享变量
#encoding=utf-8
from multiprocessing import Process
def f(n, a):n = 3.1415927 for i in range(len(a)):a[i] = -a[i]#print(a[i])
if __name__ == '__main__':num = 0 # arr = list(range(10))p = Process(target = f, args = (num, arr))p.start()p.join()print(num) print(arr[:])
代码示例2:共享数字变量
#encoding=utf-8
from multiprocessing import Process, Value, Arraydef f(n, a):n.value = n.value+1for i in range(len(a)):a[i] = -a[i]if __name__ == '__main__':num = Value('d', 0.0) # 创建一个进程间共享的数字类型,默认值为0
arr = Array('i', range(10)) # 创建一个进程间共享的数组类型,初始值为range[10]
#arr = Array('i', 10) # 创建一个进程间共享的数组类型,初始大小为10p = Process(target = f, args = (num, arr))p.start()p.join()p = Process(target = f, args = (num, arr))p.start()p.join()print(num.value) # 获取共享变量num的值print(arr[:])
代码示例3:共享变量+锁
#encoding=utf-8
import time
from multiprocessing import Process, Value, Lockclass Counter(object):def __init__(self, initval = 0):self.val = Value('i', initval)self.lock = Lock()def increment(self):with self.lock:self.val.value += 1 # 共享变量自加1#print(“increment one time!”,self.value() ) #加此句死锁def value(self):with self.lock:return self.val.valuedef func(counter):for i in range(50):time.sleep(0.01)counter.increment()if __name__ == '__main__':counter = Counter(0)procs = [Process(target = func, args = (counter,)) for i in range(10)]# 等价于# for i in range(10):# Process(target = func, args = (counter,))for p in procs: p.start()for p in procs: p.join()print(counter.value())
代码示例4:共享字符串
#encoding=utf-8
from multiprocessing import Process, Manager, Value
from ctypes import c_char_pdef greet(shareStr):shareStr.value = shareStr.value + ", World!"if __name__ == '__main__':manager = Manager()shareStr = manager.Value(c_char_p, "Hello") process = Process(target = greet, args = (shareStr,))process.start()process.join()
print(shareStr.value)
代码示例5:共享的字典和列表
#encoding=utf-8
from multiprocessing import Process, Managerdef f( shareDict, shareList ):shareDict[1] = '1'shareDict['2'] = 2shareDict[0.25] = NoneshareList.reverse() # 翻转列表if __name__ == '__main__':manager = Manager()shareDict = manager.dict() # 创建共享的字典类型shareList = manager.list( range( 10 ) ) # 创建共享的列表类型p = Process( target = f, args = ( shareDict, shareList ) )p.start()p.join()print(shareDict)print(shareList)
代码示例6:共享实例对象
#encoding=utf-8
import time, os
import random
from multiprocessing import Pool, Value, Lock, Manager
from multiprocessing.managers import BaseManager#必须要创建一个BaseManager类的子类
class MyManager(BaseManager): passdef Manager():m = MyManager()m.start()return mclass Counter(object):def __init__(self, initval=0):self.val = Value('i', initval)self.lock = Lock()def increment(self):with self.lock:self.val.value += 1def value(self):with self.lock:return self.val.value
#将Counter类注册到Manager管理类中
MyManager.register('Counter', Counter)def long_time_task(name,counter):time.sleep(0.2)print('Run task %s (%s)...\n' % (name, os.getpid()))start = time.time()#time.sleep(random.random() * 3)for i in range(50):time.sleep(0.01)counter.increment()end = time.time()print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__ == '__main__':manager = Manager()#返回一个管理对象# 创建共享Counter类实例对象的变量,Counter类的初始值0counter = manager.Counter(0) print('Parent process %s.' % os.getpid())p = Pool()for i in range(5):p.apply_async(long_time_task, args = (str(i), counter))print('Waiting for all subprocesses done...')p.close()p.join()print('All subprocesses done.')print(counter.value())
说明:
Manager()函数返回一个管理对象,它控制了一个服务端进程,用来保持Python对象,并允许其它进程使用代理来管理这些对象。Manager()返回的管理者,支持类型包括,list, dict, Namespace, Lock,RLock, Semaphore, BoundedSemaphore,Condition,Event, Queue, Value and Array。
managers比使用共享内存对象更灵活,因为它支持任意对象类型。同样的,单个的manager可以通过网络在不同机器上进程间共享。但是,会比共享内存慢。
python3 多进程 同步相关推荐
- 40.多进程同步--锁--多把锁
多进程同步 首先多进程默认是并发行为,多个进程同时执行 执行的顺序,以及何时执行完毕无法控制 多个进程如果涉及到了通信,数据的有序性无法保证 需要锁来控制进程之间执行的顺序 对于进程资源的控制 缺点: ...
- 橘子苹果问题代码linux,多进程同步橘子苹果问题完整报告(附源代码).doc
多进程同步橘子苹果问题完整报告(附源代码).doc 一. 课程设计目的 本次实验进行操作系统课程设计的主要任务是模拟生产者和消费者的一个衍生, 即实现"橘子-苹果问题" .这个问题 ...
- 基于Python3多进程(多线程)+多协程的数据并发处理模版
图片来源:elenabsl/Shutterstock 上一篇文章<基于Python3单进程+多线程+多协程的生产者-消费者模型示例代码>介绍了如何使用Python在单进程的情况下利用协程并 ...
- python3 多进程 multiprocessing 报错 AttributeError: ‘ForkAwareLocal‘ object has no attribute ‘connection‘
目录 错误代码 报错信息 错误原因 解决方法 错误代码 python多进程管理manager时候,当不使用join对当前进程(主进程)进行阻塞时会报错,具体代码及错误如下: from multipro ...
- python3 多进程库 multiprocessing 使用简介
引子 前段时间在做的一个Python项目,需要实现一个后台服务程序,程序流程比较复杂,而且可能经常变动,但是如果把整个流程切分成一些步骤,每个步骤有自己的输入输出和处理.只要将他们的输入输出接在一起, ...
- python3 多进程共享变量实现方法
今天同事反映一个问题让帮忙看一下:多进程共用一个变量,在一个进程中修改后,在另外的进程中并没有产生修改. 一.错误的实现方式 最初以为是没添加global声明导致修改未生效,但实际操作发现global ...
- python3多进程 pool manager_Python多进程multiprocessing.Pool
Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求:但如果池中的进程数已经达到规定最大值, ...
- linux 多线程 多进程同步
多线程 同步的方法 1. 临界区 2. 互斥量(注意mutex只能用于线程的互斥,不能用于进程) 3. 信号量 4. 事件 多进程 同步方法 管道(Pipe)及有名管道(named pipe):管道可 ...
- python3多进程 进程池 协程并发
一.进程 我们电脑的应用程序,都是进程,进程是资源分配的单位.进程切换需要的资源最大,效率低. 进程之间相互独立 cpu密集的时候适合用多进程 #多进程并发 import multiprocessin ...
最新文章
- RDKit |基于集成学习(Ensemble learning)预测溶解度
- Understand Event-Driven Software Architecture
- 2021-05-21 深入理解SLAM技术 【4】射影几何--2面中心射影
- java 线程 获取消息_获取java线程中信息
- 常见的几种内排序算法以及实现(C语言)(转)
- Flex Module优化
- 7z替换exe文件内容不能替换文件_Windows小技巧 批处理文件实现目录下文件批量打包压缩...
- python 笔记 之 装饰器
- DevExpress WPF v19.1新版亮点:PDF Viewer等控件新功能
- httpget和ajax,javascript - HTTP中的get、post请求和ajax的get、post请求是一个东西吗?...
- 卸载JLink驱动弹出“could not open INSTALL.LOG file”的解决方法
- 广西南宁机器人比赛_广西南宁中小学生机器人竞赛精彩纷呈
- 计算机系统结构 外文,计算机系统结构外文文献 计算机系统结构参考文献怎么写...
- 微信小程序:上传的图片显示旋转问题
- 三角公式以及常见关系
- 统一告警平台设计方案
- Java BIO的基本介绍
- 线性代数---魏福义版 第二章习题答案
- excel高效之删除空行
- 如何生成和使用CLIPS动态链接库