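  • LinkedBlockingQueue is an optionally bounded blocking queue backed by a singly linked list. If no capacity is specified, the bound defaults to Integer.MAX_VALUE. Elements are ordered first-in, first-out (FIFO): new elements are inserted at the tail of the queue, and elements are taken from the head. (In concurrent programs, linked queues typically have higher throughput than array-based queues, but less predictable performance.)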
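  • To make the bound and the FIFO ordering concrete, here is a minimal sketch. The class name FifoDemo and the capacity of 2 are chosen purely for illustration; only the standard java.util.concurrent API is used.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
final class FifoDemo {
    /** Fills a queue of capacity 2 and reports what each call returned. */
    static String run() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(2);
        boolean a = q.offer("a");     // accepted
        boolean b = q.offer("b");     // accepted, queue is now full
        boolean c = q.offer("c");     // rejected: offer does not block when full
        String first = q.poll();      // head of the queue, i.e. the oldest element
        return a + "," + b + "," + c + "," + first + "," + q.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,true,false,a,1
    }
}
```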
  • First, let's look at the internal data structure of LinkedBlockingQueue:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * count is an atomic variable, so code that reads or updates it
     * does not have to acquire both locks.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }
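
  • The three constructors above can be exercised from the outside as follows. This is only a sketch: ConstructorDemo and its method names are made up for this example, and the observed values follow from the constructor code shown above.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
final class ConstructorDemo {
    /** The no-arg constructor delegates to this(Integer.MAX_VALUE). */
    static int defaultCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    /** A capacity that is not greater than zero is rejected. */
    static boolean rejectsZeroCapacity() {
        try {
            new LinkedBlockingQueue<Integer>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** The collection constructor adds elements in iteration order. */
    static String fromCollection() {
        LinkedBlockingQueue<Integer> q =
                new LinkedBlockingQueue<Integer>(Arrays.asList(1, 2, 3));
        return q.size() + ":" + q.poll();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(rejectsZeroCapacity());                  // true
        System.out.println(fromCollection());                       // 3:1
    }
}
```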


  • As usual, we start with put and take. First, the put method:
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            /*
             * Note the handling here: unlike a single-lock queue, count is
             * an atomic variable and is not protected by a lock. Other
             * threads may have performed several takes while this put was
             * in progress, so check whether there is still spare capacity
             * and, if so, wake another thread waiting on notFull.
             */
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // If count went from 0 to 1, an element was inserted into an empty
        // queue, so wake a thread waiting on the notEmpty condition.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Creates a node and links it at end of queue.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
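
  • The blocking behaviour of put can be observed from the outside with a small experiment. This is a sketch under a few assumptions: the class name BlockingPutDemo, the capacity of 1, and the timeout values are all chosen for illustration. A CountDownLatch is used only to detect whether the second put has returned.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class BlockingPutDemo {
    /** Returns a short trace of what was observed. */
    static String run() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        CountDownLatch putDone = new CountDownLatch(1);
        try {
            q.put("a"); // the queue is now full (count == capacity)

            Thread producer = new Thread(() -> {
                try {
                    q.put("b");          // waits on notFull until space appears
                    putDone.countDown(); // reached only after put returns
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // While the queue stays full, the second put cannot complete.
            boolean finishedEarly = putDone.await(100, TimeUnit.MILLISECONDS);

            // Removing an element from a full queue signals notFull.
            String head = q.take();
            boolean finishedAfterTake = putDone.await(5, TimeUnit.SECONDS);
            producer.join();

            return finishedEarly + "," + head + "," + finishedAfterTake + "," + q.peek();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // false,a,true,b
    }
}
```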


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes a node from head of queue,
     * @return the node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
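
  • Because put and take use separate locks, a producer and a consumer can work at opposite ends of the queue concurrently. The following is a minimal producer/consumer sketch. HandoffDemo, the capacity of 2, and the element count are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
final class HandoffDemo {
    /** Passes the integers 0..n-1 through a small bounded queue. */
    static List<Integer> run(final int n) {
        final LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++)
                    q.put(i); // blocks whenever the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++)
                received.add(q.take()); // blocks whenever the queue is empty
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [0, 1, 2, 3, 4]
    }
}
```

With a single producer and a single consumer, the elements arrive in insertion order regardless of how the two threads interleave.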


  • As shown above, the main methods never hold both locks at the same time. Some methods do acquire both locks, however, for example remove:
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
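
  • From the caller's point of view, remove(Object) unlinks the first matching element wherever it sits in the list, which is why it has to lock out both puts and takes. A small sketch of the observable behaviour follows. RemoveDemo and the sample elements are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
final class RemoveDemo {
    static String run() {
        LinkedBlockingQueue<String> q =
                new LinkedBlockingQueue<String>(Arrays.asList("a", "b", "c", "b"));
        boolean removed = q.remove("b");  // unlinks only the first "b"
        boolean missing = q.remove("z");  // no match, queue unchanged
        boolean nullArg = q.remove(null); // null is never stored, so false
        return removed + "," + missing + "," + nullArg + "," + q;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,false,[a, c, b]
    }
}
```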

