生产消费者模型

  生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

  实例1:

from multiprocessing import Process,Queue
import time,random,osdef procducer(q):for i in range(10):res='包子%s' %itime.sleep(0.5)q.put(res)print('%s 生产了 %s' %(os.getpid(),res))def consumer(q):while True:res=q.get()if res is None:breakprint('%s 吃 %s' %(os.getpid(),res))time.sleep(random.randint(2,3))if __name__ == '__main__':q=Queue()p=Process(target=procducer,args=(q,))c=Process(target=consumer,args=(q,))p.start()c.start()print('主')

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

  例子2:

from multiprocessing import Process,Queue
import time,random,osdef procducer(q):for i in range(10):res='包子%s' %itime.sleep(0.5)q.put(res)print('%s 生产了 %s' %(os.getpid(),res))def consumer(q):while True:res=q.get()if res is None:breakprint('%s 吃 %s' %(os.getpid(),res))time.sleep(random.randint(2,3))if __name__ == '__main__':q=Queue()p=Process(target=procducer,args=(q,))c=Process(target=consumer,args=(q,))p.start()c.start()p.join()q.put(None)print('主')

注意:以上发送可以放在生产函数中循环完进行发送,当然也可以如上放在主进程中进行发送,但是前提是必须等生产子进程结束才可以。

========================用个小栗子来理解=================================================================

举一个小栗子,(寄信) 1、你把信写好——相当于生产者制造数据2、你把信放入邮筒——相当于生产者把数据放入缓冲区3、邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区4、邮递员把信拿去邮局做相应的处理——相当于消费者处理数据优势
缓冲区作用:
1、解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。再举个小栗子
如果不使用邮筒(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强 耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱 耦合)。2:支持并发
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只能一直等着
而使用这个模型,生产者把制造出来的数据只需要放在缓冲区即可,不需要等待消费者来取再举个小栗子
从寄信的例子来看。如果没有邮筒,你得拿着信傻站在路口等邮递员过来收(相当于生产者阻塞);又或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。不管是哪种方法,都挺耗时间的3:支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。再举个小栗子
假设邮递员一次只能带走1000封信。万一某次碰上情人节送贺卡,需要寄出去的信超过1000封,这时候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来时再拿走。

二、生产者消费模型

  总结:

    ---生产者消费者模型程序中两种角色:①负责生产数据(生产者);②负责处理数据(消费者)

    ---生产者消费者模型的作用:平衡生产者与消费者之间的速度差。

    ---实现方式:生产者——>队列——>消费者

  如上篇博客内容关于生产消费模型内容,在生产者生产数据的过程结束后,即使消费者已将数据完全获取,消费者程序也不能结束,需由主进程或者生产者在结束生产程序后发送给消费者结束口令,消费者程序才会结束。但是如果出现多个消费者和多个生产者,这种情况又该如何解决?方法如下两种:

1、根据消费者数量传送结束信号(low)

from multiprocessing import Process,Queue
import time,random,osdef procducer(q):for i in range(10):res='包子%s' %itime.sleep(0.5)q.put(res)print('%s 生产了 %s' %(os.getpid(),res))def consumer(q):while True:res=q.get()if res is None:breakprint('%s 吃 %s' %(os.getpid(),res))time.sleep(random.randint(2,3))if __name__ == '__main__':q=Queue()p=Process(target=procducer,args=(q,))c=Process(target=consumer,args=(q,))p.start()c.start()p.join()q.put(None)print('主')

from multiprocessing import Process,Queue
import time
import random
import os
def producer(name,q):for i in range(10):res='%s%s' %(name,i)time.sleep(random.randint(1, 3))q.put(res)print('%s生产了%s' %(os.getpid(),res))
def consumer(name,q):while True:res=q.get()if not res:breakprint('%s吃了%s' %(name,res))
if __name__=='__main__':q=Queue()p1=Process(target=producer,args=('巧克力',q))p2=Process(target=producer,args=('甜甜圈',q))p3=Process(target=producer, args=('奶油蛋糕',q))c1=Process(target=consumer,args=('alex',q))c2=Process(target=consumer,args=('egon',q))_p=[p1,p2,p3,c1,c2]for p in _p:p.start()p1.join()p2.join()p3.join()'''保证生产程序结束后,再发送结束信号,发送数量和消费者数量一致'''q.put(None)q.put(None)

