目录

进程同步之:Queue和JoinableQueue

代码示例1:Queue

代码示例2:两个子进程间通信

代码示例3:子进程间通信Queue&JoinableQueue

进程同步之:Lock

进程同步之:event

进程同步之:pipe(管道)

进程同步之:Condition

进程同步之:共享变量(数字/字符串/列表/字典/实例对象)

代码示例1:未使用共享变量

代码示例2:共享数字变量

代码示例3:共享变量+锁

代码示例4:共享字符串

代码示例5:共享的字典和列表

代码示例6:共享实例对象


进程同步之:Queue和JoinableQueue

multiprocessing.Queue类似于queue.Queue,一般用来多个进程间交互信息。Queue是进程和线程安全的。它实现了queue.Queue的大部分方法,但task_done()和join()没有实现。
multiprocessing.JoinableQueue是multiprocessing.Queue的子类,增加了task_done()方法和join()方法。
task_done():一般在调用get()时获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
join():阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。

代码示例1:Queue

#encoding=utf-8
from multiprocessing import Process, Queue  def offer(queue):  # 入队列if queue.empty():queue.put("Hello World")  else:print(queue.get())if __name__ == '__main__':  # 创建一个队列实例q = Queue() p = Process(target = offer, args = (q,))  p.start()  print(q.get()) # 主进程执行出队列操作q.put("哈哈哈哈哈")# 主进程执行入队列操作m = Process(target = offer, args = (q,)) # 子进程执行入队列操作m.start()m.join()p.join()

代码示例2:两个子进程间通信

#encoding=utf-8
from multiprocessing import Process, Queue
import os, time, random # 写数据进程执行的代码:
def write(q): for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码
def read(q):time.sleep(1)while not q.empty():# if not q.empty():print('Get %s from queue.' % q.get(True))time.sleep(1) # 目的是等待写队列完成if __name__=='__main__': # 父进程创建Queue,并传给各个子进程q = Queue()pw = Process(target = write, args = (q,)) pr = Process(target = read, args = (q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() pr.join()
print("Done!")

代码示例3:子进程间通信Queue&JoinableQueue

#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):# 派生进程def __init__(self, task_queue, result_queue):multiprocessing.Process.__init__(self)self.task_queue = task_queue#任务队列self.result_queue = result_queue#结果队列# 重写原进程的run方法def run(self):proc_name = self.namewhile True:next_task = self.task_queue.get()if next_task is None:#为None,进程退出# Poison pill means shutdownprint(('%s: Exiting' % proc_name))self.task_queue.task_done()breakprint(('%s: %s' % (proc_name, next_task)))answer = next_task() # __call__()self.task_queue.task_done()#执行完get之后调用task_done()来通知Queue当前task完成self.result_queue.put(answer)returnclass Task(object):def __init__(self, a, b):self.a = aself.b = bdef __call__(self):time.sleep(0.1) # pretend to take some time to do the workreturn '%s * %s = %s' % (self.a, self.b, self.a * self.b)def __str__(self):return '%s * %s' % (self.a, self.b)if __name__ == '__main__':# Establish communication queuestasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue()# Start consumersnum_consumers = multiprocessing.cpu_count()print(('Creating %d consumers' % num_consumers))# 创建cup核数量个的子进程consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]# 依次启动子进程for w in consumers:w.start()# Enqueue jobsnum_jobs = 10for i in range(num_jobs):tasks.put(Task(i, i))for i in range(num_consumers):tasks.put(None)#有几个进程加几个None,以此结束死循环# Wait for all of the tasks to finishtasks.join()# Start printing resultswhile num_jobs:result = results.get()print ('Result: %s' %result)num_jobs -= 1


