阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
1、初识阻塞队列

在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

BlockingQueue的核心方法:

public interface BlockingQueue<E> extends Queue<E> {//将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。boolean add(E e);//将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。boolean offer(E e);//将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。void put(E e) throws InterruptedException;//将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;//从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。E take() throws InterruptedException;//在给定的时间里,从队列中获取值,如果没有取到会抛出异常。E poll(long timeout, TimeUnit unit)throws InterruptedException;//获取队列中剩余的空间。int remainingCapacity();//从队列中移除指定的值。boolean remove(Object o);//判断队列中是否拥有该值。public boolean contains(Object o);//将队列中值,全部移除,并发设置到给定的集合中。int drainTo(Collection<? super E> c);//指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。int drainTo(Collection<? super E> c, int maxElements);
}

在深入之前先了解下下ReentrantLock 和 Condition:
重入锁ReentrantLock:
ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
主要方法:

  • lock()获得锁
  • lockInterruptibly()获得锁,但优先响应中断
  • tryLock()尝试获得锁,成功返回true,否则false,该方法不等待,立即返回
  • tryLock(long time,TimeUnit unit)在给定时间内尝试获得锁
  • unlock()释放锁

Condition:await()、signal()方法分别对应之前的Object的wait()和notify()

  • 和重入锁一起使用
  • await()是当前线程等待同时释放锁
  • awaitUninterruptibly()不会在等待过程中响应中断
  • signal()用于唤醒一个在等待的线程,还有对应的singalAll()方法
2、阻塞队列的成员
队列 有界性 数据结构
ArrayBlockingQueue bounded(有界) 加锁 arrayList
LinkedBlockingQueue optionally-bounded 加锁 linkedList
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap
SynchronousQueue bounded 加锁
LinkedTransferQueue unbounded 加锁 heap
LinkedBlockingDeque unbounded 无锁 heap
  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

  • LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
  • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
  • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。DelayQueue可以运用在以下应用场景:
    • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
    • 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
  • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

3、阻塞队列原理以及使用

ArrayBlockingQueue

// 存储队列元素的数组final Object[] items;// 拿数据的索引,用于take,poll,peek,remove方法int takeIndex;// 放数据的索引,用于put,offer,add方法int putIndex;// 元素个数int count;// 可重入锁final ReentrantLock lock;// notEmpty条件对象,由lock创建private final Condition notEmpty;// notFull条件对象,由lock创建private final Condition notFull;public ArrayBlockingQueue(int capacity) {this(capacity, false);//默认构造非公平锁的阻塞队列 }public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];//初始化ReentrantLock重入锁,出队入队拥有这同一个锁 lock = new ReentrantLock(fair);//初始化非空等待队列notEmpty = lock.newCondition();//初始化非满等待队列 notFull =  lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;//将集合添加进数组构成的队列中 try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}

添加的实现原理:

这里的add方法和offer方法最终调用的是enqueue(E x)方法,其方法内部通过putIndex索引直接将元素添加到数组items中,这里可能会疑惑的是当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0,这是因为当前队列执行元素获取时总是从队列头部获取,而添加元素从中从队列尾部获取所以当队列索引(从0开始)与数组长度相等时,下次我们就需要从数组头部开始添加了,如下图演示

    //入队操作private void enqueue(E x) {final Object[] items = this.items;//通过putIndex索引对数组进行赋值items[putIndex] = x;//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}

put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。到此我们对三个添加方法即put,offer,add都分析完毕,其中offer,add在正常情况下都是无阻塞的添加,而put方法是阻塞添加。这就是阻塞队列的添加过程。说白了就是当队列满时通过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况,一是,队列已满,那么新到来的put线程将添加到notFull的条件队列中等待,二是,有移除线程执行移除操作,移除成功同时唤醒put线程,如下图所示

    public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//当队列元素个数与数组长度相等时,无法添加元素while (count == items.length)//将当前调用线程挂起,添加到notFull条件队列中等待唤醒notFull.await();enqueue(e);} finally {lock.unlock();}}