天啊噜

2、JoinableQueue队列机制

 JoinableQueue与Queue队列基本相似,但前者队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。Queue实例的对象具有的方法JoinableQueue同样具有,除此JoinableQueue还具有如下方法:

  ①q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

  ②q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

from multiprocessing import Process,JoinableQueue
import time
import random
def producer(name,food,q):for i in range(10):res='%s%s' %(food,i)time.sleep(random.randint(1, 3))q.put(res)print('%s生产了%s' %(name,res))q.join()  #阻塞生产者进程,保证此进程结束时消费者进程已处理完其产生的数据
def consumer(name,q):while True:res=q.get()if not res:breakprint('%s吃了%s' %(name,res))q.task_done()  #向q.join()发送一次信号,证明一个数据已经被取走了
if __name__=='__main__':q=JoinableQueue()p1=Process(target=producer,args=(1,'巧克力',q))p2=Process(target=producer,args=(2,'奶油蛋糕',q))p3 = Process(target=producer, args=(3,'冰糖葫芦', q))c1=Process(target=consumer,args=('lishi',q))c2=Process(target=consumer,args=('jassin',q))'''守护进程保证主进程结束时,守护进程也立即结束'''c1.daemon=Truec2.daemon=True_p=[p1,p2,p3,c1,c2]for p in _p:p.start()p1.join()p2.join()p3.join()

二、回调函数

  进程池执行完一个获得数据的进程,即刻要求通知主进程拿去解析数据。主进程调用一个函数去处理,这个函数便被称为回调函数,要求进程池进程的结果为回调函数的参数。

  爬虫实例:

线程池

import requests
from concurrent.futures import ThreadPoolExecutor(线程池),ProcessPoolExecutor(进程池)
from threading import current_thread
import time
import osdef get(url):  # 下载print('%s GET %s' %(current_thread().getName(),url))response=requests.get(url)time.sleep(3)if response.status_code == 200:   # 固定,=200表示下载完成return {'url':url,'text':response.text}def parse(obj):  # 解析res=obj.result()print('[%s] <%s> (%s)' % (current_thread().getName(), res['url'],len(res['text'])))if __name__ == '__main__':urls = ['https://www.python.org','https://www.baidu.com','https://www.jd.com','https://www.tmall.com',]t=ThreadPoolExecutor(2)for url in urls:t.submit(get,url).add_done_callback(parse) t.shutdown(wait=True)print('主',os.getpid())

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数。

进程池

import requests
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
import osdef get(url):print('%s GET %s' %(os.getpid(),url))response=requests.get(url)time.sleep(3)if response.status_code == 200:return {'url':url,'text':response.text}def parse(obj):res=obj.result()print('[%s] <%s> (%s)' % (os.getpid(), res['url'],len(res['text'])))if __name__ == '__main__':urls = ['https://www.python.org','https://www.baidu.com','https://www.jd.com','https://www.tmall.com',]t=ProcessPoolExecutor(2)for url in urls:t.submit(get,url).add_done_callback(parse)t.shutdown(wait=True)print('主',os.getpid())

转载于:https://www.cnblogs.com/jassin-du/p/7978020.html

