阻塞队列

概念

队列

队列就可以想成是一个数组,从一头进入,一头出去,排队买饭(FIFO),和栈是一样的FIFO

阻塞队列

BlockingQueue 阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:

线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞

    • 当蛋糕店的柜子空的时候,无法从柜子里面获取蛋糕
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞

    • 当蛋糕店的柜子满的时候,无法继续向柜子里面添加蛋糕了

也就是说 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列插入新的元素

同理,试图往已经满的阻塞队列中添加新元素的线程,直到其它线程往满的队列中移除一个或多个元素,或者完全清空队列后,使队列重新变得空闲起来,并后续新增

这也是MQ消息中间件的底层原理。

为什么要用?

去海底捞吃饭,大厅满了,需要进候厅等待,但是这些等待的客户能够对商家带来利润,因此我们非常欢迎他们阻塞

在多线程领域:所谓的阻塞,在某些清空下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒

为什么需要BlockingQueue

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

架构

// 你用过List集合类// ArrayList集合类熟悉么?// 还用过 CopyOnWriteList  和 BlockingQueue

BlockingQueue阻塞队列是属于一个接口,底下有七个实现类

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列
    • 有界,但是界限非常大,相当于无界,可以当成无界**(谨慎使用)**
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
    • 生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

这里需要掌握的是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue

BlockingQueue核心方法

抛出异常 当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full 当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException
特殊性 插入方法,成功true,失败false 移除方法:成功返回出队列元素,队列没有就返回空
一直阻塞 当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出, 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。
超时退出 当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出

抛出异常组

但执行add方法,向已经满的ArrayBlockingQueue中添加元素时候,会抛出异常

// 阻塞队列,需要填入默认值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));System.out.println(blockingQueue.add("XXX"));

运行后:

true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue fullat java.util.AbstractQueue.add(AbstractQueue.java:98)at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)

同时如果我们多取出元素的时候,也会抛出异常,我们假设只存储了3个值,但是取的时候,取了四次

// 阻塞队列,需要填入默认值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());

那么出现异常

true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementExceptionat java.util.AbstractQueue.remove(AbstractQueue.java:117)at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)

布尔类型组

我们使用 offer的方法,添加元素时候,如果阻塞队列满了后,会返回false,否者返回true

同时在取的时候,如果队列已空,那么会返回null

BlockingQueue blockingQueue = new ArrayBlockingQueue(3);System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());

运行结果

true
true
true
false
a
b
c
null

阻塞队列组

我们使用 put的方法,添加元素时候,如果阻塞队列满了后,添加消息的线程,会一直阻塞,直到队列元素减少,会被清空,才会唤醒

一般在消息中间件,比如RabbitMQ中会使用到,因为需要保证消息百分百不丢失,因此只有让它阻塞

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("================");blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();

同时使用take取消息的时候,如果内容不存在的时候,也会被阻塞

不见不散组

offer( ) , poll 加时间

使用offer插入的时候,需要指定时间,如果2秒还没有插入,那么就放弃插入

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));

同时取的时候也进行判断

System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));

如果2秒内取不出来,那么就返回null

SynchronousQueue

SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储的BlockingQueue,每一个put操作必须等待一个take操作,否者不能继续添加元素

下面我们测试SynchronousQueue添加元素的过程

首先我们创建了两个线程,一个线程用于生产,一个线程用于消费

生产的线程分别put了 A、B、C这三个字段

BlockingQueue<String> blockingQueue = new SynchronousQueue<>();new Thread(() -> {try {       System.out.println(Thread.currentThread().getName() + "\t put A ");blockingQueue.put("A");System.out.println(Thread.currentThread().getName() + "\t put B ");blockingQueue.put("B");        System.out.println(Thread.currentThread().getName() + "\t put C ");blockingQueue.put("C");        } catch (InterruptedException e) {e.printStackTrace();}
}, "t1").start();

消费线程使用take,消费阻塞队列中的内容,并且每次消费前,都等待5秒

        new Thread(() -> {try {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}blockingQueue.take();System.out.println(Thread.currentThread().getName() + "\t take A ");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}blockingQueue.take();System.out.println(Thread.currentThread().getName() + "\t take B ");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}blockingQueue.take();System.out.println(Thread.currentThread().getName() + "\t take C ");} catch (InterruptedException e) {e.printStackTrace();}}, "t2").start();

最后结果输出为:

t1    put A
t2   take A 5秒后...t1     put B
t2   take B 5秒后...t1     put C
t2   take C

我们从最后的运行结果可以看出,每次t1线程向队列中添加阻塞队列添加元素后,t1输入线程就会等待 t2消费线程,t2消费后,t2处于挂起状态,等待t1在存入,从而周而复始,形成 一存一取的状态