思路
类consumer :进程对象,定义了进程怎么运行任务,怎么记录结果
类Task:定义了任务是什么:a*b=c
主程序:
声明2个队列:
1个是任务队列,类型:joinalbe queue(每get一个任务
执行完任务之后,都要调用一次self.task_queue.task_done()
表示这个任务完成)
2 结果队列,类型:Queue,存储任务的计算结果(10个数)
3 获取你机器的cpu个数
4 实例化10个Task类的实例,作为任务,放到任务队列
5 启动cpu个数的进程,依次start启动
6 多进程开始干这10个任务,每个任务的计算结果计入结果队列
7 任务队列名.join()这句话等待所有任务执行完毕
8 遍历结果队列,打印所有的结果

进程同步之:Lock

锁是为了确保数据一致性。比如读写锁,每个进程给一个变量增加 1 ,但是如果在一个进程读取但还没有写入的时候,另外的进程也同时读取了,并写入该值,则最后写入的值是错误的,这时候就需要加锁来保持数据一致性。

>#encoding=utf-8
from multiprocessing import Process, Lock
import time
def l(num,lock):  lock.acquire() # 获得锁time.sleep(0.2)print("Hello Num: %s" % (num))  lock.release() # 释放锁if __name__ == '__main__':  lock = Lock()  # 创建一个共享锁实例for num in range(10):  Process(target = l, args = (num,lock)).start()

进程同步之:Semaphore(多把锁)
Semaphore用于控制对共享资源的访问数量。Semaphore锁和Lock稍有不同,Semaphore相当于N把锁,获取其中一把就可以执行。可用锁的总数N在创建实例时传入,比如s = Semaphore(n)。与Lock一样,如果可用锁为0,进程将会阻塞,直到可用锁大于0。

#encoding=utf-8
import multiprocessing
import time
def worker(s, i):s.acquire()print(multiprocessing.current_process().name + " acquire")time.sleep(i)print(multiprocessing.current_process().name + " release")s.release()if __name__ == "__main__":# 设置限制最多3个进程同时访问共享资源s = multiprocessing.Semaphore(3)for i in range(5):p = multiprocessing.Process(target = worker, args = (s, i * 2))p.start()


最多只能有3个进程同时获得锁

进程同步之:event

Event提供一种简单的方法,可以在进程间传递状态信息,实现进程间同步通信。事
件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

#encoding=utf-8
import multiprocessing
import timedef wait_for_event(e):"""Wait for the event to be set before doing anything"""print('wait_for_event: starting')e.wait() # 等待收到能执行信号,如果一直未收到将一直阻塞print('wait_for_event: e.is_set()->', e.is_set())def wait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""print('wait_for_event_timeout: starting')e.wait(t)# 等待t秒超时,此时Event的状态仍未未设置,继续执行print('wait_for_event_timeout: e.is_set()->', e.is_set())e.set()# 初始内部标志为真if __name__ == '__main__':e = multiprocessing.Event()print("begin,e.is_set()", e.is_set())w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))w1.start()w2 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))w2.start()#可将2改为5,看看执行结果w3 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2)) w3.start()print('main: waiting before calling Event.set()')time.sleep(3)# e.set()   #可注释此句话看效果
print('main: event is set')


两个等待的进程都继续执行

进程同步之:pipe(管道)

Pipe是两个进程间通信的工具。Pipe可以是单向(half-duplex),也可以是双向(duplex)。通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从Pipe一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
Pipe的每个端口同时最多一个进程读写,否则会出现各种问题,可能造成corruption异常。Pipe对象建立的时候,返回一个含有两个元素的元组对象,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

#encoding=utf-8
import multiprocessing as mpdef proc_1(pipe):pipe.send('hello')print('proc_1 received: %s' %pipe.recv())#pipe.recv()会死等pipe.send("what is your name?")print('proc_1 received: %s' %pipe.recv())
def proc_2(pipe):print('proc_2 received: %s' %pipe.recv())#pipe.recv()会死等pipe.send('hello, too')print('proc_2 received: %s' %pipe.recv())pipe.send("I don't tell you!")if __name__ == '__main__':# 创建一个管道对象pipepipe = mp.Pipe()print(len(pipe))print(type(pipe))# 将第一个pipe对象传给进程1p1 = mp.Process(target = proc_1, args = (pipe[0], ))# 将第二个pipe对象传给进程2p2 = mp.Process(target = proc_2, args = (pipe[1], ))p2.start()p1.start()p2.join()p1.join()