并发编程之多进程3 (生产者与消费者模型) 回调函数相关推荐

  1. 并发编程之多进程编程(python版)

    目录 1 python多进程编程概述 2 需求和方案 背景: 需求: 解决思路: 需要解决的问题和方案: 3 完整代码 1 python多进程编程概述 python中的多线程无法利用多核优势,如果想要 ...

  2. 并发编程之多进程篇之四

    主要知识点:互斥锁.队列和生产者消费者模型 一.互斥锁 1.进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱 ...

  3. python多进程编程 多个函数并发执行_python并发编程之多进程编程

    一.multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程.P ...

  4. python互斥锁原理_python并发编程之多进程1------互斥锁与进程间的通信

    一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理. 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行 ...

  5. 并发编程之——多进程

    一.基本概念 1.1 进程 其实进程就是正在进行的一个程序或者任务,而负责执行任务的是CPU,执行任务的地方是内存.跟程序相比,程序仅仅是一堆代码而已,而程序运行时的过程才是进程.另外同一个程序执行两 ...

  6. python 进程间同步_python之路29 -- 多进程与进程同步(进程锁、信号量、事件)与进程间的通讯(队列和管道、生产者与消费者模型)与进程池...

    所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了.至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠 ...

  7. linux进程间通信:system V 信号量 生产者和消费者模型编程案例

    生产者和消费者模型: 有若干个缓冲区,生产者不断向里填数据,消费者不断从中取数据 两者不冲突的前提: 缓冲区有若干个,且是固定大小,生产者和消费者各有若干个 生产者向缓冲区中填数据前需要判断缓冲区是否 ...

  8. 并发编程之多进程进程进程

    Python 并发编程之多进程 1.1 multiprocessing 模块 Python 中的多线程无法利用多核资源,如果想要充分的使用多核 cpu 的资源,在 Python 中大部分情况需要使用多 ...

  9. Linux系统编程---17(条件变量及其函数,生产者消费者条件变量模型,生产者与消费者模型(线程安全队列),条件变量优点,信号量及其主要函数,信号量与条件变量的区别,)

    条件变量 条件变量本身不是锁!但它也可以造成线程阻塞.通常与互斥锁配合使用.给多线程提供一个会合的场所. 主要应用函数: pthread_cond_init 函数 pthread_cond_destr ...

  10. Go语言编程:使用条件变量Cond和channel通道实现多个生产者和消费者模型

    如题,使用条件变量Cond和channel通道实现多个生产者和消费者模型.Go语言天生带有C语言的基因,很多东西和C与很像,但是用起来 绝对比C语言方便.今天用Go语言来实现下多消费者和生产者模型.如 ...

最新文章

  1. 在iOS上使用ffmpeg播放视频
  2. SDN/NFV 网络技术系列文章
  3. Educational Codeforces Round 75 (Rated for Div. 2)
  4. 【Linux】一步一步学Linux——ip命令(183)
  5. Tensorflow 入门教程
  6. saleor的测试用账户地址This value is not valid for the address
  7. jzoj100047-基因变异【位运算,bfs】
  8. 数据镜像备份工具rsync
  9. Nacos服务发现控制台预览
  10. cron 每年执行一次_crontab服务执行定时脚本,在指定时间内让php执行处理业务逻辑...
  11. 通过反射获取类的完整结构(1)--属性方法
  12. 5python 体脂率计算(优化版)
  13. DVR硬盘录像机技术
  14. 【easyui】之DataGrid数据显示
  15. 线性变换的不变子空间和特征子空间的关系
  16. 如何在linux系统上添加网口打印机
  17. 牧牛区块链,区块链经济学应该关注的问题
  18. 【QT-3】tableWidget控件
  19. Linux下各压缩方式测试(压缩率和使用时间)
  20. php学生管理系统视频教程,学生管理系统——PHP

热门文章

  1. 动态字段插入及查询redis小结
  2. Linux SendMail服务启动慢总结
  3. “CEPH浅析”系列之七——关于CEPH的若干想法
  4. 数据中心布线系统的整体规划
  5. 14门Linux课程,打通你Linux的任督二脉!
  6. 车用计算机内部结构图,ecu的基本组成结构 ECU的工作过程
  7. 自定义JWT认证过滤器
  8. MySQL高级 - 案例 - AOP记录日志
  9. Nacos配置管理-nacos集群搭建
  10. Nginx的功能特性及常用功能