阻塞队列是对普通队列的一种扩展,在普通队列功能上增加了一些额外功能。

普通队列的功能可以参照java的Queue接口

public interface Queue<E> extends Collection<E> {/*** Inserts the specified element into this queue if it is possible to do so* immediately without violating capacity restrictions, returning* {@code true} upon success and throwing an {@code IllegalStateException}* if no space is currently available.** @param e the element to add* @return {@code true} (as specified by {@link Collection#add})* @throws IllegalStateException if the element cannot be added at this*         time due to capacity restrictions* @throws ClassCastException if the class of the specified element*         prevents it from being added to this queue* @throws NullPointerException if the specified element is null and*         this queue does not permit null elements* @throws IllegalArgumentException if some property of this element*         prevents it from being added to this queue*/boolean add(E e);/*** Inserts the specified element into this queue if it is possible to do* so immediately without violating capacity restrictions.* When using a capacity-restricted queue, this method is generally* preferable to {@link #add}, which can fail to insert an element only* by throwing an exception.** @param e the element to add* @return {@code true} if the element was added to this queue, else*         {@code false}* @throws ClassCastException if the class of the specified element*         prevents it from being added to this queue* @throws NullPointerException if the specified element is null and*         this queue does not permit null elements* @throws IllegalArgumentException if some property of this element*         prevents it from being added to this queue*/boolean offer(E e);/*** Retrieves and removes the head of this queue.  This method differs* from {@link #poll poll} only in that it throws an exception if this* queue is empty.** @return the head of this queue* @throws NoSuchElementException if this queue is empty*/E remove();/*** Retrieves and removes the head of this queue,* or returns {@code null} if this queue is empty.** @return the head of this queue, or {@code null} if this queue is empty*/E poll();/*** Retrieves, but does not remove, the head of this queue.  This method* differs from {@link #peek peek} only in that it throws an exception* if this queue is empty.** @return the head of this queue* @throws NoSuchElementException if this queue is empty*/E element();/*** Retrieves, but does not remove, the head of this queue,* or returns {@code null} if this queue is empty.** @return the head of this queue, or {@code null} if this queue is empty*/E peek();
}

其实主要就是入队、出队操作,入队和出队都有两种方法,add()/offer(),remove()/poll(),返回队头元素(只返回,不出队)也有两个方法element()/peek(),两组方法的功能相同,不同的是对于一些特殊情况的处理是返回特殊值还是抛出异常,比如队列已经满了的情况下调用入队操作,add()会抛出异常,offer()会返回false。具体可以看下面表:

  Throws exception Returns special value
Insert add(e) offer(e)
Remove remove() poll()
Examine element() peek()

阻塞队列接口(BlockingQueue)继承自普通队列

