BlockingQueue接口使用场景

相信大家对生产者-消费者模式不陌生,这个经典的多线程协作模式,最简单的描述就是生产者线程往内存缓冲区中提交任务,消费者线程从内存缓冲区里获取任务执行。在生产者-消费者模式中最重要的就是这个内存缓冲区,可能你会疑问,为什么不让生产者直接把任务提交给消费者来执行,而是要通过一个中间媒介,也就是一个缓冲区来交换任务?

  1. 通过缓冲区,可以缓解生产者和消费者之间的速度差。假设生产者的速度大于消费者,生产者不断向缓冲区内提交任务,但是缓冲区大小有限,当内存缓冲区满时,生产者不得不被阻塞,此时消费者仍不断从缓冲区内获取任务执行,直到缓冲区不为空,生产者才能继续执行。
  2. 通过缓冲区,生产者不需要知道消费者是谁,生产者只需把任务提交到缓冲区即可;同样消费者也不需要直到生产者是谁,获取任务通过缓冲区。这样做的好处在于,对于代码的维护和升级,如果我们要改动消费者,我们不需要修改生产者和缓冲区。生产者和消费者之间的通信通过缓冲区。

在生产者-消费者模式中,充当这个缓冲区使用的是BlockingQueue接口,BlockingQueue继承自Queue接口,在实例化时,可以使用ArrayBlockingQueue和LinkedBlockingQueue两种队列,前者是基于数组实现的,而后者是基于链表实现,从名字我们就可以看出。看到这两个队列大家应该有点印象,在线程池中也有这么一个参数BlockingQueue:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
/*              *corePoolSize:线程池中的线程数量.*maximumPoolSize:线程池中的最大线程数.*keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的空闲线程的存活时间.*unit:keepAliveTime的单位.*workQueue:任务队列,被提交进线程池,但没被执行的任务.*threadFactory:线程工厂,用于创建线程,可自定义线程.*handler:拒绝服务,当线程池中没有空闲线程为新任务服务时,且等待队列中也已经满时,执行的策略.*/

线程池中的workQueue任务等待队列用来保存被提交进线程池,但因为没有空闲线程,所以尚未被执行的任务。使用ArrayBlockingQueue做为有界队列,LinkedBlockingQueue做为无界队列,无界队列因为基于链表实现,所以不会出现任务入队列失败的情况,直到内存耗尽为止。

为什么使用BlockingQueue做为内存缓冲区

用回生产者-消费者模式举例说,在多线程环境下,当生产者线程向内存缓冲区提交了一个任务后,消费者线程怎么知道此时内存缓冲区内有新的任务提交?如果我们让消费者线程不断查询缓冲区内的任务提交情况,是可以,不过这样不是一个效率高的方法。

在线程池中也是,使用BlockingQueue队列,关键是Blocking,假设我们使用的是ArrayBlockingQueue,基于数组实现的有界队列,生产者线程不断向任务队列(也就是缓冲区)内提交任务时,当任务队列已经放满待执行任务后,生产者线程就会被阻塞,直到缓冲区内有空闲位置后,才会唤醒生产者线程。当消费者线程不断从缓冲区内获取任务执行时,假设所有任务都被获取后,消费者线程也会被阻塞,直到缓冲区内有新的任务被提交,消费者线程被重新唤醒。这是怎么做到的?使用BlockingQueue队列的线程是怎么如何在队列满时,让提交任务线程阻塞,而在队列为空,如何让获取任务线程阻塞?来看看BlockingQueue的内部实现。

BlockingQueue内部实现

为了实现上面所说的情况,用生产者-消费者模式为例,即:

  1. 当缓冲队列满时,生产者线程被阻塞,无法继续向缓冲区内提交任务;消费者线程正常执行,如果消费者线程被阻塞,则将其唤醒。
  2. 当缓冲队列为空时,消费者线程被阻塞,无法继续从缓冲区中获取任务;生产者线程正常执行,如果生产者线程被阻塞,则将其唤醒。

BlockingQueue队列中,维护着两个Condition字段,一个为notEmpty,一个为notFull,和一把重入锁lock:

