队列阻塞_Java并发|阻塞队列ArrayBlockingQueue解析
之前的文章我们学了 ConcurrentHashMap、 ConcurrentLinkedQueue 等线程安全容器,而且也说了 Java并发包中的 Concurent 开头的并发容器都是非阻塞的,是使用 CAS 自旋操作实现的线程安全。今天我们来学习实现线程安全的另一种方法:就是「阻塞」形式,即使用锁,这样的容器也被称为阻塞队列。
什么是阻塞队列
阻塞队列支持阻塞的插入和移除。
- 支持阻塞的插入:就是当队列满了的情况下,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除:就是当队列为空的情况下,队列会阻塞获取元素的线程,直到队列非空。
使用阻塞队列,我们要知道,当阻塞队列不可用时,插入和移除操作的4种处理方式,这也是我们使用的前提,你就会知道在开发中到底使用哪个方法是符合你的预期的。
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常
返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移 除方法,则是从队列里取出一个元素,如果没有则返回null。
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者 线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队 列会阻塞住消费者线程,直到队列不为空。
超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程 一段时间,如果超过了指定的时间,生产者线程就会退出。
「需要注意的是:」
BlockingQueue 不接受 null 值的插入,相应的方法在碰到 null 的插入时会抛出NullPointerException 异常。null 值在这里通常用于作为特殊值返回(表格中的第三列),代表 poll 失败。所以,如果允许插入 null 值的话,那获取的时候,就不能很好地用 null 来判断到底是代表失败,还是获取的值就是 null 值。
ArrayBlockingQueue 概述
❝
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原 则对元素进行排序。
❞
ArrayBlockingQueue
默认情况下是不支持线程公平的访问队列,这里的公平性指的是阻塞的线程可以按照先后顺序访问队列,即先阻塞的线程先访问。如果要保证线程公平访问,通常会降低吞吐量,当然 ArrayBlockingQueue
也是支持公平访问的,使用方式如下:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
这里小伙伴可以思考下,公平锁是如何实现的?
之前我们的文章也讲过,可以使用 可重入锁(对于可重入锁不太了解的伙伴可以看我这个系列专栏前面的文章),具体代码如下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}
ArrayBlockingQueue 源码解析
如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?如果让你来设计阻塞队列你会如何设计,如何让生产者和消费者进行高效率的通信呢?让我们先来看看JDK是如何实现的。
入队操作
「使用通知模式实现」。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。
通过查看JDK源码发现 ArrayBlockingQueue
使用了Condition来实现,代码如下:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 使用condition,必须要获取锁 lock.lockInterruptibly(); try { while (count == items.length) // 队列满则在notFull对象上等待通知 notFull.await(); // 将指定元素放在队尾,并通知 notEmpty.signal(); enqueue(e); } finally { lock.unlock(); }}
上面的代码不难发现,当 items 这个数组满了的时候,进入 while 死循环 在 notFull 这个 Condition 对象上等待,同时会释放掉 可重入锁。 看到这里我们会看到如果队列不满,则直接将当前元素插入到队尾,并且要通知因为队列为空而等待在 notEmpty 这个 condition 对象上的线程,具体如何通知呢?我们顺着源码继续往下看。
// 因为阻塞队列是线程安全的,只会获取到lock锁才能进入入队操作private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; // 队列的元素数量,插入后要增加1 count++; // 因为插入了元素,所以要通知那些因为队列是空而等待的线程 notEmpty.signal();}
代码很简单就不做啰嗦了,其实就是入队时调用队列为空的 Condition 的 signal 方法。
出队操作
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列为空在notEmpty条件进行阻塞 while (count == 0) notEmpty.await(); // 出队,并且调用 notFull.signal(); 通知入队线程 return dequeue(); } finally { lock.unlock(); }}
如果你看懂了入队操作的代码,那么出队这块其实是类似的,队列为空就阻塞,非空则取出首元素。
当然啦 dequeue()
方法 在出队之后也会通知所有因为队列满而阻塞的入队线程,我们来看下代码:
// 出队操作,必须先获取到锁才能调用private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 出队一个元素,将这个位置置为null,即为删除 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; // 队列大小减一 count--; if (itrs != null) itrs.elementDequeued(); // 通知所有因为队列满阻塞的线程 notFull.signal(); return x;}
看到这里,那恭喜你你已经对阻塞队列的实现有了一个清晰的认识:它们是利用可重入锁获取 两个 Condition 对象来分别阻塞入队和出队操作的。
你可能会问,阻塞队列使用了可重入锁,它是怎么来阻塞当前线程的?
从上面的代码,我们看到了当队列满调用了 notFull.await() 方法,这个方法是如何实现阻塞的呢?代码如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加一个新的节点线程到等待队列 Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 阻塞主要通过 LockSupport.park(this)来实现 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
将当前节点加入等待条件队列,通过调用 LockSupport.park(this) 方法来阻塞当前线程;
public static void park(Object blocker) { Thread t = Thread.currentThread(); // 先保存一下将要阻塞的线程 setBlocker(t, blocker); // unsafe.park阻塞 当前线程,这是个native方法 UNSAFE.park(false, 0L); setBlocker(t, null);}
看到 UNSAFE.park() 方法,这是一个 native 方法, 被 native修饰则表示是被 JVM 实现的,而 JVM 是在不同的操作系统中实现是不一样的,具体就是通过 C 来实现的, 这块就不再深入了,因为我们是搞 Java 的 知道这里已经很不错了,当然如果你很感兴趣可以自行在研究哈。
当调用了 UNSAFE.park() 方法就会阻塞当前线程,只有出现下面四种情况才会被唤醒从这个方法返回:
- 被中断时,这个方法是可以响应中断的;
- 等待一定的时间,超时返回,这个时间是 park() 方法的一个参数;
- 调用了 unpark() 方法;
- 异常现象的发生,原因不明,也就是意外情况;
动手实践
既然我们已经学习了 ArrayBlockingQueue ,知道了它是使用数组实现的,它是有界的,当队列满时会阻塞生产者线程,那么我们一起验证下:
public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2); arrayBlockingQueue.offer(1); arrayBlockingQueue.put("2"); System.out.println("满了"); arrayBlockingQueue.put(3); for (Object i : arrayBlockingQueue) { System.out.println(i); }}
运行上述代码,可以看到队列大小为2,当我们插入第3个元素时,就会阻塞当前线程,当前线程被阻塞后也就进入到了 WATING 状态,建议大家也动手验证下,实践出真知。
使用 jps 查看当前运行程序的进程号;
然后使用
jstack -l pid
查询线程堆栈信息;
原创真心不易,希望你能帮我个小忙呗,如果本文内容你觉得有所收获,请帮忙点个“在看”,或者转发分享让更多的小伙伴看到。
● 观察者模式解析
● Java并发|Atomic包下的原子操作类
● Java并发|Semaphore如何实现限流器
- 觉得写得还不错的小伙伴麻烦动手「三连」;
- 文章如果存在不正确的地方,麻烦指出,非常感谢您的阅读;
- 推荐大家关注我的公众号,会为你定期推送原创干货文章,拉你进优质学习社群;
- github地址:https://github.com/coderluojust/qige_blogs
队列阻塞_Java并发|阻塞队列ArrayBlockingQueue解析相关推荐
- java队列加锁_java并发-----浅析ReentrantLock加锁,解锁过程,公平锁非公平锁,AQS入门,CLH同步队列...
前言 为什么需要去了解AQS,AQS,AbstractQueuedSynchronizer,即队列同步器.它是构建锁或者其他同步组件的基础框架(如ReentrantLock.ReentrantRead ...
- java线程优先级队列等待_java线程池队列优先级(插队)Demo
在做线程池操作的时候,突然来个加紧处理时,会很纠结,不知道怎么处理让加紧的线程插队先执行.该Demo使用了自定义线程池,采用优先级阻塞式队列(PriorityBlockingQueue)的方式来处理插 ...
- java 无锁队列实现_java无锁队列实现
对于像应用中多个生产者需要并发发送一些日志信息给远程存储服务器,这些日志信息用于dubbo的调用链分析. 一种方案是生产者线程将要发送的日志消息存储到队列当中,然后由另一个本地消费线程从队列中获取要发 ...
- 34 进程 pid ppid 并发与并行,阻塞与非阻塞 join函数 process对象 孤儿进程与僵尸进程...
进程与程序 一个正在被运行的程序就称之为进程,是程序具体执行过程,一种抽象概念 进程来自于操作系统 多进程:多个正在运行的程序. 测试: import timewhile True: time.sle ...
- java 并发队列_JAVA并发编程:阻塞队列BlockingQueue之SynchronousQueue
前面在讲解Executors工厂创建可缓存线程的线程池(newCachedThreadPool)的时候有提到过SynchronousQueue队列,该线程池使用 SynchronousQueue 作为 ...
- 并发-阻塞队列源码分析
阻塞队列 参考: http://www.cnblogs.com/dolphin0520/p/3932906.html http://endual.iteye.com/blog/1412212 http ...
- 多线程编程:阻塞、并发队列的使用总结
最近,一直在跟设计的任务调度模块周旋,目前终于完成了第一阶段的调试.今天,我想借助博客园平台把最近在设计过程中,使用队列和集合的一些基础知识给大家总结一下,方便大家以后直接copy.本文都是一些没有技 ...
- Java多线程学习二十五:阻塞和非阻塞队列的并发安全原理||如何选择适合自己的阻塞队列?
阻塞和非阻塞队列的并发安全原理. 之前我们探究了常见的阻塞队列的特点,以 ArrayBlockingQueue 为例, 首先分析 BlockingQueue 即阻塞队列的线程安全原理,然后再看看它的兄 ...
- 并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究
并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究 http://www.importnew.com/25668.html 一. 前言 常用的并发队列有阻塞队列和非阻塞队列 ...
最新文章
- 机器学习中的欠拟合与过拟合
- php5对象复制、clone、浅复制与深复制的区别与介绍
- matlab decomposition filters,MATLAB小波去噪求助(附算法和显示图片)!不知自己哪个地方出了问题,求指点! - 信息科学 - 小木虫 - 学术 科研 互动社区...
- mysql慢查询工具
- 前端技术的概括以及html的基本知识总结
- apache 启用 gzip压缩
- python爬火车票是不是违法_python利用selenium+requests+beautifulsoup爬取12306火车票信息...
- 修补工具为什么修不干净_超声波洗不干净牙齿吗?为什么还要喷砂?
- 解读mpvue官方文档的Class 与 Style 绑定及不支持语法
- 如何导出立创EDA库到AD库 ?
- 修改vscode图标
- 基于微博热搜生成词云图
- 大地经纬度坐标与地心地固坐标的的转换
- 两篇毕业论文致谢同一个女朋友?哈哈哈哈!
- 【趣文】秦始皇与区块链竟然有关系
- 基于STM32震动感应灯
- 股票融资全面解决方案
- Ebox还没到,可怎么办呢
- 博览群书:谷歌软件测试之道
- typescript 如何反推数组的类型
热门文章
- 放弃腾讯75W年薪,回老家当公务员,提离职被领导教育,网友:leader嫉妒了
- CNN_TensorFlow图像分类代码
- 如何使用PHP中的字符串函数
- 本田与索尼宣布将成立合资企业 计划2025年开始销售电动汽车
- 消息称百度网盘青春版降速23倍:从52MB/s降至2.2MB/s
- B端出行,缺一个盒子汽车么?
- 理想汽车CEO李想晒11月理想ONE成绩
- 12GB+512GB售价18999元起,华为发布Mate X2典藏版
- 家乐福举报山姆涉嫌“二选一”背后 会员店需要的不是模仿能力
- 敢开“电动爹”回老家,你说我胆子大不大?