阻塞队列的用处

生产者消费者模式

一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮

关于多线程的操作,我们需要记住下面几句

  • 线程 操作 资源类
  • 判断 干活 通知
  • 防止虚假唤醒机制

我们下面实现一个简单的生产者消费者模式,首先有资源类ShareData

/*** 资源类*/
class ShareData {private int number = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void increment() throws Exception{// 同步代码块,加锁lock.lock();try {// 判断while(number != 0) {// 等待不能生产condition.await();}// 干活number++;System.out.println(Thread.currentThread().getName() + "\t " + number);// 通知 唤醒condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void decrement() throws Exception{// 同步代码块,加锁lock.lock();try {// 判断while(number == 0) {// 等待不能消费condition.await();}// 干活number--;System.out.println(Thread.currentThread().getName() + "\t " + number);// 通知 唤醒condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}

里面有一个number变量,同时提供了increment 和 decrement的方法,分别让number 加1和减1

但是我们在进行判断的时候,为了防止出现虚假唤醒机制,不能使用if来进行判断,而应该使用while

// 判断
while(number != 0) {// 等待不能生产condition.await();
}

不能使用 if判断

// 判断
if(number != 0) {// 等待不能生产condition.await();
}

完整代码

/*** 生产者消费者 传统版* 题目:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮*/
/*** 线程 操作 资源类* 判断 干活 通知* 防止虚假唤醒机制*//*** 资源类*/
class ShareData {private int number = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void increment() throws Exception{// 同步代码块,加锁lock.lock();try {// 判断while(number != 0) {// 等待不能生产condition.await();}// 干活number++;System.out.println(Thread.currentThread().getName() + "\t " + number);// 通知 唤醒condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void decrement() throws Exception{// 同步代码块,加锁lock.lock();try {// 判断while(number == 0) {// 等待不能消费condition.await();}// 干活number--;System.out.println(Thread.currentThread().getName() + "\t " + number);// 通知 唤醒condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}
public class ProdConsumerTraditionDemo {public static void main(String[] args) {// 高内聚,低耦合    内聚指的是,一个空调,自身带有调节温度高低的方法ShareData shareData = new ShareData();// t1线程,生产new Thread(() -> {for (int i = 0; i < 5; i++) {try {shareData.increment();} catch (Exception e) {e.printStackTrace();}}}, "t1").start();// t2线程,消费new Thread(() -> {for (int i = 0; i < 5; i++) {try {shareData.decrement();} catch (Exception e) {e.printStackTrace();}}}, "t2").start();}
}

最后运行成功后,我们一个进行生产,一个进行消费

t1    1
t2   0
t1   1
t2   0
t1   1
t2   0
t1   1
t2   0
t1   1
t2   0

生成者和消费者3.0

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,则这会给我们的程序带来不小的时间复杂度

现在我们使用新版的阻塞队列版生产者和消费者,使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用

/*** 生产者消费者  阻塞队列版* 使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用**/class MyResource {// 默认开启,进行生产消费// 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程进行修改private volatile boolean FLAG = true;// 使用原子包装类,而不用number++private AtomicInteger atomicInteger = new AtomicInteger();// 这里不能为了满足条件,而实例化一个具体的SynchronousBlockingQueueBlockingQueue<String> blockingQueue = null;// 而应该采用依赖注入里面的,构造注入方法传入public MyResource(BlockingQueue<String> blockingQueue) {this.blockingQueue = blockingQueue;// 查询出传入的class是什么System.out.println(blockingQueue.getClass().getName());}/*** 生产* @throws Exception*/public void myProd() throws Exception{String data = null;boolean retValue;// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒// 当FLAG为true的时候,开始生产while(FLAG) {data = atomicInteger.incrementAndGet() + "";// 2秒存入1个dataretValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);if(retValue) {System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data  + "成功" );} else {System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data  + "失败" );}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍");}/*** 消费* @throws Exception*/public void myConsumer() throws Exception{String retValue;// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒// 当FLAG为true的时候,开始生产while(FLAG) {// 2秒存入1个dataretValue = blockingQueue.poll(2L, TimeUnit.SECONDS);if(retValue != null && retValue != "") {System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue  + "成功" );} else {FLAG = false;System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" );// 退出消费队列return;}}}/*** 停止生产的判断*/public void stop() {this.FLAG = false;}}
public class ProdConsumerBlockingQueueDemo {public static void main(String[] args) {// 传入具体的实现类, ArrayBlockingQueueMyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");System.out.println("");System.out.println("");try {myResource.myProd();System.out.println("");System.out.println("");} catch (Exception e) {e.printStackTrace();}}, "prod").start();new Thread(() -> {System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");try {myResource.myConsumer();} catch (Exception e) {e.printStackTrace();}}, "consumer").start();// 5秒后,停止生产和消费try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("");System.out.println("");System.out.println("5秒中后,生产和消费线程停止,线程结束");myResource.stop();}
}

最后运行结果

java.util.concurrent.ArrayBlockingQueue
prod     生产线程启动consumer  消费线程启动
prod     插入队列:1成功
consumer     消费队列:1成功
prod     插入队列:2成功
consumer     消费队列:2成功
prod     插入队列:3成功
consumer     消费队列:3成功
prod     插入队列:4成功
consumer     消费队列:4成功
prod     插入队列:5成功
consumer     消费队列:5成功5秒中后,生产和消费线程停止,线程结束
prod     停止生产,表示FLAG=false,生产介绍

Java面试之阻塞队列相关推荐

  1. 转:Java 7 种阻塞队列详解

    转自: Java 7 种阻塞队列详解 - 云+社区 - 腾讯云队列(Queue)是一种经常使用的集合.Queue 实际上是实现了一个先进先出(FIFO:First In First Out)的有序表. ...

  2. 聊聊并发(七)——Java中的阻塞队列

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用 ...

  3. java concurrenthashmap与阻塞队列

    https://blog.csdn.net/wozniakzhang/article/details/108106205 Java~并发容器ConcurrentHashMap.ConcurrentLi ...

  4. 让人抓头的Java并发(四) 阻塞队列--CPU飙升排查案例

    在上一篇文章中给大家介绍了牛批的AQS,大致讲解了JUC中同步的思路.本来还没想好这一篇应该写点什么,刚好上周某个同事的代码出现问题,排查后发现是使用阻塞队列不当导致的,所以本篇决定介绍下阻塞队列. ...

  5. Java并发包--阻塞队列(BlockingQueue)

    阻塞队列介绍 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质 ...

  6. java 手写阻塞队列_Java阻塞队列的实现

    阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列 ...

  7. Java多线程_阻塞队列

    1.什么是阻塞队列        我们知道,PriorityQueue.LinkedList这些都是非阻塞队列.在我们使用非阻塞队列的时候有一个很大问题,它不会对当前线程产生阻塞,那么在面对类似消费者 ...

  8. Java并发教程–阻塞队列

    如第3部分所述,Java 1.5中引入的线程池提供了核心支持,该支持很快成为许多Java开发人员的最爱. 在内部,这些实现巧妙地利用了Java 1.5中引入的另一种并发功能-阻塞队列. 队列 首先,简 ...

  9. Java中的阻塞队列-LinkedBlockingQueue(二)

    原文地址:http://benjaminwhx.com/2018/05/11/%E3%80%90%E7%BB%86%E8%B0%88Java%E5%B9%B6%E5%8F%91%E3%80%91%E8 ...

最新文章

  1. Jackson学习笔记(三)转
  2. njust 1927 谁才是最强战舰!(anti-nim博弈论)
  3. 全国计算机等级考试题库二级C操作题100套(第31套)
  4. ElasticSearch 新增节点,横向扩容
  5. 我更看好rust飞鸽传书
  6. 【Java 多线程】互斥锁,自旋锁和读写锁
  7. 计算机科学与技术专业大学排名2020,2020计算机科学与技术专业最好大学排名:160余所大学上榜...
  8. mysql5.4升级5.6_Laravel5.4 升级到 5.6
  9. JSON的C代码示例
  10. c++编程求解二元二次方程组_二元一次方程组及其解法
  11. 数学建模算法与应用学习day4——综合评价与决策方法
  12. kali linux如何更新软件源
  13. win10虚拟服务器安装xp,xp mode for windows10虚拟机安装教程(详细)
  14. 【随笔】Java团长
  15. J2EE进阶之JSP和EL表达式 十二
  16. 24点计算机游戏规则,24点游戏规则和解题方法
  17. 如何将自己做的网页发布到网站让别人可以看到
  18. redis第三方软件medis
  19. 谈谈像素以及微信小程序的 rpx
  20. 【项目实战】---商品详情页的制作

热门文章

  1. 华为音量键只能调通话_华为新全面屏专利曝光,电源键、音量键都没有
  2. 动态类型语言和静态类型语言
  3. 版本控制工具--svn和git的使用(一) -----版本控制的好处以及分类
  4. SQL注入之Mysql报错注入
  5. jQuery 的属性操作方法
  6. Hibernate 一对一关联查询
  7. 【随机过程】马尔可夫链(2)
  8. 关于MySql的1146错误修正
  9. juc原子类之五:AtomicLongFieldUpdater原子类
  10. Microsoft.Jet.OLEDB.4.0和Microsoft.ACE.OLEDB.12.0的适用版本