在上一篇文章中给大家介绍了牛批的AQS,大致讲解了JUC中同步的思路。本来还没想好这一篇应该写点什么,刚好上周某个同事的代码出现问题,排查后发现是使用阻塞队列不当导致的,所以本篇决定介绍下阻塞队列。

真实案例分析

错误案例:

说来也是挺巧的,那天一位同事iMac换了Macbook Pro。然后像往常一样启动了各个服务,过了会电脑风扇疯狂工作发出响声,由于平常iMac上IDEA项目开的比较多占用较多内存时间长了也会卡顿,所以他并没有在意。但是之后一直是这样我们便觉得很奇怪,然后打开了他的活动监视器,发现某个Java进程竟然占用了百分之九十的CPU,然后确认是哪一个项目,最后通过jstack查看该项目中的线程情况,定位到了某个自定义线程,然后查看代码发现如下:

MyThreadPool.exportEnclosurePool.execute(() -> {while (true) {BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();while (!blockingQueue.isEmpty()) {System.out.println("开始消费");EnclosureRequest one = null;try {one = blockingQueue.take();ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());} catch (Exception e) {e.printStackTrace();}}}
}复制代码

该同事的需求是做一个队列化附件导出的功能,因此他选择了生产者消费者模式,采用阻塞队列来实现;但是由于对此不太熟悉,所以写出了这段有问题的代码,导致死循环;万幸的是这段代码在测试分支上被我们发现了并没有上正式。正确的消费者代码实现如下:

正确实现:

