在之前的文章当中我们曾经说道,在多线程并发的场景当中,如果我们需要感知线程之间的状态,交换线程之间的信息是一件非常复杂和困难的事情。因为我们没有更高级的系统权限,也没有上帝视角,很难知道目前运行的状态的全貌,所以想要设计出一个稳健运行没有bug的功能,不仅非常困难,而且调试起来非常麻烦。

很多人学习python,不知道从何学起。
很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手。
很多已经做案例的人,却不知道如何去学习更加高深的知识。
那么针对这三类人,我给大家提供一个好的学习平台,免费领取视频教程,电子书籍,以及课程的源代码!
QQ群:1097524789

生产消费者模式

在日常开发当中, 从一个线程向另外的线程传输数据又是一件家常便饭的事情 。举个最简单的例子,我们在处理网页请求的时候,需要打印下来这一次请求的相关日志。打印日志是一次IO行为,这是非常消耗时间的,所以我们不能放在请求当中同步进行,否则会影响系统的性能。最好的办法就是启动一系列线程专门负责打印,后端的线程只负责响应请求,相关的日志以消息的形式传送给打印线程打印。

这个简单的不能再简单的功能当中涉及了诸多细节,我们来盘点几个。首先IO线程的数据都是从后台线程来的,假如一段时间内没有请求,那么这些线程都应该休眠,应该在有请求的时候才会启动。其次,如果某一段时间内请求非常多,导致IO线程一时间来不及打印所有的数据,那么当下的请求应该先暂存起来,等IO线程”忙过来“之后再进行处理。

把这些细节都考虑到,自己来设计功能还是挺麻烦的。好在这个问题前人已经替我们想过了,并且得出了一个非常经典的设计模式,使用它可以很好的解决这个问题。这个模式就是 生产消费者模式 。

这个设计模式的原理其实非常简单,我们来看张图就明白了。

Java并发-- 生产者-消费者模式| 点滴积累

线程根据和数据的关系分为 生产者线程和消费者线程 ,其中生产者线程负责生产数据,产生了数据之后会存储到任务队列当中。消费者线程从这个队列获取需要消费的数据,它和生产者线程之间不会直接交互,避免了线程之间互相依赖的问题。

另外一个细节是这里的任务队列并不是普通的队列,一般情况下是一个 阻塞队列 。也就是说当消费者线程尝试从其中获取数据的时候,如果队列是空的,那么这些消费者线程会自动挂起等待,直到它获得了数据为止。有阻塞队列当然也有非阻塞队列,如果是非阻塞队列的话,当我们尝试从其中获取数据的时候,如果它当中没有数据的话,并不会挂起等待,而是会返回一个空值。

当然阻塞队列的挂起等待时间也是可以设置的,我们可以让它一直等待下去,也可以设置一个最长等待时间 。如果超过这个时间也会返回空,不同的队列应用在不同的场景当中,我们需要根据场景性质做出调整。

代码实现

看完了设计模式的原理,我们下面来试着用代码来实现一下。

在一般的高级语言当中都有现成的队列的库,由于在生产消费者模式当中用到的是阻塞型queue,有阻塞性的队列当然也就有非阻塞型的队列。我们在用之前需要先了解清楚,如果用错了队列会导致整个程序出现问题。在Python当中,我们最常用的queue就是一个 支持多线程场景的阻塞队列 ,所以我们直接拿来用就好了。

由于这个设计模式非常简单,这个代码并不长只有几行:

from queue import Queue
from threading import Threaddef producer(que):data = 0while True:data += 1que.put(data)def consumer(que):while True:data = que.get()print(data)que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()

我们运行一下就会发现它是可行的,并且由于队列 先进先出 的限制,可以保证了consumer线程读取到的内容的 顺序和producer生产的顺序是一致的 。

如果我们运行一下这个代码会发现它是不会结束的,因为consumer和producer当中都用到了while True构建的死循环,假设我们希望可以控制程序的结束,应该怎么办?

其实也很简单,我们也可以利用队列。我们创建一个特殊的信号量,约定好当consumer接受到这个特殊值的时候就停止程序。这样当我们要结束程序的时候,我们只需要把这个信号量加入队列即可。