take方法其实很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程,执行take操作。图示如下

    //从队列头部删除,队列没有元素就阻塞,可中断public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();//中断try {//如果队列没有元素while (count == 0)//执行阻塞操作notEmpty.await();return dequeue();//如果队列有元素执行删除操作} finally {lock.unlock();}}

Java 阻塞队列原理相关推荐

  1. java线程池_Java多线程并发:线程基本方法+线程池原理+阻塞队列原理技术分享...

    线程基本方法有哪些? 线程相关的基本方法有 wait,notify,notifyAll,sleep,join,yield 等. 线程等待(wait) 调用该方法的线程进入 WAITING 状态,只有等 ...

  2. Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析

    转载自  Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析 Java中的阻塞队列接口BlockingQueue继承自Queue接口. Block ...

  3. Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例

    Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例 本文由 TonySpark 翻译自 Javarevisited.转载请参见文章末尾的要求. Java.util.concurr ...

  4. Java阻塞队列 LinkedBlockingDeque

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/120833494 本文出自[赵彦军的博客] Java队列 Queue Java队列 ...

  5. 并发编程5:Java 阻塞队列源码分析(下)

    上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 PriorityBlockingQueue,这篇文 ...

  6. Java 阻塞队列--BlockingQueue

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用 ...

  7. Java阻塞队列-BlockingQueue介绍及实现原理

    阻塞队列是对普通队列的一种扩展,在普通队列功能上增加了一些额外功能. 普通队列的功能可以参照java的Queue接口 public interface Queue<E> extends C ...

  8. JAVA阻塞队列和线程池原理

    阻塞队列 队列 队列是一种特殊的线性表,遵循先入先出.后入后出的基本原则,一般来说,它只允许在表的前端进行删除操作,而在表的后端进行插入操作. 什么是阻塞队列 支持阻塞的插入方法,当队列满时,队列会阻 ...

  9. Java阻塞队列的实现

    转自: http://segmentfault.com/a/1190000000373535 阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里 ...

最新文章

  1. 06-老马jQuery教程-jQuery高级
  2. python 廖雪峰_廖雪峰的Python系列教程(20)——高级特性之生成器
  3. Redis 是如何执行的?
  4. 一个简洁的个人导航页面源码
  5. Mac OSX操作系统安装和配置Zend Server 6教程(1)
  6. python程序封装成exe_如何将python脚本封装成exe程序?
  7. 【ACL2021】主会571篇长文分类最全汇总
  8. 快速搭建Python开发环境
  9. Android事件处理
  10. [2018.04.23 T3] 最大值
  11. pythonrequests发送数据_使用Python爬虫库requests发送表单数据和JSON数据
  12. 赤菟CH32V307 RISC-V 开发板 rt-thread 的坑 - esp8266的使用
  13. 如何制作内网web服务器,内网搭建WEB服务器教程
  14. 解决layui的富文本编辑器中图片的大小问题
  15. Sentinel系统自适应限流【原理源码】
  16. Cura切片3d打印设置
  17. 盘点2017 CES展会所有亮眼黑科技 (上)
  18. 开源全文搜索(搜索引擎)
  19. IC基础知识(十四)Flip-Flop和Latch的区别
  20. java/php/net/pythont羽毛球场地管理系统设计

热门文章

  1. InfluxDB删除数据
  2. cad尺寸标注快捷键_CAD缩放对象后如何使标注对象尺寸不变
  3. asp发送微信公众号客服消息源码
  4. docxtpl - 向表格里插入图片
  5. word里如何插入取整符号?
  6. Axles UVALive 7197
  7. html点击修改名称,修改昵称.html
  8. JDK版本优化JDK1.2-12
  9. 详解RS232/UART/协议/串口通信
  10. 关于JAVA对接支付宝开发文档错误总结