功能简介:
  • LinkedBlockingQueue是一种基于单向链表实现的有界的(可选的,不指定默认int最大值)阻塞队列。队列中的元素遵循先入先出 (FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。(在并发程序中,基于链表实现的队列和基于数组实现的队列相比,往往具有更高的吞吐 量,但性能稍差一些)
源码分析:
  • 首先看下LinkedBlockingQueue内部的数据结构:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -6903933977591709194L;/*** Linked list node class*/static class Node<E> {/** The item, volatile to ensure barrier separating write and read */volatile E item;Node<E> next;Node(E x) { item = x; }}/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;/** 这里的count为原子量,避免了一些使用count的地方需要加两把锁。 */private final AtomicInteger count = new AtomicInteger(0);/** Head of linked list */private transient Node<E> head;/** Tail of linked list */private transient Node<E> last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();/*** Creates a <tt>LinkedBlockingQueue</tt> with a capacity of* {@link Integer#MAX_VALUE}.*/public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}/*** Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.** @param capacity the capacity of this queue* @throws IllegalArgumentException if <tt>capacity</tt> is not greater*         than zero*/public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);for (E e : c)add(e);}

  首先可见,内部为单向链表;其次,内部为两把锁:存锁和取锁,并分别关联一个条件(是一种双锁队列)。

  • 还是从put和take入手,先看下put方法:
    public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset// local var holding count  negative to indicate failure unless set.int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from* capacity. Similarly for all other uses of count in* other wait guards.*/try {while (count.get() == capacity)notFull.await();} catch (InterruptedException ie) {notFull.signal(); // propagate to a non-interrupted threadthrow ie;}insert(e);c = count.getAndIncrement();if (c + 1 < capacity)/* * 注意这里的处理:和单锁队列不同,count为原子量,不需要锁保护。* put过程中可能有其他线程执行多次get,所以这里需要判断一下当前* 如果还有剩余容量,那么继续唤醒notFull条件上等待的线程。*/notFull.signal(); } finally {putLock.unlock();}if (c == 0) //如果count又0变为1,说明在队列是空的情况下插入了1个元素,唤醒notNull条件上等待的线程。
            signalNotEmpty();}/*** Creates a node and links it at end of queue.* @param x the item*/private void insert(E x) {last = last.next = new Node<E>(x);}/*** Signals a waiting take. Called only from put/offer (which do not* otherwise ordinarily lock takeLock.)*/private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}