MyThreadPool.exportEnclosurePool.execute(() -> {BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();while (true) {try {EnclosureRequest one = blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("开始消费");ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());}
}
复制代码

阻塞队列简介

阻塞队列是一个插入和移除方法支持附加操作的队列;

  • 支持阻塞的插入方法:当阻塞队列满时,队列会阻塞插入元素的线程,直到队列不为满。
  • 支持阻塞的移除方法:当阻塞队列为空时,获取队列元素的线程会被阻塞直到队列不为空。

四种处理方式:

处理方式\方法 插入方法 移除方法
抛出异常 add(e) remove()
返回boolean值 offer(e) poll()
阻塞 put(e) take()
超时退出 offer(e,time,unit) poll(time,unit)

?小提示: 如果是无界阻塞队列,队列不可能出现满的情况,所以使用put()方法永远不会被阻塞,使用offer()方法永远返回true

Java中的阻塞列队介绍

  • ArrayBlockingQueue:基于数组的有界阻塞队列,支持配置公平性策略。
  • LinkedBlockingQueue:基于链表的无界(默认Integer.MAX_VALUE)阻塞队列,Executors中newFixedThreadPool()和newSingleThreadExecutor()使用的工作队列,所以不推荐使用Executors。
  • LinkedBlockingDeque:基于链表的无界(默认Integer.MAX_VALUE)双向阻塞队列
  • LinkedTransferQueue:基于链表的无界阻塞队列,该队列提供transfer(e)方法,如果有消费者正在等待则直接把元素给消费者,否者将元素放在队列的tail节点并阻塞到该元素被消费。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列,默认情况下采用自然顺序升序排序,也可以通过类重写compareTo()方法来指定元素排序规则,或者初始化队列时指定构造参数Comparator来排序。
  • DelayQueue:使用PriorityQueue实现的无界延时阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,每一个put操作必须阻塞到一个take操作发生,否则不能继续添加元素。支持配置公平性策略。

阻塞队列(LinkedBlockingQueue)实现原理分析

LinkedBlockingQueue是一个由成员变量Node组成的单链表结构,默认容量为Integer的最大值,其内部还有两把ReentrantLock锁putLock、takeLock用于保证插入和删除的线程安全(其他阻塞队列中使用一个ReentrantLock锁),两个Condition等待队列notEmpty、notFull用于存放take()和put()阻塞的线程。这里我简单分析下它两个比较重要的方法put()和take()。

源码分析

/*** 由Node节点组成单链表结构*/
static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }
}
/** 用于移除操作的锁 */
private final ReentrantLock takeLock = new ReentrantLock();/** 阻塞于take的等待队列 */
private final Condition notEmpty = takeLock.newCondition();/** 用于插入操作的锁 */
private final ReentrantLock putLock = new ReentrantLock();/** 阻塞于put的等待队列 */
private final Condition notFull = putLock.newCondition();/*** 不指定容量默认是Integer的最大值*/
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}/*** 阻塞式插入元素(队列为满则阻塞)*/
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 获取插入锁(响应中断)putLock.lockInterruptibly();try {// 如果当前队列长度到达容量上限则当前线程释放锁加入不为满等待队列中while (count.get() == capacity) {notFull.await();}// 将元素加入队尾enqueue(node);// 当前队列长度加一(返回值是加一之前)c = count.getAndIncrement();// 如果加入后队列长度小于容量上限则通知不为满等待队列中的线程if (c + 1 < capacity)notFull.signal();} finally {// 释放锁putLock.unlock();}// 如果在插入元素之前队列为空则通知不为空等待队列中的线程if (c == 0)signalNotEmpty();
}
/*** 阻塞式移除元素(队列为空则阻塞)*/
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 获取移除锁(响应中断)takeLock.lockInterruptibly();try {// 如果当前队列为空则当前线程释放锁加入不为空等待队列while (count.get() == 0) {notEmpty.await();}// 移除队头元素x = dequeue();c = count.getAndDecrement();// 如果移除之后还有元素则通知不为空等待队列中的线程if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果移除元素之前到达容量上线则通知不为满等待队列中的线程if (c == capacity)signalNotFull();return x;}复制代码

图解分析

需要注意的是put()操作将元素加入队列后释放锁是在判断容量是否小于上限通知notFull等待队列之后,通知notEmpty队列之前需要先获取takeLock,take()操作同理。

?小提示: LinkedBlockingQueue的put()和take()方法中和其他阻塞队列有个很大的区别。其他阻塞队列每次put()和take()都会去通知相应的等待队列,但是LinkedBlockingQueue只有在put前是空的去通知notEmpty,take前是满的去通知notFull等待队列,并且put后未满去通知notFull等待队列,take后未空去通知notEmpty等待队列。关于这点我个人的理解是由于LinkedBlockingQueue里分读写锁,如果每次take都通知notFull的话,需要另外去获取putLock产生竞争;用已经获取putLock的线程去唤醒notFull等待队列中线程减少了锁的竞争。其他阻塞队列中只有一把锁,所以通知不需要另外竞争锁。当然这只是我个人的看法而已,希望有了解的小伙伴指教。

总结

阻塞队列在并发中很重要,前面介绍的线程池中就用到了阻塞队列,生产者消费者模型也是可以用阻塞队列实现,到此已经介绍了AQS、阻塞队列、线程池,希望你们能关联起来理解加深印象。

往期文章:

  • 让人抓头的Java并发(三) 强大的AQS!
  • 让人抓头的Java并发(二) 线程池ThreadPoolExecutor分析
  • 让人抓头的Java并发(一) 轻松认识多线程

欢迎同样有感兴趣的小伙伴一起探讨

转载于:https://juejin.im/post/5d2801066fb9a07ed524cbab

让人抓头的Java并发(四) 阻塞队列--CPU飙升排查案例相关推荐

  1. Java并发教程–阻塞队列

    如第3部分所述,Java 1.5中引入的线程池提供了核心支持,该支持很快成为许多Java开发人员的最爱. 在内部,这些实现巧妙地利用了Java 1.5中引入的另一种并发功能-阻塞队列. 队列 首先,简 ...

  2. 聊聊并发(七)——Java中的阻塞队列

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用 ...

  3. java并发队列_Java并发教程–阻塞队列

    java并发队列 如第3部分所述,Java 1.5中引入的线程池提供了核心支持,该支持很快成为许多Java开发人员的最爱. 在内部,这些实现巧妙地利用了Java 1.5中引入的另一种并发功能-阻塞队列 ...

  4. 转:Java 7 种阻塞队列详解

    转自: Java 7 种阻塞队列详解 - 云+社区 - 腾讯云队列(Queue)是一种经常使用的集合.Queue 实际上是实现了一个先进先出(FIFO:First In First Out)的有序表. ...

  5. Java多线程之线程并发库阻塞队列的应用

    ArrayBlockingQueue(jdk中已经提供 就在那个condition类说明里的可阻塞示例程序的下面就说明了) 注意三个添加方法的区别->查API文档 拿插入来说 一个会抛异常 一个 ...

  6. 看到一个NB的人: 千里冰封--JAVA 浓香四溢

    一个NB的Java程序员,做了一个NB的java音乐播放器Yoyoplayer,他居然还写过几首歌. 千里冰封--JAVA 浓香四溢 http://www.blogjava.net/hadeslee/ ...

  7. Java里的阻塞队列

    2019独角兽企业重金招聘Python工程师标准>>> 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元 ...

  8. java concurrenthashmap与阻塞队列

    https://blog.csdn.net/wozniakzhang/article/details/108106205 Java~并发容器ConcurrentHashMap.ConcurrentLi ...

  9. Java并发包--阻塞队列(BlockingQueue)

    阻塞队列介绍 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质 ...

最新文章

  1. 【论文知识点笔记】GNN概述(图神经网络概述)
  2. 数列分块入门 (1 ~ 7)
  3. oracle视图查询数据慢,8i查询DBA_FREE_SPACE视图极慢的问题
  4. mysql函数(五.流程控制函数)
  5. vue 根据字符串生成表单_vue 中怎么渲染字符串形式的组件标签?
  6. java 二维数组位置_java 找到二维数组指定元素的位置
  7. html table datasrc,table_data_tables.html
  8. 20200217:搜索旋转排序数组(leetcode33)
  9. [转载] Python: ord()函数
  10. Cost Function of Support Vector Machine
  11. er ubnt x设置教程_ubnt的er-x做交换机应该怎么设置啊?
  12. 开启并定制 Apache 显示目录索引样式
  13. java zone_offset_java 的 ZoneOffset 与 ZoneId
  14. 移动UI设计学习总结
  15. iphone12绿色好看 ,相比被全民吐槽的蓝色,绿色是怎么做到零差评的
  16. 如何对某一个文件夹下的所有文件快速重命名
  17. IKAnalyzer 配置文件介绍
  18. “微信之父”张小龙:我没去过龙泉寺!
  19. spicy之evt接口定义文件
  20. TI CC32XX SDA中SimpleLink Academy教程翻译(RTOS部分的基础介绍非常易懂)

热门文章

  1. Java SHAA加密
  2. 说说程序员、编译器、CPU之间的三角恋
  3. 【python】Python遍历dict的key最高效的方法是什么?
  4. Linux文件目录结构2
  5. 40款奇特的名片设计,吸引大家的眼球《上篇》
  6. 根据根据图片的url怎么取得图片ImageView对象
  7. Java JDK 学习笔记:File类
  8. 【正一专栏】欧冠决赛点评——只服齐达内,送别布冯
  9. (转载)hadoop2.2.0集群的HA高可靠的最简单配置
  10. 入侵检测系统基础知识