



import queue
import threadingq = queue.Queue()def worker():while True:item = q.get()if item is None:breakprint('item ', item)q.task_done()threads = []
for i in range(10):t = threading.Thread(target=worker)t.start()threads.append(t)for item in range(20):q.put(item)q.join()for i in range(10):q.put(None)for t in threads:t.join()


item  0
item  1
item  2
item  3
item  4
item  5
item  6
item  7
item  8
item  9
item  10
item  11
item  12
item  13
item  14
item  15
item  16
item  17
item  18
item  19




class Queue:'''Create a queue object with a given maximum size.If maxsize is <= 0, the queue size is infinite.'''def __init__(self, maxsize=0):self.maxsize = maxsize                                                  # 初始化队列的大小,self._init(maxsize)                                                     # 初始化一个队列deque实例# mutex must be held whenever the queue is mutating.  All methods# that acquire mutex must release it before returning.  mutex# is shared between the three conditions, so acquiring and# releasing the conditions also acquires and releases mutex.self.mutex = threading.Lock()                                           # 获取线程锁# Notify not_empty whenever an item is added to the queue; a# thread waiting to get is notified then.self.not_empty = threading.Condition(self.mutex)                        # 获取为空的条件变量# Notify not_full whenever an item is removed from the queue;# a thread waiting to put is notified then.self.not_full = threading.Condition(self.mutex)                         # 获取未满的条件变量# Notify all_tasks_done whenever the number of unfinished tasks# drops to zero; thread waiting to join() is notified to resumeself.all_tasks_done = threading.Condition(self.mutex)                   # 获取全部任务完成的条件变量self.unfinished_tasks = 0                                               # 未完成的任务数量def task_done(self):'''Indicate that a formerly enqueued task is complete.Used by Queue consumer threads.  For each get() used to fetch a task,a subsequent call to task_done() tells the queue that the processingon the task is complete.If a join() is currently blocking, it will resume when all itemshave been processed (meaning that a task_done() call was receivedfor every item that had been put() into the queue).Raises a ValueError if called more times than there were itemsplaced in the queue.'''with self.all_tasks_done:                                               # 获取完成任务对应的条件变量的锁unfinished = self.unfinished_tasks - 1                              # 未完成任务数量减一if unfinished <= 0:                                                 # 如果减到最后小于等于0if unfinished < 0:raise ValueError('task_done() called too many times')       # 为0则报错self.all_tasks_done.notify_all()                                # 如果为1  则通知等待的任务的条件变量唤醒所有等待该条件变量的线程self.unfinished_tasks = unfinished                                  # 重置未完成任务数量def join(self):'''Blocks until all items in the Queue have been gotten and processed.The count of unfinished tasks goes up whenever an item is added to thequeue. The count goes down whenever a consumer thread calls task_done()to indicate the item was retrieved and all work on it is complete.When the count of unfinished tasks drops to zero, join() unblocks.'''with self.all_tasks_done:                                               # 获取任务完成条件变量while self.unfinished_tasks:                                        # 只要有未完成的任务self.all_tasks_done.wait()                                      # 则阻塞等待def qsize(self):'''Return the approximate size of the queue (not reliable!).'''with self.mutex:                                                        # 获取线程锁     return self._qsize()                                                # 返回队列的大小def empty(self):'''Return True if the queue is empty, False otherwise (not reliable!).This method is likely to be removed at some point.  Use qsize() == 0as a direct substitute, but be aware that either approach risks a racecondition where a queue can grow before the result of empty() orqsize() can be used.To create code that needs to wait for all queued tasks to becompleted, the preferred technique is to use the join() method.'''with self.mutex:                                                        # 获取线程锁return not self._qsize()                                            # 判断是否队列为空def full(self):'''Return True if the queue is full, False otherwise (not reliable!).This method is likely to be removed at some point.  Use qsize() >= nas a direct substitute, but be aware that either approach risks a racecondition where a queue can shrink before the result of full() orqsize() can be used.'''with self.mutex:                                                        # 获取线程锁return 0 < self.maxsize <= self._qsize()                            # 判断队列是否满了def put(self, item, block=True, timeout=None):'''Put an item into the queue.If optional args 'block' is true and 'timeout' is None (the default),block if necessary until a free slot is available. If 'timeout' isa non-negative number, it blocks at most 'timeout' seconds and raisesthe Full exception if no free slot was available within that time.Otherwise ('block' is false), put an item on the queue if a free slotis immediately available, else raise the Full exception ('timeout'is ignored in that case).'''with self.not_full:                                                     # 获取没有满的锁if self.maxsize > 0:                                                # 如果次数大于0 if not block:                                                   # 是否为阻塞if self._qsize() >= self.maxsize:                           # 如果现在的队列数量大于等于最大数量则报已经满了的错误raise Fullelif timeout is None:                                           # 如果没有传入过期时间while self._qsize() >= self.maxsize:                        # 如果当前的队列数量大于等于最大队列数量self.not_full.wait()                                    # 则阻塞直到可以往队列插入elif timeout < 0:                                               # 如果过期时间小于0 则报错raise ValueError("'timeout' must be a non-negative number")else:endtime = time() + timeout                                  # 获取指定的具体时间节点while self._qsize() >= self.maxsize:                        # 如果当前队列数量大于等于最大队列数量remaining = endtime - time()                            # 获取剩余的时间if remaining <= 0.0:                                    # 如果小于等于0 raise Full                                          # 报满了错误self.not_full.wait(remaining)                           # 否则等待remaining事件self._put(item)                                                     # 添加到队列中self.unfinished_tasks += 1                                          # 未完成任务数加1self.not_empty.notify()                                             # 调用未满条件变量唤醒等待该条件变量的线程def get(self, block=True, timeout=None):'''Remove and return an item from the queue.If optional args 'block' is true and 'timeout' is None (the default),block if necessary until an item is available. If 'timeout' isa non-negative number, it blocks at most 'timeout' seconds and raisesthe Empty exception if no item was available within that time.Otherwise ('block' is false), return an item if one is immediatelyavailable, else raise the Empty exception ('timeout' is ignoredin that case).'''with self.not_empty:                                                    # 获取未空的条件变量的锁if not block:                                                       # 是否为阻塞if not self._qsize():                                           # 如果为非阻塞raise Empty                                                 # 当队列为空的时候就直接报错elif timeout is None:                                               # 检查传入的过期时间是否为空while not self._qsize():                                        # 检查队列事务为空self.not_empty.wait()                                       # 队列为空则等待not_empty条件变量被唤醒,此时就陷入阻塞elif timeout < 0:                                                   # 如果传入过期时间小于0则报错raise ValueError("'timeout' must be a non-negative number")else:endtime = time() + timeout                                      # 获取过期时间while not self._qsize():                                        # 如果队列为零remaining = endtime - time()                                # 计算等待的时间if remaining <= 0.0:                                        # 如果时间小于0 则报队列已经为空的错误raise Emptyself.not_empty.wait(remaining)                              # 否则阻塞等待remaining秒item = self._get()                                                  # 获取队列的值self.not_full.notify()                                              # 未满的条件变量 唤醒所有等待该条件变量的线程return item                                                         # 返回从队列中获取的值def put_nowait(self, item):'''Put an item into the queue without blocking.Only enqueue the item if a free slot is immediately available.Otherwise raise the Full exception.'''return self.put(item, block=False)                                      # 如果队列已满则直接报错def get_nowait(self):'''Remove and return an item from the queue without blocking.Only get an item if one is immediately available. Otherwiseraise the Empty exception.'''return self.get(block=False)                                            # 如果队列为空则直接报错返回# Override these methods to implement other queue organizations# (e.g. stack or priority queue).# These will only be called with appropriate locks held# Initialize the queue representationdef _init(self, maxsize):self.queue = deque()                                                    # 初始化一个队列def _qsize(self):return len(self.queue)                                                  # 获取队列的长度# Put a new item in the queuedef _put(self, item):self.queue.append(item)                                                 # 添加一个元素进队列# Get an item from the queuedef _get(self):return self.queue.popleft()                                             # 弹出一个元素出队列