代码很容易看懂,再看下take方法实现:

    public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {try {while (count.get() == 0)notEmpty.await();} catch (InterruptedException ie) {notEmpty.signal(); // propagate to a non-interrupted threadthrow ie;}x = extract();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}/*** Removes a node from head of queue,* @return the node*/private E extract() {Node<E> first = head.next;head = first;E x = first.item;first.item = null;return x;}/*** Signals a waiting put. Called only from take/poll.*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}

和put对等的逻辑,也很容易看懂。

  • 上面看到,主要方法里并没有同时用两把锁,但有些方法里会同时使用两把锁,比如remove方法等:
    public boolean remove(Object o) {if (o == null) return false;boolean removed = false;fullyLock();try {Node<E> trail = head;Node<E> p = head.next;while (p != null) {if (o.equals(p.item)) {removed = true;break;}trail = p;p = p.next;}if (removed) {p.item = null;trail.next = p.next;if (last == p)last = trail;if (count.getAndDecrement() == capacity)notFull.signalAll();}} finally {fullyUnlock();}return removed;}/*** Lock to prevent both puts and takes.*/private void fullyLock() {putLock.lock();takeLock.lock();}/*** Unlock to allow both puts and takes.*/private void fullyUnlock() {takeLock.unlock();putLock.unlock();}

Jdk1.6 JUC源码解析(13)-LinkedBlockingQueue相关推荐

  1. Jdk1.6 JUC源码解析(12)-ArrayBlockingQueue

    功能简介: ArrayBlockingQueue是一种基于数组实现的有界的阻塞队列.队列中的元素遵循先入先出(FIFO)的规则.新元素插入到队列的尾部,从队列头部取出元素. 和普通队列有所不同,该队列 ...

  2. Jdk1.6 JUC源码解析(1)-atomic-AtomicXXX

    转自:http://brokendreams.iteye.com/blog/2250109 功能简介: 原子量和普通变量相比,主要体现在读写的线程安全上.对原子量的是原子的(比如多线程下的共享变量i+ ...

  3. Jdk1.8 JUC源码增量解析(2)-atomic-LongAdder和LongAccumulator

    转载自 Jdk1.8 JUC源码增量解析(2)-atomic-LongAdder和LongAccumulator 功能简介: LongAdder是jdk1.8提供的累加器,基于Striped64实现. ...

  4. Jdk1.8 JUC源码增量解析(1)-atomic-Striped64

    转载自  Jdk1.8 JUC源码增量解析(1)-atomic-Striped64 功能简介: Striped64是jdk1.8提供的用于支持如Long累加器,Double累加器这样机制的基础类. S ...

  5. JDK1.8 ConcurrentHashMap 源码解析

    概述 ConcurrentHashMap 是 util.concurrent 包的重要成员. ConcurrentHashMap 的源代码会涉及到散列算法,链表数据结构和红黑树 Java8 Concu ...

  6. JDK1.8 HashMap源码解析(不分析红黑树部分)

    一.HashMap数据结构 HashMap由 数组+链表+红黑树实现,桶中元素可能为链表,也可能为红黑树.为了提高综合(查询.添加.修改)效率,当桶中元素数量超过TREEIFY_THRESHOLD(默 ...

  7. Redis源码解析(15) 哨兵机制[2] 信息同步与TILT模式

    Redis源码解析(1) 动态字符串与链表 Redis源码解析(2) 字典与迭代器 Redis源码解析(3) 跳跃表 Redis源码解析(4) 整数集合 Redis源码解析(5) 压缩列表 Redis ...

  8. JUC.Condition学习笔记[附详细源码解析]

    JUC.Condition学习笔记[附详细源码解析] 目录 Condition的概念 大体实现流程 I.初始化状态 II.await()操作 III.signal()操作 3个主要方法 Conditi ...

  9. HashMap源码解析(JDK1.8)

    HashMap源码解析(JDK1.8) 目录 定义 构造函数 数据结构 存储实现源码分析 删除操作源码分析 hashMap遍历和异常解析 1. 定义 HashMap实现了Map接口,继承Abstrac ...

最新文章

  1. Windows Server 2012正式版RDS系列⑤
  2. 2440 8字数码管 显示0到10 c语言,51单片机对8位数码管依次显示0-7的设计
  3. 对话jQuery之父John Resig:JavaScript的开发之路
  4. 织梦DedeCMS实现 三级栏目_二级栏目_一级栏目_网站名称 的效果代码
  5. 【手算】哈夫曼编码—树形倒置快速画法
  6. 德勤发布《 2020 亚太四大半导体市场的崛起》报告,美国收入占比达到47%,中国大陆仅占 5%
  7. MSComm控件过程中内存溢出和GetOneDimSize出错的问题
  8. chrome插件推荐
  9. 服务器内存太小,伤不起![异常与应用程序池引发的连锁命案]
  10. python监听文件更改记录_同事利用Python制作微信机器人自动监控群聊!
  11. GHOST常用参数详解,让你成为GHOST高手(转)
  12. Android报错:The processing instruction target matching [xX][mM][lL] is not allowed.
  13. 为什么计算机编程以英语为主,为什么英语对于编程来说非常重要
  14. 对接支付宝、微信、第三方支付,超详细讲解+demo演示
  15. 计算机写给未来自己的一段话,写给未来的自己一句话致未来自己的句子简短励志...
  16. vue3 script setup写法
  17. QCC3040---Message Broker module
  18. html无插件播放流,浏览器无插件播放网络视频流RTSP/H264/WEB CAM
  19. JS 四舍五入保留两位小数
  20. OpenCV_车辆检测实战

热门文章

  1. WIN server 2003 下无法安装adobe cs3 终极解决方法。
  2. 为sort函数指定排序规则时注意的问题以及错误的写法
  3. 交换排序之——冒泡排序(c/c++)
  4. 带你一起撸一遍 nodejs 常用核心模块(二)
  5. ORA-12638: 身份证明检索失败 的解决办法
  6. ansible之setup模块常用的信息
  7. 《Android的设计与实现:卷I》——第2章 框架基础JNI
  8. Git基础之(二十)——标签管理——创建标签
  9. JVM参数设置和分析
  10. linux命令使用全集