ArrayBlockingQueue内部实现:

final Object[] items;     
private final AtomicInteger count = new AtomicInteger(); //当前队列中元素个数
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) {if(capacity <= 0) {throw new IllegalArgumentException();}this.items = new Object[capacity];lock = new ReentrantLock(fair); notEmpty = lock.newCondition();notFull =  lock.newCondition();
}
在ArrayBlockingQueue队列中,使用数组实现存储,所以实例化一个Object对象数组存放元素,AtomicInteger类型的count变量是使用了无锁CAS操作的线程安全类,用来保存当前队列中的元素个数。

LinkedBlockingQueue内部实现:

private final int capacity; //链表的容量
private final AtomicInteger count = new AtomicInteger(); //当前队列中元素个数
transient Node<E> head; //链表头节点
private transient Node<E> last; //链表尾节点
private final ReentrantLock takeLock = new ReentrantLock(); //出队列锁
private final ReentrantLock putLock = new ReentrantLock(); //入队列锁
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}

看到源码,大家应该猜到了,就是使用Condition条件配合重入锁,让线程在某一时刻等待。而且使用加锁操作,表明BlockingQueue队列也是线程安全的。对于队列来说,两个最基本的操作:入队和出队,用ArrayBlockingQueue源码来看,ArrayBlockingQueue有offer()和put()两个方法实现往队列中插入元素,两者不同之处在于,使用offer()方法,如果此时队列中已满,那么offer()方法会插入失败,并立刻返回false;如果使用put()方法,当队列满时,使用put()方法的线程会等待,直到队列中有空闲的位置后,继续执行如对操作,这是如何做到的?来看看put()方法的实现:

public void put(E e) throws InterruptedException {checkNotNull(e); //检查入队元素是否为空final ReentrantLock lock = this.lock; //抓住当前BlockingQueue实例的lock重入锁lock.lockInterruptibly(); //加锁,可以响应中断try {while(count == item.length) {notFull.await(); //在notFull的Condition对象上等待}insert(e); //队列入队操作} finally {lock.unlock;}
}

入队操作中,首先获得该队列的锁,然后特殊情况判断,while死循环不断判断,如果count == item.length,也就是当前队列已经满了,那么就让线程在notFull上等待,表示当前队列满,这就做到了,当内存缓冲区满时,生产者线程等待。当队列中有空闲位置了,则跳出跳出while循环,执行insert()插入操作。