public interface BlockingQueue<E> extends Queue<E> {/*** Inserts the specified element into this queue if it is possible to do* so immediately without violating capacity restrictions, returning* {@code true} upon success and throwing an* {@code IllegalStateException} if no space is currently available.* When using a capacity-restricted queue, it is generally preferable to* use {@link #offer(Object) offer}.** @param e the element to add* @return {@code true} (as specified by {@link Collection#add})* @throws IllegalStateException if the element cannot be added at this*         time due to capacity restrictions* @throws ClassCastException if the class of the specified element*         prevents it from being added to this queue* @throws NullPointerException if the specified element is null* @throws IllegalArgumentException if some property of the specified*         element prevents it from being added to this queue*/boolean add(E e);/*** Inserts the specified element into this queue if it is possible to do* so immediately without violating capacity restrictions, returning* {@code true} upon success and {@code false} if no space is currently* available.  When using a capacity-restricted queue, this method is* generally preferable to {@link #add}, which can fail to insert an* element only by throwing an exception.** @param e the element to add* @return {@code true} if the element was added to this queue, else*         {@code false}* @throws ClassCastException if the class of the specified element*         prevents it from being added to this queue* @throws NullPointerException if the specified element is null* @throws IllegalArgumentException if some property of the specified*         element prevents it from being added to this queue*/boolean offer(E e);/*** Inserts the specified element into this queue, waiting if necessary* for space to become available.** @param e the element to add* @throws InterruptedException if interrupted while waiting* @throws ClassCastException if the class of the specified element*         prevents it from being added to this queue* @throws NullPointerException if the specified element is null* @throws IllegalArgumentException if some property of the specified*         element prevents it from being added to this queue*/void put(E e) throws InterruptedException;/*** Inserts the specified element into this queue, waiting up to the* specified wait time if necessary for space to become available.** @param e the element to add* @param timeout how long to wait before giving up, in units of*        {@code unit}* @param unit a {@code TimeUnit} determining how to interpret the*        {@code timeout} parameter* @return {@code true} if successful, or {@code false} if*         the specified waiting time elapses before space is available* @throws InterruptedException if interrupted while waiting* @throws ClassCastException if the class of the specified element*         prevents it from being added to this queue* @throws NullPointerException if the specified element is null* @throws IllegalArgumentException if some property of the specified*         element prevents it from being added to this queue*/boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;/*** Retrieves and removes the head of this queue, waiting if necessary* until an element becomes available.** @return the head of this queue* @throws InterruptedException if interrupted while waiting*/E take() throws InterruptedException;/*** Retrieves and removes the head of this queue, waiting up to the* specified wait time if necessary for an element to become available.** @param timeout how long to wait before giving up, in units of*        {@code unit}* @param unit a {@code TimeUnit} determining how to interpret the*        {@code timeout} parameter* @return the head of this queue, or {@code null} if the*         specified waiting time elapses before an element is available* @throws InterruptedException if interrupted while waiting*/E poll(long timeout, TimeUnit unit)throws InterruptedException;/*** Returns the number of additional elements that this queue can ideally* (in the absence of memory or resource constraints) accept without* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic* limit.** <p>Note that you <em>cannot</em> always tell if an attempt to insert* an element will succeed by inspecting {@code remainingCapacity}* because it may be the case that another thread is about to* insert or remove an element.** @return the remaining capacity*/int remainingCapacity();/*** Removes a single instance of the specified element from this queue,* if it is present.  More formally, removes an element {@code e} such* that {@code o.equals(e)}, if this queue contains one or more such* elements.* Returns {@code true} if this queue contained the specified element* (or equivalently, if this queue changed as a result of the call).** @param o element to be removed from this queue, if present* @return {@code true} if this queue changed as a result of the call* @throws ClassCastException if the class of the specified element*         is incompatible with this queue*         (<a href="../Collection.html#optional-restrictions">optional</a>)* @throws NullPointerException if the specified element is null*         (<a href="../Collection.html#optional-restrictions">optional</a>)*/boolean remove(Object o);/*** Returns {@code true} if this queue contains the specified element.* More formally, returns {@code true} if and only if this queue contains* at least one element {@code e} such that {@code o.equals(e)}.** @param o object to be checked for containment in this queue* @return {@code true} if this queue contains the specified element* @throws ClassCastException if the class of the specified element*         is incompatible with this queue*         (<a href="../Collection.html#optional-restrictions">optional</a>)* @throws NullPointerException if the specified element is null*         (<a href="../Collection.html#optional-restrictions">optional</a>)*/public boolean contains(Object o);/*** Removes all available elements from this queue and adds them* to the given collection.  This operation may be more* efficient than repeatedly polling this queue.  A failure* encountered while attempting to add elements to* collection {@code c} may result in elements being in neither,* either or both collections when the associated exception is* thrown.  Attempts to drain a queue to itself result in* {@code IllegalArgumentException}. Further, the behavior of* this operation is undefined if the specified collection is* modified while the operation is in progress.** @param c the collection to transfer elements into* @return the number of elements transferred* @throws UnsupportedOperationException if addition of elements*         is not supported by the specified collection* @throws ClassCastException if the class of an element of this queue*         prevents it from being added to the specified collection* @throws NullPointerException if the specified collection is null* @throws IllegalArgumentException if the specified collection is this*         queue, or some property of an element of this queue prevents*         it from being added to the specified collection*/int drainTo(Collection<? super E> c);/*** Removes at most the given number of available elements from* this queue and adds them to the given collection.  A failure* encountered while attempting to add elements to* collection {@code c} may result in elements being in neither,* either or both collections when the associated exception is* thrown.  Attempts to drain a queue to itself result in* {@code IllegalArgumentException}. Further, the behavior of* this operation is undefined if the specified collection is* modified while the operation is in progress.** @param c the collection to transfer elements into* @param maxElements the maximum number of elements to transfer* @return the number of elements transferred* @throws UnsupportedOperationException if addition of elements*         is not supported by the specified collection* @throws ClassCastException if the class of an element of this queue*         prevents it from being added to the specified collection* @throws NullPointerException if the specified collection is null* @throws IllegalArgumentException if the specified collection is this*         queue, or some property of an element of this queue prevents*         it from being added to the specified collection*/int drainTo(Collection<? super E> c, int maxElements);
}

其实主要在Queue基础上增加了阻塞的入队(put())和出队(take())操作,即当队列已满时调用put()入队时,当前线程会阻塞,直到队列有空间时才会继续入队;当队列为空时,调用take()出队操作时,当前线程会阻塞,直到队列中有元素时才会继续出队。这就是阻塞队列的核心。

阻塞队列实现原理

