故事背景

最近面试被人问道了如何实现一个LinkedBlockingQueue,顿时懵逼。
对话如下:

面试官: 先介绍下你最近做过的项目吧.
我: 阿巴阿巴阿巴.
面试官: 好的,那么如果让你设计一个LinkedBlockingQueue,如何实现?
我: 好的,我会…emmm,忘记了(非常尴尬)

这东西堵塞队列我知道,但内部如何实现的还真不知道,因为类似的功能通常都考虑以异步的生产者消费者方式去实现(至少我是如此的)。
阻塞队列主要是以队列为中心,控制生产和消费速率,这对于异步队列就需要其他方式去控制。
希望借此文帮助学习分享。

结构分析


上图所示,LinkedBlockingQueue就是在集合的功能上又增加了队列的一些操作,开始逐个分析.

java.util.Collection

java集合的根接口,主要规定了集合的基本操作,包括单/多个元素的增删查,集合数量查询,流失对象返回.
作为根接口,为了保证通用性,其约束比较宽松,只是通过文档提出了对实现的规范.

java.util.AbstractCollection

public abstract class AbstractCollection<E> implements Collection<E>

这是对java.util.Collection接口的实现的基本骨架,是一个不可修改集合元素的实现类.
其中的方法在具体实现类中有更有效的处理方式的话可以替换.
实现方法依赖java.util.Iterator,如果要一个可修改的集合需要在子类重新实现取代默认的迭代器.

java.util.Queue

public interface Queue<E> extends Collection<E>

这是队列的根接口,主要在java.util.Collection接口的基础上,额外提供了不抛出异常的增取看操作.
此外其元素的存储应该是有序的,但具体顺序有由子类处理.
堵塞队列交由java.util.concurrent.BlockingQueue接口定义.
null由于会被当做队列当前没有元素的毒丸对象(特殊值),最好不允许插入.
由于存储的元素是有序的,因此需要考虑重新实现equalshashCode方法(当元素值相等,但hash不同时).

java.util.concurrent.BlockingQueue

public interface BlockingQueue<E> extends Queue<E>

该接口在Queue的基础上,提供了具有时间限制的增取操作.
BlockingQueue提供了4种形式的操作:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek()
原本集合的某些方法在队列中使用需要注意,因为其内部基于迭代器,不能保证线程安全。
文档特地说明了,队列操作方法(除了大块的操作,如addAll)需要线程安全,但不限定实现的方式.
多线程操作同一个队列需要按顺序操作.

注意: 不能往该队列添加null值,因为其被用作poll操作失败(由于容量有限,且在指定时间内未完成)的返回值,表示队列当前没有元素.

java.util.AbstractQueue

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E>

该抽象类提供了Queue实现的基本骨架.
主要是Queue接口的相关方法是抛出异常而不是Collections返回的特殊值(nullfalse). 同时,不允许插入null值.

java.util.concurrent.LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

LinkedBlockingQueue是堵塞队列的一种实现,其内部以单链表维护元素.
链表的实现方式相对于数组,具有较高的吞吐量,但在并发环境下不太好预测性能.

LinkedBlockingQueue详细分析

属性

// 最大容量,默认为Integer.MAX_VALUE
private final int capacity;// 当前元素数量
private final AtomicInteger count = new AtomicInteger();// 队列头, 不变 head.item == null
transient Node<E> head;// 队尾, 不变 last.item == null
private transient Node<E> last;// take, poll等操作持有的锁,其实就是对队列读取的锁
private final ReentrantLock takeLock = new ReentrantLock();// 正在等待取的线程队列
private final Condition notEmpty = takeLock.newCondition();// put, offer等操作持有的锁,其实就是对队列写的锁
private final ReentrantLock putLock = new ReentrantLock();// 正在等待写入的线程队列
private final Condition notFull = putLock.newCondition();static class Node<E> {E item;// 只会是下面情况的一种: 1.真的后继节点; 2.null,没有后继节点,也就是最后的节点; 3.head节点;Node<E> next;Node(E x) { item = x; }
}