进程同步之:Condition

一个condition变量总是与某些类型的锁相联系,当几个condition变量必须共享同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。
condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和release() 会调用与锁相关联的相应方法。
wait()方法会释放锁,当另外一个进程使用notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,Condition类实现了一个conditon变量。这个condition变量允许一个或多个进程等待,直到他们被另一个进程通知。如果lock参数,被给定一个非空的值,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。
wait(timeout=None) :等待通知,或者等到设定的超时时间。当调用wait()方法时,如果调用它的进程没有得到锁,那么会抛出一个RuntimeError异常。 wait()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞。如果有等待的进程,notify()方法会唤醒一个在等待conditon变量的进程。notify_all() 则会唤醒所有在等待conditon变量的进程。
注意:
notify()和notify_all()不会释放锁,也就是说,进程被唤醒后不会立刻返回他们的wait() 调用。除非进程调用notify()和notify_all()之后放弃了锁的所有权。在典型的设计风格里,利用condition变量加锁去允许访问一些共享状态,进程在获取到它想得到的状态前,会反复调用wait()。修改状态的进程在他们状态改变时调用 notify() or notify_all(),用这种方式,进程会尽可能的获取到想要的一个等待者状态。

#encoding=utf-8
import multiprocessing as mp
import threading
import time
def consumer(cond):with cond:print("consumer before wait")cond.wait() # 等待消费print("consumer after wait")def producer(cond):with cond:print("producer before notifyAll")cond.notify_all() # 通知消费者可以消费了
##    cond.notify() # 通知1个人,没有参数print("producer after notifyAll")if __name__ == '__main__':    condition = mp.Condition()p1 = mp.Process(name = "p1", target = consumer, args=(condition,))p2 = mp.Process(name = "p2", target = consumer, args=(condition,))p3 = mp.Process(name = "p3", target = producer, args=(condition,))p1.start()time.sleep(2)p2.start()time.sleep(2)p3.start()

进程同步之:共享变量(数字/字符串/列表/字典/实例对象)

程序运行中生成的变量是放在进程的数据区中,每个进程都是独立的地址空间,所以用一般的方法是不能共享变量的,multiprocessing模块提供了Array/Manager/Value类,借助以上类能够实现进程间共享数字变量/字符串变量/列表/字典/实例对象。

*args是传递给ctypes的构造参数

  • Array类:
    构造方法:Array(typecode_or_type, size_or_initializer, *, lock=True)
    函数作用:返回从共享内存中分配的一个数据内容为ctypes 类型的数组,ctypes 类型参见上表
    参数说明:size_or_initializer可以是一个初始化好的列表也可以是列表的大小,如果是列表大小的话,默认值和c中对应的数据类型是一致的。

  • Manager类:
    Manager对象的使用方法同multiprocessing中的类似,共享的字符串只能用manager.Value()来实现,因为multiprocessing.Value()的参数不支持字符串

代码示例1:未使用共享变量

#encoding=utf-8
from multiprocessing import Process
def f(n, a):n = 3.1415927 for i in range(len(a)):a[i] = -a[i]#print(a[i])
if __name__ == '__main__':num = 0 # arr = list(range(10))p = Process(target = f, args = (num, arr))p.start()p.join()print(num) print(arr[:])

代码示例2:共享数字变量

#encoding=utf-8
from multiprocessing import Process, Value, Arraydef f(n, a):n.value = n.value+1for i in range(len(a)):a[i] = -a[i]if __name__ == '__main__':num = Value('d', 0.0) # 创建一个进程间共享的数字类型,默认值为0
arr = Array('i', range(10)) # 创建一个进程间共享的数组类型,初始值为range[10]
#arr = Array('i', 10) # 创建一个进程间共享的数组类型,初始大小为10p = Process(target = f, args = (num, arr))p.start()p.join()p = Process(target = f, args = (num, arr))p.start()p.join()print(num.value) # 获取共享变量num的值print(arr[:])