private void insert(E x) {items[putIndex] = x;putIndex = inc(putIndex);++count;notEmpty.signal();
}
在执行插入操作的实现中,会把等待在Condition实例notEmpty的线程唤醒,等于是告诉正在等待的消费者线程,当前有新任务进入缓冲区了。
 上面是入队操作,接着看出队,和入队相似,在队列中出队一个元素也有两个方法,poll()和take(),使用poll()方法出队,如果队列为空,则返回null。take()方法则会等待在这个队列上。与put()方法对比,可以直到,当队列为空时,调用take()方法的线程会等待在notEmpty上,实际上就是这样的,来看看take()方法的实现:
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while(count == 0) {notEmpty.await();}return extract();} finally {lock.unlock();}
}
假设当前队列为空,也就是count == 0;那么就让当前线程在notEmpty上等待,直到有新的任务提交进队列,就执行入队操作extract()。
private E extract() {final Object[] items = this.items;E x = this.<E>cast(items[takeInddex]);items[takeIndex] = null;takeIndex = inc(takeIndex);--count;notFull.signal();return x;
}
同理往队列中入队一个元素后,会让等待在notFull上的线程唤醒,意思是告诉它们,当前队列不空了,你们可以提交新的任务进来了。
来具体看个例子,在生产者-消费者模式中怎么用这个BlockingQueue队列:
package producerconsumer;public final class work {private final int data;//构造函数初始化public work(int data) {this.data = data;}public work(String s) {this.data = Integer.valueOf(s);}public int getData() {return this.data;}@Overridepublic String toString() {return "data = "+this.data;}
}
自定义一个work类,模拟生产者和消费者处理的任务,里面就一个int型的data变量。
package producerconsumer;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;public class ProducerDemo implements Runnable {private BlockingQueue<work> workQueue; //内存缓冲区队列private static AtomicInteger count = new AtomicInteger(); //队列中的总任务数private volatile boolean isRunning = true; //标识当前线程的状态//构造函数初始化public ProducerDemo(BlockingQueue<work> workQueue) {this.workQueue = workQueue;}public void stopProducer() {this.isRunning = false;}@Overridepublic void run() {work newWork = new work(count.incrementAndGet());Random r = new Random();System.out.println("生产者线程: "+Thread.currentThread().getId()+"开始执行.");try {while(isRunning) {if(!workQueue.offer(newWork)) {System.out.println("生产者线程: "+Thread.currentThread().getId()+": 缓冲区满,任务-"+newWork+"放入失败.");} else {System.out.println("生产者线程: "+Thread.currentThread().getId()+"将任务-"+newWork+"放入缓冲区.");}Thread.sleep(r.nextInt(1000));}} catch(InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt(); }}}
简单地模拟,在生产者线程的构造函数中获得与消费者通信的缓冲区,然后往里面添加用随机数标记的任务。
package producerconsumer;import java.util.Random;
import java.util.concurrent.BlockingQueue;public class ConsumerDemo implements Runnable {private BlockingQueue<work> workQueue; //内存缓冲区队列private volatile boolean isRunning = true; //标识当前线程的状态//构造函数初始化public ConsumerDemo(BlockingQueue<work> workQueue) {this.workQueue = workQueue;}public void stopConsumer() {this.isRunning = false;}@Overridepublic void run() {work takeWork;Random r = new Random();System.out.println("消费者线程: "+Thread.currentThread().getId()+"开始执行.");while(isRunning) {try {takeWork = workQueue.take(); //从缓冲区中获取任务if(takeWork != null) {System.out.println("消费者线程: "+Thread.currentThread().getId()+"获取任务:"+takeWork.getData());} else {System.out.println("缓冲区空.");}Thread.sleep(r.nextInt(1000));} catch(InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt(); }}}}
消费者线程同样构造函数中获得与生产者通信的队列后,调用take()方法获取缓冲区里面的任务,并把任务id打印出来。
package producerconsumer;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MainDemo {public static void main(String[] args) throws InterruptedException {//创建缓冲区BlockingQueue<work> workQueue = new ArrayBlockingQueue<work>(10); //基于数组实现//创建线程池ExecutorService pcThreadPool = Executors.newCachedThreadPool(); //根据实际情况调整线程数量的线程池//创建生产者线程ProducerDemo producer1 = new ProducerDemo(workQueue);ProducerDemo producer2 = new ProducerDemo(workQueue);//创建消费者线程ConsumerDemo consumer1 = new ConsumerDemo(workQueue);ConsumerDemo consumer2 = new ConsumerDemo(workQueue);//将线程提交到线程池pcThreadPool.execute(producer1);pcThreadPool.execute(producer2);pcThreadPool.execute(consumer1);pcThreadPool.execute(consumer2);Thread.sleep(3*1000);producer1.stopProducer();producer2.stopProducer();consumer1.stopConsumer();consumer2.stopConsumer();Thread.sleep(5*1000);pcThreadPool.shutdown(); //关闭线程池}}
主函数中,创建一个ArrayBlockingQueue队列,也就是基于数组实现的BlockingQueue,然后实例化两个生产者线程和消费者线程,并将它们提交到线程池中执行,线程池使用的是newCachedThreadPool(),是一个可根据实际情况调整线程池内线程数量的线程池。
运行结果:
完整实现代码已上传GitHub:
https://github.com/justinzengtm/Java-Multithreading/tree/master/BasicProducerConsumer
https://gitee.com/justinzeng/multithreading/tree/master/BasicProducerConsumer

生产者-消费者中的缓冲区:BlockingQueue接口相关推荐

  1. 生产者消费者ReentrantLock实现以及BlockingQueue实现

    1.ReentrantLock实现 /*** 描述: 一个初始值为0的变量,两个线程对其交替操作,一个加一个减** @author xinjiao.yu@marketin.cn* @create 20 ...

  2. 生产者消费者的实现与思考

    生产者消费者的Java实现 生产者:负责生产消息,在缓冲区满后休眠: 消费者:负责消费消息,在缓冲区空后休眠: 两者的休眠何时唤醒? 1.生产者休眠,是因为缓冲区满,所以只要消费者进行了消费,那么缓冲 ...

  3. Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例

    Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例 本文由 TonySpark 翻译自 Javarevisited.转载请参见文章末尾的要求. Java.util.concurr ...

  4. 【Linux入门】多线程(线程概念、生产者消费者模型、消息队列、线程池)万字解说

    目录 1️⃣线程概念 什么是线程 线程的优点 线程的缺点 线程异常 线程异常 Linux进程VS线程 2️⃣线程控制 创建线程 获取线程的id 线程终止 等待线程 线程分离 3️⃣线程互斥 进程线程间 ...

  5. Java并发编程(二十三)------并发设计模式之生产者消费者模式

    参考文章:Java实现生产者消费者问题与读者写者问题 目录 1. 生产者消费者问题 1.1 wait() / notify()方法 1.2 await() / signal()方法 1.2.1 对sy ...

  6. 【1】生产者-消费者模型的三种实现方式

    (手写生产者消费者模型,写BlockingQueue较简便 ) 1.背景                                                                 ...

  7. 管程法解决生产者消费者问题

    生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深.所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是 ...

  8. 【生产者消费者模型】

    Linux生产者消费者模型 生产者消费者模型 生产者消费者模型的概念 生产者消费者模型的特点 生产者消费者模型优点 基于BlockingQueue的生产者消费者模型 基于阻塞队列的生产者消费者模型 模 ...

  9. 【Linux篇】第十六篇——生产者消费者模型

    生产者消费者模型 生产者消费者模型的概念 生产者消费者模型的特点 生产者消费者模型优点 基于BlockingQueue的生产消费者模型 基于阻塞队列的生产者消费者模型 模拟实现基于阻塞队列的生产消费模 ...

最新文章

  1. iOS 可能用到的三方框架
  2. ICPR 2020国际学术竞赛:大规模无噪声精细商品图像识别
  3. 使用canal同步MySQL数据到Elasticsearch(ES)
  4. MyBatis整合Spring的实现(2)
  5. 【DKN】(六)KCNN.py
  6. QT保留小数点后位数
  7. 人才第一!英伟达大幅扩大深度学习学院(DLI)规模
  8. DSP sawtooth锯齿波与square方波matlab产生(M2.2)
  9. 即将涨价 | 带学《机器学习》西瓜书+带打天池和达观杯AI大赛
  10. 来字节一年多,我都经历了什么?
  11. Linux内核编程(1)
  12. c语言编译题a b,C语言考试试题A卷.doc
  13. android密度计算器,密度计算器
  14. 武汉星起航跨境电商——亚马逊日本站JCT政策将实现改革
  15. ipad iphone开发_如何在iPhone或iPad上删除电子邮件
  16. ps图层锁定后如何解锁
  17. 如何获取vs code中插件Waka Time的API key
  18. 删除计算机自带的游戏软件,win10系统删除自带游戏的处理办法
  19. 艾森哲面试 Accenture
  20. python计算平均值标准差和中位数_如何使用python求平均数、方差、中位数

热门文章

  1. 港科喜讯 | 香港科大两位教授荣获立陶宛中央银行颁发的最佳经济学论文奖
  2. kafka broker 进入 conflicted ephemeral node 死循环
  3. 学习笔记:在Ubuntu16.04系统内安装Petalinux软件(包括如何安装Ubuntu16.04和相关支持库 详解)
  4. 大广角USB摄像头选用指南
  5. 似然函数与贝叶斯公式
  6. [旭日x3] 动手实践之bpu_rezie以及简化cpp编译流程
  7. java毕业设计车牌信息管理系统Mybatis+系统+数据库+调试部署
  8. mysql 查询数据库中所有表的信息
  9. QtMath:通用数学函数
  10. css3——3D动画、transform-style:preserve-3d、transform:perspective()、perspective-origin