通过上面的说明,能了解到LinkedBlockingQueue本身的基础是单链表,通过ReentrantLocknewCondition()生成对应操作的等待队列,以此保证线程安全及数据有无的通知.

构造函数

public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}// 限制队列容量,并初始化队列的 head 和 last 节点.
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);
}// LinkedBlockingQueue(int capacity)初始化,然后加写锁,将集合c一个个入队.
public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // 写锁(以重入锁实现,对队尾的插入进行控制)try {int n = 0;for (E e : c) {// null元素抛出异常if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e)); //将元素封装成Node,入队++n;}count.set(n);} finally {putLock.unlock(); // 释放}
}

入队出队

// 入队,将节点插入到last之前
private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;
}// 出队,将head的后继返回出来
private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;
}

加锁,释放锁

// 完全加锁,避免其他线程写入和读取
// 在我看来就是加读写锁
void fullyLock() {putLock.lock();takeLock.lock();
}// 完全解锁
// 在我看来就是释放读写锁
void fullyUnlock() {takeLock.unlock();putLock.unlock();
}

加元素

// 除非线程被中断,否则堵塞到写入成功
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 可中断锁putLock.lockInterruptibly();try {// 队列没有空间,自旋等待有空while (count.get() == capacity) {// 等待队列未满信号notFull.await();}// 队列有空间,入队enqueue(node);c = count.getAndIncrement();// 如果队列还有空间,通知等待队列if (c + 1 < capacity)// 发送队列未满信号notFull.signal();} finally {// 释放锁putLock.unlock();}if (c == 0)// 发送队列已满信号signalNotEmpty();
}// 添加元素,直到超时失败或者中断异常
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {if (nanos <= 0)return false;// 等待队列未满信号 nanos长的时间// 返回的nanos是收到信号后剩余的时间// 因为存在并发写入的可能,所以需要自旋/锁保证(自旋通常更高效)nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;
}// 添加元素,有空间且添加成功返回true,否则false
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)// 队列没有空间,直接返回return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;// 堵塞到获取到锁putLock.lock();try {if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;
}

取元素

取元素的行为与加元素的类似,只不过写锁变读锁,条件notFull变为notEmpty.

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}

删元素