代码示例3:共享变量+锁

#encoding=utf-8
import time
from multiprocessing import Process, Value, Lockclass Counter(object):def __init__(self, initval = 0):self.val = Value('i', initval)self.lock = Lock()def increment(self):with self.lock:self.val.value += 1 # 共享变量自加1#print(“increment one time!”,self.value() )  #加此句死锁def value(self):with self.lock:return self.val.valuedef func(counter):for i in range(50):time.sleep(0.01)counter.increment()if __name__ == '__main__':counter = Counter(0)procs = [Process(target = func, args = (counter,)) for i in range(10)]# 等价于# for i in range(10):# Process(target = func, args = (counter,))for p in procs: p.start()for p in procs: p.join()print(counter.value())

代码示例4:共享字符串

#encoding=utf-8
from multiprocessing import Process, Manager, Value
from ctypes import c_char_pdef greet(shareStr):shareStr.value = shareStr.value + ", World!"if __name__ == '__main__':manager = Manager()shareStr = manager.Value(c_char_p, "Hello") process = Process(target = greet, args = (shareStr,))process.start()process.join()
print(shareStr.value)

代码示例5:共享的字典和列表

#encoding=utf-8
from multiprocessing import Process, Managerdef f( shareDict, shareList ):shareDict[1] = '1'shareDict['2'] = 2shareDict[0.25] = NoneshareList.reverse() # 翻转列表if __name__ == '__main__':manager = Manager()shareDict = manager.dict() # 创建共享的字典类型shareList = manager.list( range( 10 ) ) # 创建共享的列表类型p = Process( target = f, args = ( shareDict, shareList ) )p.start()p.join()print(shareDict)print(shareList)

代码示例6:共享实例对象

#encoding=utf-8
import time, os
import random
from multiprocessing import Pool, Value, Lock, Manager
from multiprocessing.managers import BaseManager#必须要创建一个BaseManager类的子类
class MyManager(BaseManager): passdef Manager():m = MyManager()m.start()return mclass Counter(object):def __init__(self, initval=0):self.val = Value('i', initval)self.lock = Lock()def increment(self):with self.lock:self.val.value += 1def value(self):with self.lock:return self.val.value
#将Counter类注册到Manager管理类中
MyManager.register('Counter', Counter)def long_time_task(name,counter):time.sleep(0.2)print('Run task %s (%s)...\n' % (name, os.getpid()))start = time.time()#time.sleep(random.random() * 3)for i in range(50):time.sleep(0.01)counter.increment()end = time.time()print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__ == '__main__':manager = Manager()#返回一个管理对象# 创建共享Counter类实例对象的变量,Counter类的初始值0counter = manager.Counter(0) print('Parent process %s.' % os.getpid())p = Pool()for i in range(5):p.apply_async(long_time_task, args = (str(i), counter))print('Waiting for all subprocesses done...')p.close()p.join()print('All subprocesses done.')print(counter.value())

说明:
Manager()函数返回一个管理对象,它控制了一个服务端进程,用来保持Python对象,并允许其它进程使用代理来管理这些对象。Manager()返回的管理者,支持类型包括,list, dict, Namespace, Lock,RLock, Semaphore, BoundedSemaphore,Condition,Event, Queue, Value and Array。
managers比使用共享内存对象更灵活,因为它支持任意对象类型。同样的,单个的manager可以通过网络在不同机器上进程间共享。但是,会比共享内存慢。