singal = object()def producer(que):data = 0while data < 20:data += 1que.put(data)que.put(singal)def consumer(que):while True:data = que.get()if data is singal:# 继续插入singalque.put(singal)breakprint(data)

这里有一个细节是我们在consumer当中,当读取到singal的时候,在跳出循环之前我们又把singal放回了队列。原因也很简单,因为有时候consumer线程不止一个,这个singal上游 只放置了一个,只会被一个线程读取进来 ,其他线程并不会知道已经获得了singal的消息,所以还是会继续执行。

而当consumer关闭之前放入singal就可以保证每一个consumer在关闭的之前都会再传递一个结束的信号给其他未关闭的consumer读取。这样一个一个的传递,就可以保证所有consumer都关闭。

这里还有一个小细节,虽然利用队列可以解决生产者和消费者通信的问题,但是上游的生产者并不知道下游的消费者是否已经执行完成了。假如我们想要知道,应该怎么办?

Python的设计者们也考虑到了这个问题,所以他们在Queue这个类当中加入了 task_done和join方法 。利用task_done,消费者可以通知queue这一个任务已经执行完成了。而通过调用join,可以等待所有的consumer完成。

from queue import Queue
from threading import Threaddef producer(que):data = 0while data < 20:data += 1que.put(data)def consumer(que):while True:data = que.get()print(data)que.task_done()que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()que.join()

除了使用task_done之外,我们还可以在que传递的消息当中加入一个Event,这样我们还可以继续感知到每一个Event执行的情况。

优先队列与其他设置

我们之前在介绍一些分布式调度系统的时候曾经说到过,在调度系统当中,调度者会用一个优先队列来管理所有的任务。当有机器空闲的时候,会有限调度那些优先级高的任务。

其实这个调度系统也是基于我们刚才介绍的生产消费者模型开发的,只不过 将调度队列从普通队列换成了优先队列 而已。所以如果我们也希望我们的consumer能够根据任务的优先级来改变执行顺序的话,也可以使用优先队列来进行管理任务。

关于优先队列的实现我们已经很熟悉了,但是有一个问题是我们需要实现挂起等待的阻塞功能。这个我们自己实现是比较麻烦的,但好在我们可以通过调用相关的库来实现。比如threading中的Condition, Condition是一个条件变量可以通知其他线程,也可以实现挂起等待 。

from threading import Thread, Conditionclass PriorityQueue:def __init__(self):self._queue = []self._cv = Condition()def put(self, item, priority):with self._cv:heapq.heappush(self._queue, (-priority, self._count, item))# 通知下游,唤醒wait状态的线程self._cv.notify()def get(self):with self._cv:# 如果对列为空则挂起while len(self._queue) == 0:self._cv.wait()# 否则返回优先级最大的return heapq.heappop(self._queue)[-1]

最后介绍一下Queue的其他设置,比如我们可以 通过size参数设置队列的大小 ,由于这是一个阻塞式队列,所以如果我们设置了队列的大小,那么当队列被装满的时候,往其中插入数据的操作也会被阻塞。此时producer线程会被挂起,一直到队列不再满为止。

当然我们也可以通过block参数 将队列的操作设置成非阻塞 。比如que.get(block=False),那么当队列为空的时候,将会抛出一个队列为空的异常。同样,que.put(data, block=False)时也一样会得到一个队列已满的异常。

总结

今天这篇文章当中我们主要介绍了多线程场景中经典的生产消费者模式,这个模式在许多场景当中都有使用。比如kafka等消息系统,以及yarn等调度系统等等,几乎只要是涉及到多线程上下游通信的,往往都会用到。也正因此它的使用场景太广了,所以它 经常在各种面试当中出现 ,也可以认为是工程师必须知道的几种基础设计模式之一。

另外,队列也是一个在设计模式以及使用场景当中经常出现的数据结构。从侧面也说明了,为什么算法和数据结构非常重要,许多大公司喜欢问一些算法题,也是因为 有实际的使用场景 ,并且的的确确能锻炼工程师的思维能力。经常有同学问我算法和数据结构的使用案例,这就是一个很好的例子。