// 删除p节点,trail为p的前驱节点
void unlink(Node<E> p, Node<E> trail) {p.item = null;trail.next = p.next;if (last == p)last = trail;// 队列未满,发出未满信号if (count.getAndDecrement() == capacity)notFull.signal();
}// 移除匹配的元素
public boolean remove(Object o) {if (o == null) return false;fullyLock();try {// trail表示当前的尾巴节点(item为null)// p表当前比较的节点// head,last都是item为null的特殊节点for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {// 找到需要的元素if (o.equals(p.item)) {// 删除unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}

继承自Collection的方法

这些方法的实现大同小异,就不细讲了.
主要就是在开始完全加锁fullyUnlock(),然后通过链表遍历,获取所需元素.

ReentrantLock

重入锁.
其内部通过state去记录当前线程加锁次数,以此实现重入.
然后释放也需要相同的次数,实现完全释放.
ReentrantLock,当A线程已经获得锁时,B线程必须等待A释放才能获取.
内部是基于AQS实现的类CLH队列维护的2个队列,分别是等待取锁等待条件(该队列中的节点收到通知会被加入等待取锁队列).

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject

条件对象.
其就是LinkedBlockingQueueprivate final Condition notFull = putLock.newCondition();newCondition()所创建的实际对象.
内部通过java.util.concurrent.locks.AbstractQueuedSynchronizer.NodenextWaiter属性维护节点的排他/共享标识.
因此,每当从LinkedBlockingQueue获取一个元素,同时会检查元素数量是否有剩余,如有调用notEmpty.signal()方法,通知一个等待获取该条件的线程.
从等待条件队列出队的线程会被加入到双链表的待执行队列中,等待前驱执行完毕,再执行.

加锁逻辑

// 非公平锁
static final class NonfairSync extends Sync {final void lock() {if (compareAndSetState(0, 1))// 加锁成功 这是加锁的简单逻辑,应该是对锁争用不激烈的优化setExclusiveOwnerThread(Thread.currentThread());else// 加锁失败,进入尝试取锁阶段acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}// 检查并设置当前锁的持有线程, true表示成功持有锁final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 对减少CAS的优化,类似于双检索的逻辑if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {// 当前线程已经持有锁,则增加重入计数int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// 来自 AQS 的方法public final void acquire(int arg) {// 调用上面的 tryAcquire 逻辑if (!tryAcquire(arg) &&// addWaiter 创建一个排他的CLH节点// acquireQueued将创建好的Node加入到CLH队列中,并且会检查是否还存在前驱节点,如果没有,则自身会尝试取锁acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
}

非公平锁与公平锁逻辑基本相同,只有在开始 tryAcquire 时会额外检查是否存在前驱节点,如果有则直接返回取锁失败。

等待取锁的线程的队列:

CLH

JUC(java.util.concurrent)是java并发工具包,基本由Doug Lea编写,像这样大神人物写出的代码,有相当的学习价值。

其实现了多线程同步的基本框架,内部通过一个CLH队列维护多线程对条件资源的获取。

CLH(Craig, Landin, and Hagersten locks): 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。
CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

JDK 1.8中AQS的CLH是一个变种的实现,主要将原本使用的单链表改造为双链表,增加了对前驱节点的引用,用于检查前驱节点失效时剔除以及直接唤醒。

一些技巧

将类成员变量复制到方法本地变量上

Doug Lea写的JUC很多地方都将类成员变量复制到方法本地变量上,为什么要多此一举?
<<Performance of locally copied members ?>>
内容如下:

It’s a coding style made popular by Doug Lea.
It’s an extreme optimization that probably isn’t necessary;
you can expect the JIT to make the same optimizations.
(you can try to check the machine code yourself!)
Nevertheless, copying to locals produces the smallest bytecode, and for low-level code it’s nice to write code that’s a little closer to the machine.
Also, optimizations of finals (can cache even across volatile reads) could be better. John Rose is working on that.
For some algorithms in j.u.c, copying to a local is necessary for correctness.

大致意思:

这是Doug Lea带来的编码风格.
一种极端的优化方式,可能没啥用。你可以期望 JIT 也会执行相同的优化。
将类成员变量复制为方法本地变量可使得生成最小的字节码;也更容易编写靠近机器的底层代码。
在某些算法中(特别是JDK并发包中的算法),复制为本地变量,或使用final变量,对于保证并发正确性很有必要。

所以我们并不需要对这种代码模式过于担心。你做好并发/同步相关正确性编码,保证算法正确性就行,不需要去刻意模仿使用这种代码模式。

JAVA LinkedBlockingQueue详细分析相关推荐

  1. java消费者模式_基于Java 生产者消费者模式(详细分析)

    生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相 ...

  2. gsm模块 java 录音_Android GSM驱动模块详细分析

    Android的RIL驱动模块, 在hardware/ril目录下,一共分rild,libril.so以及librefrence_ril.so三个部分,另有一 radiooptions可供自动或手动调 ...

  3. Java的自动装箱与拆箱详细分析

    Java的自动装箱与拆箱详细分析 1. 既然说是装箱与拆箱,那么到底是装的什么,拆的什么? 装箱:将基本数据类型封装起来,用他对应的引用类(包装类)来处理 拆箱:就是把引用类里面的基本数据拆出来 2. ...

  4. java lam表达式_详细分析Java Lambda表达式

    在了解Lambda表达式之前我们先来区分一下面向对象的思想和函数式编程思想的区别 面向对象的思想: 做一件事情,找一个能解决这个事情的对象,调用他的方法来解决 函数时编程思想: 只要能获取到结果,谁去 ...

  5. java共识算法_PBFT共识算法详细分析及Java实现

    PBFT共识算法详细分析及Java实现 为什么写这个 最近研究了区块链相关的一些东西,其实就三大块: 分布式存储(去中心) 共识机制 安全加密 分布式存储,就是一个分布式数据库,每个节点都保存一份副本 ...

  6. Java 代理模式的实现和原理详细分析

    文章目录 代理模式 静态代理 1. 静态代理的概念 2. 静态代理的实现 动态代理 1. 动态代理的概念 2. 动态代理的实现 2.1 如何创建一个动态代理对象 2.2 完整的动态代理的例子 3.动态 ...

  7. Java高级开发面试,红黑树详细分析(图文详解)

    开头 如果Redis的读写请求量很大,那么单个实例很有可能承担不了这么大的请求量,如何提高Redis的性能呢?你也许已经想到了,可以部署多个副本节点,业务采用读写分离的方式,把读请求分担到多个副本节点 ...

  8. java生产线消费者,基于Java 生产者消费者模式(详细分析)

    生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相 ...

  9. java程序员二级_Java程序员认证模拟题及详细分析(2)

    Java程序员认证模拟题及详细分析(2) 分类:计算机等级 | 更新时间:2016-07-08| 来源:转载 Java程序员认证模拟题及详细分析(1) 26. Give following class ...

  10. wed后端和java的区别_web前端和web后端的区别详细分析

    原标题:web前端和web后端的区别详细分析 在刚开始从事web开发时,首先要选准学习方向,看是想从事前端部分还是后端程序部分.当然在工作的后期,就不会分的那么细致了.做前端到后期也会懂一些后端的技术 ...

最新文章

  1. .NET 并行(多核)编程系列之七 共享数据问题和解决概述
  2. G面经prepare: Reorder String to make duplicates not consecutive
  3. mysql清理连接数缓存,MySQL连接池、线程缓存、线程池的区别
  4. 螺旋桨设计软件_我们又双叒叕获得一项国家软件著作权!
  5. 神经网络算法-论证单层感知器的局限性
  6. python编写程序输入整数n求n_Python入门习题----N=ABXBA
  7. CreateFeatureClass COM异常
  8. 如何评价周志华深度森林模型
  9. npm 安装 git linux,如何直接从GitHub安装NPM软件包?
  10. doc 问卷调查模板表_问卷调查Word模板.doc
  11. swustoj 1132 Coin-collecting by robot
  12. linux系统及编程基础唐晓君,Linux-Shell编程之判断文件类型
  13. 机器学习特征工程--标准化和归一化
  14. easyexcel导出
  15. hadoop jar xxxx.jar 执行的流程
  16. 控制类(Controller)
  17. 微信餐饮小程序有必要开发吗
  18. DEAP:使用生理信号进行情绪分析的数据库(三、实验分析与结论)
  19. 数据分析小结:使用流计算 Oceanus(Flink) SQL 作业进行数据类型转换
  20. c语言编写天气预报程序,在Deno中构建一个命令行天气预报程序

热门文章

  1. 收集一些电子书下载的网站
  2. 小猫钓鱼纸牌游戏 python
  3. 【渝粤题库】陕西师范大学292141政府规制经济学作业(专升本)
  4. JS 把数组按倒序排列
  5. 【考研数学】常用数学公式大全
  6. 芯力特SIT1043Q完全替代恩智浦TJA1043
  7. 分布式事务CAP理论
  8. 淘宝客淘宝联盟解析二合一链接获取优惠链接还原二合一,提取优惠信息
  9. java x86 x64_x86 版和x64版有什么区别?
  10. 三分钟快速理顺HTMLJSP中单、双引号用法、含义