A Brief Analysis of the Python Standard Library queue Module

Environment for this article: Python 3.5.2

How the queue module is implemented

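The queue module provides a queue that can be shared safely between threads. Under the hood, the queue is built on the Condition primitive from the threading module (see my earlier post for an explanation of how Condition works). Let's start with a usage example: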

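import queue
import threading

q = queue.Queue()


def worker():
    while True:
        item = q.get()          # blocks until an item is available
        if item is None:        # None is the sentinel that stops a worker
            break
        print('item ', item)
        q.task_done()           # mark this item as processed


threads = []
for i in range(10):             # start 10 worker threads
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in range(20):          # enqueue 20 work items
    q.put(item)

q.join()                        # block until every item has been task_done()'d

for i in range(10):             # one sentinel per worker
    q.put(None)
for t in threads:
    t.join()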

The output is as follows:

item  0
item  1
item  2
item  3
item  4
item  5
item  6
item  7
item  8
item  9
item  10
item  11
item  12
item  13
item  14
item  15
item  16
item  17
item  18
item  19

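The output shows that the queue keeps the data safe across threads. We create a Queue instance, put data into it, and let the worker threads wait in get(). Each item is handed to exactly one thread, so the workers take items in mutual exclusion and process them. Because ten threads print concurrently, the order of the printed lines may differ between runs. Next, let's walk through how the Queue class works.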
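The ordering of printed lines is not something the queue promises. What it does promise is that every item passed to put() is returned by exactly one get(). Below is a minimal sketch that checks this property. It assumes 10 workers and 1000 items (both arbitrary choices). The results are collected in a shared list guarded by a lock of our own.

```python
import queue
import threading

q = queue.Queue()
consumed = []                       # items seen by all workers together
consumed_lock = threading.Lock()    # protects the shared list


def worker():
    while True:
        item = q.get()
        if item is None:            # sentinel: acknowledge it and stop
            q.task_done()
            break
        with consumed_lock:
            consumed.append(item)
        q.task_done()


threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for item in range(1000):
    q.put(item)
for _ in threads:
    q.put(None)                     # one sentinel per worker
q.join()                            # every put() now has a matching task_done()
for t in threads:
    t.join()

# No item was lost or delivered twice, whatever the interleaving was.
assert sorted(consumed) == list(range(1000))
print(len(consumed))  # 1000
```

Here the sentinels are queued before q.join(), so each worker must also call task_done() for its None. Otherwise join() would wait forever.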

How the Queue class is implemented

First, here is the definition of the Queue class:

# Module-level names from queue.py that the class relies on
# (simplified: the 3.5 source wraps these imports in try/except fallbacks)
import threading
from collections import deque
from time import monotonic as time


class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait().'
    pass


class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass


class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize          # maximum size (<= 0 means unbounded)
        self._init(maxsize)             # create the underlying deque

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()   # the one lock shared by all three conditions

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)        # "not empty" condition

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)         # "not full" condition

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)   # "all tasks done" condition
        self.unfinished_tasks = 0       # number of unfinished tasks

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:       # acquire the shared mutex via all_tasks_done
            unfinished = self.unfinished_tasks - 1      # one task fewer
            if unfinished <= 0:
                if unfinished < 0:      # more task_done() calls than put() calls
                    raise ValueError('task_done() called too many times')
                # The count just reached 0: wake every thread blocked in join()
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished          # store the new count

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:        # as long as any task is unfinished
                self.all_tasks_done.wait()      # block (the mutex is released while waiting)

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()        # current number of items

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()    # True if there are no items

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()    # only a bounded queue can be full

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:             # acquire the shared mutex via not_full
            if self.maxsize > 0:        # size checks apply only to a bounded queue
                if not block:           # non-blocking mode
                    if self._qsize() >= self.maxsize:
                        raise Full      # full right now: fail immediately
                elif timeout is None:   # block with no time limit
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()    # wait until a get() frees a slot
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout  # absolute deadline
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()    # time left before the deadline
                        if remaining <= 0.0:
                            raise Full  # deadline passed and still full
                        self.not_full.wait(remaining)   # wait at most `remaining` seconds
            self._put(item)             # append the item
            self.unfinished_tasks += 1  # one more unfinished task
            self.not_empty.notify()     # wake one thread waiting in get()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:            # acquire the shared mutex via not_empty
            if not block:               # non-blocking mode
                if not self._qsize():
                    raise Empty         # empty right now: fail immediately
            elif timeout is None:       # block with no time limit
                while not self._qsize():
                    self.not_empty.wait()       # wait until a put() adds an item
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout      # absolute deadline
                while not self._qsize():
                    remaining = endtime - time()        # time left before the deadline
                    if remaining <= 0.0:
                        raise Empty     # deadline passed and still empty
                    self.not_empty.wait(remaining)      # wait at most `remaining` seconds
            item = self._get()          # take the item from the front
            self.not_full.notify()      # wake one thread waiting in put()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)      # raises Full at once if the queue is full

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)            # raises Empty at once if the queue is empty

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()            # FIFO storage

    def _qsize(self):
        return len(self.queue)          # number of stored items

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)         # add at the right end

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()     # remove from the left end
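The comment at the end of the class says that other queue organizations are obtained by overriding the four hook methods. As a hedged illustration, here is a hypothetical StackQueue subclass that swaps the deque for a list used as a stack. The standard library already ships an equivalent class, queue.LifoQueue. This sketch only shows that put(), get() and all the locking are inherited untouched.

```python
import queue


class StackQueue(queue.Queue):
    """Hypothetical LIFO queue built only by overriding the storage hooks.

    put()/get() and the condition logic are inherited unchanged; the
    hooks below are called with the queue's mutex already held.
    """

    def _init(self, maxsize):
        self.queue = []             # a plain list used as a stack

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)     # push onto the top

    def _get(self):
        return self.queue.pop()     # pop from the top: last in, first out


s = StackQueue()
for i in range(3):
    s.put(i)
print([s.get() for _ in range(3)])  # [2, 1, 0]
```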

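That is the complete definition of the Queue class. Each call to q.task_done() decrements the count of unfinished tasks. When the count reaches zero, task_done() calls notify_all() on the all_tasks_done condition, which wakes every thread waiting in join(). Those threads re-check the count in their while loop, find that it is zero, and return. This is the main purpose of the all_tasks_done condition.

q.put() first checks whether the item can be inserted. If there is room, it inserts the item, increments the unfinished count and calls notify() on not_empty, which wakes one thread waiting in get(). If the queue is full, put() waits on the not_full condition. When another thread takes an item out, get() calls not_full.notify(), which wakes one waiting producer so that it can insert its item. q.get() works symmetrically.

In short, this post has briefly analyzed how the Queue class is implemented. It relies on the Condition class to coordinate data exchange between threads.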
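The wake-up path described above can be observed directly. In this sketch, a producer blocks in put() on a full queue of size 1 until the main thread calls get(). The task counter is then driven back to zero, and one extra task_done() triggers the ValueError check. The 0.2 s wait is an arbitrary assumption; it only gives the producer time to reach its blocking put().

```python
import queue
import threading

q = queue.Queue(maxsize=1)
q.put("a")                      # the queue is now full

full_raised = False
try:
    q.put_nowait("b")           # non-blocking put on a full queue
except queue.Full:
    full_raised = True          # failed put: unfinished count unchanged


def producer():
    q.put("b")                  # blocking put: waits on not_full


t = threading.Thread(target=producer)
t.start()
t.join(timeout=0.2)
still_blocked = t.is_alive()
print(still_blocked)            # True: put() is still waiting

first = q.get()                 # get() calls not_full.notify()
t.join()                        # the producer wakes up and finishes
second = q.get()
print(first, second)            # a b

# Two successful put() calls, so join() needs two task_done() calls.
q.task_done()
q.task_done()                   # count reaches 0 -> all_tasks_done.notify_all()
q.join()                        # returns immediately

error_message = None
try:
    q.task_done()               # one call too many
except ValueError as exc:
    error_message = str(exc)
print(error_message)            # task_done() called too many times
```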

Summary

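The thread-safe Queue discussed here is built on Condition. The implementation is not complicated. It mainly consists of blocking and waking threads on three condition variables (not_empty, not_full and all_tasks_done), which all share one mutex. The shared mutex makes every read or write of the underlying deque atomic, and the conditions make threads wait correctly when the queue has reached its maximum size or is empty. This post is fairly short; beyond the example, readers can study further cases on their own. My knowledge is limited, so please point out any mistakes.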
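To make the summary concrete, here is a deliberately stripped-down, hypothetical MiniQueue. It is not part of the standard library. It keeps only the core pattern: one Lock shared by a not_empty and a not_full Condition. It omits timeouts, task_done()/join() and the unbounded case, so it assumes maxsize > 0.

```python
import threading
from collections import deque


class MiniQueue:
    """Hypothetical bounded FIFO queue showing the Lock + two Conditions idea."""

    def __init__(self, maxsize):
        assert maxsize > 0, "this sketch only supports bounded queues"
        self.maxsize = maxsize
        self.items = deque()
        self.mutex = threading.Lock()
        # Both conditions wrap the same lock, so entering either one
        # acquires self.mutex.
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:
            # Re-check in a loop: being woken does not guarantee free space.
            while len(self.items) >= self.maxsize:
                self.not_full.wait()
            self.items.append(item)
            self.not_empty.notify()     # wake one waiting consumer

    def get(self):
        with self.not_empty:
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()      # wake one waiting producer
            return item


if __name__ == "__main__":
    mq = MiniQueue(maxsize=2)
    results = []

    def consumer():
        for _ in range(5):
            results.append(mq.get())

    c = threading.Thread(target=consumer)
    c.start()
    for i in range(5):
        mq.put(i)                       # blocks while two items are waiting
    c.join()
    print(results)                      # [0, 1, 2, 3, 4]
```

Compared with queue.Queue, the only pieces missing are the timeout branches and the all_tasks_done bookkeeping. The waiting and notification pattern is the same.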