Python爬虫的经典多线程方式,生产者与消费者模型相关推荐

  1. Java多线程案例--生产者和消费者模型(送奶人和喝奶人的故事!)

    文章目录 一.进程和线程 1.进程 2.线程 3.进程与线程的区别 二.生产者和消费者模型 1.生产者消费者模式概述 2.奶箱类 3.生产者类 4.消费者类 三.测试 1.测试类(BoxDemo) 2 ...

  2. 爬虫--05:多线程与生产者消费者模型

    Crawler - 05: Multithreading- und Produzenten-Verbrauchermodell 多线程 一.多线程的基本介绍 1.介绍 2.程序中模拟多任务 二.创建多 ...

  3. python爬虫实战之多线程爬取前程无忧简历

    python爬虫实战之多线程爬取前程无忧简历 import requests import re import threading import time from queue import Queu ...

  4. python爬虫第二弹-多线程爬取网站歌曲

    python爬虫第二弹-多线程爬取网站歌曲 一.简介 二.使用的环境 三.网页解析 1.获取网页的最大页数 2.获取每一页的url形式 3.获取每首歌曲的相关信息 4.获取下载的链接 四.代码实现 一 ...

  5. java多线程 生产者消费者_java多线程之-生产者与消费者

    java多线程之-并发协作[生产者与消费者]模型 对于多线程程序来说,不管c/c++ java python 等任何编程语言,生产者与消费者模型都是最为经典的.也就是可以说多线程的并发协作 对于此模型 ...

  6. python 进程间同步_python之路29 -- 多进程与进程同步(进程锁、信号量、事件)与进程间的通讯(队列和管道、生产者与消费者模型)与进程池...

    所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了.至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠 ...

  7. Java多线程技术~生产者和消费者问题

    Java多线程技术~生产者和消费者问题 本文是上一篇文章的后续,详情点击该连接 线程通信 应用场景:生产者和消费者问题 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取 ...

  8. python生产和消费模型_python queue和生产者和消费者模型

    queue队列 当必须安全地在多个线程之间交换信息时,队列在线程编程中特别有用. classqueue.Queue(maxsize=0) #先入先出classqueue.LifoQueue(maxsi ...

  9. Linux系统编程39:多线程之基于阻塞队列生产者与消费者模型

    文章目录 (1)生产者与消费者模型概述 (2)生产者与消费者模型优点 (3)基于阻塞队列(blockingqueue)的生产者消费者模型(单消费者单生产者) (4)基于阻塞队列(blockingque ...

最新文章

  1. openlayers map获取全部feature_tf2.0基础-tf.data与tf.feature_column
  2. WPF 漏斗控件 等待沙漏效果
  3. 大数据催生决策新模式 未来将改变更多
  4. Android查看挂载的分区(如oem)
  5. Java NIO使用及原理分析(二)
  6. PHP聊天记录内啥,PHP的PSR系列轨范都有啥内容
  7. MyEclipse 的 TCP/IP Monitor 的使用
  8. PLSQL登录报错ORA-12154
  9. 30 个 Python 的最佳实践、小贴士和技巧,不可错过哟!
  10. 探究Lucene计算权重的过程
  11. 吉林大学计算机专业宿舍研究生,吉林大学计算机系的研究生宿舍怎么样?我想考那的..._在职考研_帮考网...
  12. oracle catalog命令,catalog 命令
  13. TC中的HTB队列简单创建与过滤
  14. Google Earth 6 Beta版发布 (供下载地址)
  15. hbase的学习逻辑_HBase-1.0.1学习笔记(二)HBase数据模型
  16. CentOS 6.7安装gcc4.8.2
  17. 用selenium爬取斗鱼信息
  18. java 执行class文件
  19. PICkits3调试功能
  20. 魔法门之英雄无敌3 android,魔法门之英雄无敌3 v0.86.04

热门文章

  1. 使用匿名函数动态设置前置或者后置操作(装饰器模式的)
  2. Apache配置详解(一)
  3. Spring AspectJ Execution 表达式
  4. 《从零开始学Swift》学习笔记(Day 40)——析构函数
  5. CentOs6.5 更新python2.7,以及tab自动补全
  6. openwabmail问题解决方法
  7. git 分支管理策略 与 物理实现 --author by阮一峰 小鱼
  8. [旧博客]不用编程也能搞定作弊刷票
  9. 内容主题windows下简单的vbscript自动发送邮件--带附件
  10. Android4.0/Android4.1 WifiStateMachine状态机结构图