生产者-消费者模型之集合LinkedBlockingQueue源码解读
目录
- `LinkedBlockingQueue` 概述
- `LinkedBlockingQueue` 源码
- `LinkedBlockingQueue` 数据结构(单链表)
- `LinkedBlockingQueue` 主要属性
- `LinkedBlockingQueue` 构造函数
- 入队操作
- `put(E e)` 方法
- `offer(E e)` 方法
- `add(E e)` 方法
- 出队操作
- `take()` 方法
- `poll()` 方法
- 删除元素操作
- `remove()` 方法
- 获取元素操作
- `peek()` 方法
- `element()` 方法
- `LinkedBlockingQueue` 总结
LinkedBlockingQueue
概述
LinkedBlockingQueue
来自于jdk 1.5
的JUC
包,是一个线程安全的有界阻塞队列,底层数据结构是一个单链表- 作为有界队列,容量范围是
{1, Integer.MAX_VALUE}
,可以指定容量,如果未指定容量,则默认容量等于Integer.MAX_VALUE
,即最大容量 - 由于出队线程只操作队头,而入队线程只操作队尾,这里巧妙地采用了两把锁,对插入数据采用
putLock
,对移除数据采用takeLock
,即入队锁和出队锁,这样避免了出队线程和入队线程竞争同一把锁的现象 - 添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量
LinkedBlockingQueue
的工作模式都是非公平的,也不能手动指定为公平模式,这样的好处是可以提升并发量- 实现了
Serializable
接口,没有实现Cloneable
;不支持null
元素
LinkedBlockingQueue
源码
LinkedBlockingQueue
数据结构(单链表)
static class Node<E> {E item; // 数据域Node<E> next; // 后继引用Node(E x) { item = x; }
}
LinkedBlockingQueue
主要属性
// 阻塞队列的容量
private final int capacity;// 阻塞队列的元素个数,原子变量
private final AtomicInteger count = new AtomicInteger();// 阻塞队列的头结点,并不是真正的头结点
transient Node<E> head;// 阻塞队列的尾结点
private transient Node<E> last;// 消费线程使用的锁
private final ReentrantLock takeLock = new ReentrantLock();// notEmpty条件对象,当队列为空时用于挂起消费线程
private final Condition notEmpty = takeLock.newCondition();// 生产线程使用的锁
private final ReentrantLock putLock = new ReentrantLock();// notFull条件对象,当队列已满时用于挂起生产线程
private final Condition notFull = putLock.newCondition();
- 采用单链表结构来保存数据,因此具有头、尾结点的引用
head、last
,链表结点类型是内部类Node
类型 LinkedBlockingQueue
的容量最大是Integer.MAX_VALUE
。使用一个AtomicInteger
类型的原子变量count
来作为计数器,它是线程安全的takeLock
作为消费线程获取的锁,同时有个对应的notEmpty
条件变量用于消费线程的阻塞和唤醒,putLock
作为生产线程获取的锁,同时有个对应的notFull
条件变量用于生产线程的阻塞和唤醒
LinkedBlockingQueue
构造函数
// 无参构造器
public LinkedBlockingQueue() {// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueuethis(Integer.MAX_VALUE);
}// 创建一个具有指定容量的 LinkedBlockingQueue
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;// 初始化头结点和尾节点,指向同一个值为 null的哨兵结点last = head = new Node<E>(null);
}public LinkedBlockingQueue(Collection<? extends E> c) {// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueuethis(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;// 获取生产者锁putLock.lock(); try {int n = 0;// 遍历指定集合for (E e : c) {if (e == null)throw new NullPointerException();// n == capacity == Integer.MAX_VALUE,抛出队列已满异常if (n == capacity)throw new IllegalStateException("Queue full");// 调用 enqueue() 方法插入新节点到队列尾部enqueue(new Node<E>(e));++n;}// 设置阻塞队列的元素个数count.set(n);} finally {putLock.unlock();}
}private void enqueue(Node<E> node) {// 原尾节点的 next 引用指向 node节点,然后 last指向最新 node结点last = last.next = node;
}
入队操作
put(E e)
方法
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// 预先设置 c为 -1,约定负数为入队失败int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 获取可中断的生产者锁,即响应中断的生产者锁putLock.lockInterruptibly();try {// 队列的容量等于阻塞队列的元素个数,即队列是否满了while (count.get() == capacity) {// 该入队线程会产生阻塞,并释放锁,被唤醒之后会继续尝试获取锁,并循环判断notFull.await();}// 队列没有满,元素添加到队列尾部enqueue(node);// 获取此时阻塞队列的元素个数,并且计数器值自增 1,赋给 cc = count.getAndIncrement();// 如果 c+1小于 capacity,说明还可以入队if (c + 1 < capacity)// 唤醒一个在 notFull条件队列中等待的入队线程notFull.signal();} finally {putLock.unlock();}// c==0 说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程if (c == 0)signalNotEmpty();
}private void enqueue(Node<E> node) {last = last.next = node;
}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;// 阻塞式的获取消费者锁takeLock.lock();try {// 唤醒一个在 notEmpty条件队列中等待的消费线程notEmpty.signal();} finally {takeLock.unlock();}
}
- 入队第一步,获取入队锁,这样保证了线程安全,保证了同一时刻只能有一个入队线程在操作队列
- 如果队列已满,该入队线程会产生阻塞,并释放锁,被唤醒之后会继续尝试获取锁,并循环判断
- 如果队列没有满,那么直接将入队元素加入到队列的尾部,然后检查当前队列是否满了,如果没有满,则唤醒其他入队线程
- 最后检查入队前的队列是否为空(
c == 0
就表示当前入队操作前,是一个空队列),如果为空,那么就有可能存在等待出队的线程在阻塞着,那么在这里进行唤醒
offer(E e)
方法
/**- 将指定的元素插入到此队列的尾部- - @param e 指定元素- @return 在成功时返回 true,如果此队列已满,则不阻塞,立即返回 false*/
public boolean offer(E e) {if (e == null) throw new NullPointerException();// 获取队列元素个数final AtomicInteger count = this.count;// 如果容量满了if (count.get() == capacity)// 直接返回 false,可以节省锁的获取和释放的开销return false;// 预先设置 c为 -1,约定负数为入队失败int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;// 获取入队锁putLock.lock();try {// 如果队列未满if (count.get() < capacity) {// 队列没有满,元素添加到队列尾部enqueue(node);// 获取此时计数器的值赋给c,并且计数器值自增1,这里的c一定是大于等于0的值c = count.getAndIncrement();// 如果 c+1 小于 capacity,说明还可以入队if (c + 1 < capacity)// 如果添加数据后还队列还没有满,则继续调用 notFull 的 signal方法唤醒其他等待在入队的线程notFull.signal();}} finally {putLock.unlock();}// c==0 说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程if (c == 0)signalNotEmpty();// 如果c>=0,表示该元素已添加到此队列,则返回 true;否则返回 falsereturn c >= 0;
}
offer(E e)
方法与put(E e)
方法的原理几乎一致- 对于
offer(E e)
方法来说,如果因为获取不到锁而在同步队列中等待的时候被中断也会继续等待获取锁,即不响应中断 - 对于
offer(E e)
方法来说,如果发现此队列已满,则立即返回false
,而不会阻塞在条件队列上;而put(E e)
方法则会阻塞,并产生循环判断
add(E e)
方法
public boolean add(E e) {// 调用 offer() 方法if (offer(e))return true;elsethrow new IllegalStateException("Queue full");
}
- 该方法是其父类
AbstractQueue
的方法
出队操作
take()
方法
public E take() throws InterruptedException {E x;// 预先设置 c为 -1,约定负数为出队失败int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 拿到可中断的出对锁takeLock.lockInterruptibly();try {// 如果队列为空while (count.get() == 0) {// 该出队线程进行阻塞并释放锁,被唤醒之后会继续尝试获取锁、并循环判断notEmpty.await();}// 队列没有空,获取并移除此队列的头部x = dequeue();c = count.getAndDecrement();if (c > 1)// 如果 c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)// 移除元素之前队列是满的,唤醒生产线程进行添加元素signalNotFull();return x;
}private E dequeue() {// 获取到head节点Node<E> h = head;// 获取到head节点指向的下一个节点Node<E> first = h.next;// head节点原来指向的节点的next指向自己,等待下次gc回收h.next = h; // head节点指向新的节点head = first;// 获取到新的head节点的item值E x = first.item;// 新head节点的item值设置为nullfirst.item = null;return x;
}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {// 唤醒一个在 notFull条件队列中阻塞的入队线程notFull.signal();} finally {// 释放生产者锁putLock.unlock();}
}
- 出队第一步,获取出队锁,这样保证了线程安全,保证了同一时刻只能有一个出队线程在操作队列
- 如果队列为空,该出队线程会产生阻塞,并释放锁,被唤醒之后会继续尝试获取锁,并循环判断
- 队列不为空,获取并移除此队列的头部;然后检查当前队列是否空了,如果不为空,则唤醒其他出队线程
- 如果
c == capacity
,那么此前队列中数据已满,可能此时有入队线程在等待,这里需要唤醒一个生产者线程。如果此前队列中的数据没有满,那么也不必唤醒生产者
poll()
方法
public E poll() {final AtomicInteger count = this.count;// 如果队列为空if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;// 拿到不可中断的出队锁takeLock.lock();try {if (count.get() > 0) {// 队列没有空,获取并移除此队列的头部x = dequeue();c = count.getAndDecrement();if (c > 1)// 如果 c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程notEmpty.signal();}} finally {takeLock.unlock();}// 如果 c == capacity就是说队列中有一个空位,唤醒入队线程if (c == capacity)signalNotFull();return x;
}
poll()
方法和take()
方法原理基本一致- 对于
poll()
方法来说,如果因为获取不到锁而在同步队列中等待的时候被中断也会继续等待获取锁,即不响应中断。但是如果队列为空,则直接返回null
而不会阻塞等待 - 对于
take()
方法来说,如果队列为空,则出队线程进行阻塞并释放锁,被唤醒之后会继续尝试获取锁、并循环判断
删除元素操作
remove()
方法
public E remove() {// 直接调用 poll() 方法,获取返回值xE x = poll();if (x != null)return x;elsethrow new NoSuchElementException();
}
获取元素操作
peek()
方法
public E peek() {// 如果队列空了if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;// 拿到不可中断的出队锁takeLock.lock();try {// 获取 head 的 next结点,它才可能是真正的头结点Node<E> first = head.next;// 如果为 null,说明队列空了if (first == null)return null;else// 否则,返回值return first.item;} finally {takeLock.unlock();}
}
element()
方法
public E element() {// 调用 peek() 方法获取返回值 xE x = peek();if (x != null)return x;elsethrow new NoSuchElementException();
}
LinkedBlockingQueue
总结
LinkedBlockingQueue
底层采用单链表来实现阻塞队列,内部具有结点的实现类Node
,每一个元素值都有一个Node
结点对象来保存LinkedBlockingQueue
可以不指定容量,默认就是最大容量Integer.MAX_VALUE
,但是容量过大、元素过大,并且生产线程速度快于消费线程,则可能造成内存溢出LinkedBlockingQueue
采用了锁分离技术,具有两把锁takeLock、putLock
;takeLock
作为出队线程获取的锁,同时有个对应的notEmpty
条件变量用于出队线程的阻塞和唤醒,putLock
作为入队线程获取的锁,同时有个对应的notFull
条件变量用于入队线程的阻塞和唤醒,这样避免了入队线程和出队线程竞争同一把锁的现象- 添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量
LinkedBlockingQueue
底层使用了ReentrantLock + CAS + Concition
来实现的,但是本质上还是使用了AQS
框架的;另外注意count
是AtomicInteger
类的实例引用,保证了其原子性
生产者-消费者模型之集合LinkedBlockingQueue源码解读相关推荐
- Pseudo-document-based Topic Model(基于伪文档的主题模型)的理解以及源码解读
本文作者:合肥工业大学 管理学院 钱洋 email:1563178220@qq.com 内容可能有不到之处,欢迎交流. 未经本人允许禁止转载. 论文来源 Zuo Y, Wu J, Zhang H, e ...
- 从 Netpoll 中寻找 BIO/NIO 编程模型的对比 | Netpoll 源码解读
前言 最近在阅读<Go 组件设计与实现>这本小册,其中让我很感兴趣的一点是为什么在字节开源中间件团队 CloudWeGo 所开发的网络库 Netpoll 中使用了 NIO 模型,而没有使用 ...
- 判别模型的玻尔兹曼机论文源码解读
前言 三号要去参加CAD/CG会议,投了一篇关于使用生成模型和判别模型的RBM做运动捕捉数据风格识别的论文.这段时间一直搞卷积RBM了,差点把原来的实验内容都忘记了,这里复习一下判别式玻尔兹曼机的训练 ...
- java多线程抽奖_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码...
导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...
- mysql服务器多线程模型_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码 - 陈彦斌 - 博客园...
导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...
- LinkedBlockingQueue 实现生产者消费者模型
并发编程栏目代码 GitHub package 地址: 点击打开链接 博客并发编程栏目 : 点击打开链接 实现 LinkedBlockingQueue是一个基于已链接节点的.范围任意的blocking ...
- Java并发编程笔记之LinkedBlockingQueue源码探究
LinkedBlockingQueue的实现是使用独占锁实现的阻塞队列.首先看一下LinkedBlockingQueue 的类图结构,如下图所示: 如类图所示:LinkedBlockingQueue是 ...
- 多线程-生产者-消费者模型
一.前言 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例.该问题描 ...
- 三种方式实现生产者-消费者模型
前言 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例.该问题描述了 ...
- 互斥锁、共享内存方式以及生产者消费者模型
守护进程 1.守护进程的概念 进程指的是一个正在运行的程序,守护进程也是一个普通进程 意思就是一个进程可以守护另一个进程 import time from multiprocessing import ...
最新文章
- python是一种语言还是一个软件-Python还是一种
- shell 脚本执行 sql
- 使用FFMPEG SDK解码流数据获得YUV数据及其大小
- 前端笔记-对webpack和vue的基本认识
- 最新聚合支付四方系统完整源码+修复大量BUG/新UI
- html网页该插件不受支持,该插件不受支持怎么办
- gis如何加入emf图片_ArcGIS教程:地图导出格式,教你如何选择
- H5开发html文件转换pdf,将HTML页面转换为PDF文件并导出
- 对注册会计师CPA的简单了解
- 微信小程序自定义picker多列选择器
- Unity Shader - 板砖日志 - 简单的树、草 等植物的 随风飘扬 动画
- 【转载】网站关闭了域名备案信息是否需要注销,答案是一定要记得注销域名备案信息
- 【C++从青铜到王者】第一篇:C++入门
- 百词斩前端面经(待整理)
- 还在不停切换聊天窗口进行回复的客服请看过来
- 程序员桌面上的EDO
- sciencedirect 网站抓取过程
- Eclipse设置自动换行
- 八猴模型html文件,使用Marmoset Toolbag八猴渲染器的Marmoset Viewer进行离线本地观察...
- 中断原理及WDT驱动编程
热门文章
- App Ratings iOS
- golang 大数据平台_一文读懂数据平台、大数据平台、数据中台
- 序列最小最优化算法(SMO)
- python函数 range()和arange()
- 举例说明Java的反射机制,简单的Java反射机制
- 凸优化第三章凸函数 3.4拟凸函数
- 【知识图谱系列】基于Randomly Perturb的图谱预训练模型GraphCL
- 【OpenGL 实验一】图元的生成+区域填充
- Hamilton-Caylay (哈密尔顿-凯莱)定理
- tdoa/aoa定位的扩展卡尔曼滤波定位算法matlab源码,03TDOA_AOA定位的扩展卡尔曼滤波算法MATLAB源代码...