python3 多进程 同步相关推荐

  1. 40.多进程同步--锁--多把锁

    多进程同步 首先多进程默认是并发行为,多个进程同时执行 执行的顺序,以及何时执行完毕无法控制 多个进程如果涉及到了通信,数据的有序性无法保证 需要锁来控制进程之间执行的顺序 对于进程资源的控制 缺点: ...

  2. 橘子苹果问题代码linux,多进程同步橘子苹果问题完整报告(附源代码).doc

    多进程同步橘子苹果问题完整报告(附源代码).doc 一. 课程设计目的 本次实验进行操作系统课程设计的主要任务是模拟生产者和消费者的一个衍生, 即实现"橘子-苹果问题" .这个问题 ...

  3. 基于Python3多进程(多线程)+多协程的数据并发处理模版

    图片来源:elenabsl/Shutterstock 上一篇文章<基于Python3单进程+多线程+多协程的生产者-消费者模型示例代码>介绍了如何使用Python在单进程的情况下利用协程并 ...

  4. python3 多进程 multiprocessing 报错 AttributeError: ‘ForkAwareLocal‘ object has no attribute ‘connection‘

    目录 错误代码 报错信息 错误原因 解决方法 错误代码 python多进程管理manager时候,当不使用join对当前进程(主进程)进行阻塞时会报错,具体代码及错误如下: from multipro ...

  5. python3 多进程库 multiprocessing 使用简介

    引子 前段时间在做的一个Python项目,需要实现一个后台服务程序,程序流程比较复杂,而且可能经常变动,但是如果把整个流程切分成一些步骤,每个步骤有自己的输入输出和处理.只要将他们的输入输出接在一起, ...

  6. python3 多进程共享变量实现方法

    今天同事反映一个问题让帮忙看一下:多进程共用一个变量,在一个进程中修改后,在另外的进程中并没有产生修改. 一.错误的实现方式 最初以为是没添加global声明导致修改未生效,但实际操作发现global ...

  7. python3多进程 pool manager_Python多进程multiprocessing.Pool

    Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求:但如果池中的进程数已经达到规定最大值, ...

  8. linux 多线程 多进程同步

    多线程 同步的方法 1. 临界区 2. 互斥量(注意mutex只能用于线程的互斥,不能用于进程) 3. 信号量 4. 事件 多进程 同步方法 管道(Pipe)及有名管道(named pipe):管道可 ...

  9. python3多进程 进程池 协程并发

    一.进程 我们电脑的应用程序,都是进程,进程是资源分配的单位.进程切换需要的资源最大,效率低. 进程之间相互独立 cpu密集的时候适合用多进程 #多进程并发 import multiprocessin ...

最新文章

  1. RDKit |基于集成学习(Ensemble learning)预测溶解度
  2. Understand Event-Driven Software Architecture
  3. 2021-05-21 深入理解SLAM技术 【4】射影几何--2面中心射影
  4. java 线程 获取消息_获取java线程中信息
  5. 常见的几种内排序算法以及实现(C语言)(转)
  6. Flex Module优化
  7. 7z替换exe文件内容不能替换文件_Windows小技巧 批处理文件实现目录下文件批量打包压缩...
  8. python 笔记 之 装饰器
  9. DevExpress WPF v19.1新版亮点:PDF Viewer等控件新功能
  10. httpget和ajax,javascript - HTTP中的get、post请求和ajax的get、post请求是一个东西吗?...
  11. 卸载JLink驱动弹出“could not open INSTALL.LOG file”的解决方法
  12. 广西南宁机器人比赛_广西南宁中小学生机器人竞赛精彩纷呈
  13. 计算机系统结构 外文,计算机系统结构外文文献 计算机系统结构参考文献怎么写...
  14. 微信小程序:上传的图片显示旋转问题
  15. 三角公式以及常见关系
  16. 统一告警平台设计方案
  17. Java BIO的基本介绍
  18. 线性代数---魏福义版 第二章习题答案
  19. excel高效之删除空行
  20. 如何生成和使用CLIPS动态链接库

热门文章

  1. 5G 标准 — R18
  2. 用 C 语言开发一门编程语言 — 条件分支
  3. 一句话说清分布式锁,进程锁,线程锁
  4. kafka官方文档学习笔记2--QuickStart
  5. CentOS学习笔记 - 4. 修改端口和禁止root登录
  6. 人工智能时代下的视觉合成
  7. jQuery 选择器中的空格问题
  8. iOS-----Xcode-Debug尝试
  9. SpringMVC 项目中 Quartz 定时任务的设置纪要
  10. java 面试基础题 引用