阻塞队列有很多实现类,根据存储结构不同有ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,这里主要以LinkedBlockingQueue为例来介绍,线程池Executors类里FixedThreadPool和SingledThreadPool的作业队列都用的是LinkedBlockingQueue。

/*** Inserts the specified element at the tail of this queue, waiting if* necessary for space to become available.** @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*///====================①while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();}

主要看上边代码①处,当前元素个数等于队列容量时,调用notFull.await()方法,当前线程阻塞在这里,知道有地方调用notFull.signal()方法,当前现在才被唤醒。

出队操作类似有个notEmpty.await()方法,当队列为空时阻塞出队线程

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

while (count.get() == 0) {notEmpty.await();
}

那么notFull.await(),notEmpty.await(),notEmpty.signal()是什么东西?有点儿像object类的wait(),notify(),看代码:

 /** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();

他们是JDK1.5引入的Condition类,和ReentrantLock锁配合实现syncronized语法的类,比syncronized语法更灵活,某些场景效率更高,具体区别和实现原理后续文章再写。

Java阻塞队列-BlockingQueue介绍及实现原理相关推荐

  1. Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析

    转载自  Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析 Java中的阻塞队列接口BlockingQueue继承自Queue接口. Block ...

  2. Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例

    Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例 本文由 TonySpark 翻译自 Javarevisited.转载请参见文章末尾的要求. Java.util.concurr ...

  3. Java 阻塞队列--BlockingQueue

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用 ...

  4. java阻塞队列作用_简单理解阻塞队列(BlockingQueue)中的take/put方法以及Condition存在的作用...

    简单理解阻塞队列(BlockingQueue)中的take/put方法以及Condition存在的作用 Condition:可以理解成一把锁的一个钥匙,它既可以解锁(通知放行),又可以加锁(阻塞) n ...

  5. java 阻塞队列 BQ_阻塞队列 BlockingQueue的使用(二)

    原 阻塞队列 BlockingQueue的使用(二) BlockingQueue 的核心方法:方法类型抛出异常特殊值阻塞超时 插入add(e)offer(e)put(e)offer(e,time,un ...

  6. java阻塞队列小结

    [README] 1,本文介绍了java的7个阻塞队列: 2,阻塞队列的作用 做缓冲作用,如缓冲kafka消息,而不是直接发送给kafka,减少kafka集群的压力: [1]阻塞队列 Blocking ...

  7. 并发编程5:Java 阻塞队列源码分析(下)

    上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 PriorityBlockingQueue,这篇文 ...

  8. java阻塞队列的使用

    一.阻塞队列的作用 阻塞队列(BlockingQueue),顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如图所示: 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞 当阻塞队 ...

  9. Java阻塞队列 LinkedBlockingDeque

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/120833494 本文出自[赵彦军的博客] Java队列 Queue Java队列 ...

最新文章

  1. 牛客练习赛70 重新排列
  2. c语言编程函数补充上机题,2011年计算机二级C语言上机操作题及答案(10)
  3. 在Visual C++中用ADO进行数据库编程(上)
  4. 样条曲面_用SolidWorks曲面将六个小圆管向大圆管过渡
  5. 如何通过投掷一枚硬币产生各种概率
  6. fatal: remote origin already exists
  7. python去除读取文件中多余的空行
  8. android 辐射动画_Android 四种动画效果的调用实现代码
  9. 宏基ACER Aspire R3600 REVO离子平台
  10. C语言实现24点小游戏,C语言解24点游戏程序
  11. win10搭建无盘服务器配置,win10电脑搭建无盘工作站
  12. 伺服驱动器cn1引脚定义_台达A2 A3系列伺服CN1接线对照表
  13. 追赶法 matlab编程,科学网—数值分析----三对角方程组的追赶法matlab程序 - 殷春武的博文...
  14. python实现K近邻算法
  15. 微信小程序服装商城+后台管理系统
  16. d06调试详细说明_D06调试软件说明
  17. PPT转换PDF后转成图片
  18. 新浪低调上线开源镜像站
  19. 数据分析精选案例:3行代码上榜Kaggle学生评估赛
  20. 易天光模块的兼容性验证及交换机型号介绍

热门文章

  1. 全新的Unity移动游戏优化解决方案
  2. java除零异常_为什么Java除以0.0时不会抛出异常?
  3. utils:常见的几种日期格式和转换方法
  4. Nginx 配置 SSL 证书 + HTTPS 站点小记
  5. EAGLE PCB设计软件介绍
  6. Python 可迭代对象与迭代器的对比
  7. Origin 恢复默认作图模板
  8. 数据结构考试知识点总结——绪论
  9. 仿猫眼php,微信小程序 仿猫眼实现实例代码
  10. Apple Push Notification Service(苹果推送服务)