面试中问完基础基本上就是考集合,因为集合的使用在业务开发中经常使用,而且集合的数据结构也是算法的基础,所以你对集合的掌握深度可能决定你有没有接着面的资格

List & Queue 实现类

List 接口的实现类主要有:ArrayList、LinkedList、Stack 、Vector以及CopyOnWriteArrayList 等;

Queue接口的主要实现类有:ArrayDeque、ArrayBlockingQueue、LinkedBlockingQueue、PriorityQueue等;

3.2 List

  • 有顺序,可重复

ArrayList

  • 基于数组实现,无容量的限制。
  • 在执行插入元素时可能要扩容,在删除元素时并不会减小数组的容量,在查找元素时要遍历数组,对于非null的元素采取equals的方式寻找。
  • 是非线程安全的。
  • 注意点:
    • (1)ArrayList随机元素时间复杂度O(1),插入删除操作需大量移动元素,效率较低
    • (2)为了节约内存,当新建容器为空时,会共享Object[] EMPTY_ELEMENTDATA = {}和 Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {}空数组
    • (3)容器底层采用数组存储,每次扩容为1.5倍
    • (4)ArrayList的实现中大量地调用了Arrays.copyof()和System.arraycopy()方法,其实Arrays.copyof()内部也是调用System.arraycopy()。System.arraycopy()为Native方法
    • (5)两个ToArray方法
  • Object[] toArray()方法。该方法有可能会抛出java.lang.ClassCastException异常
  • T[] toArray(T[] a)方法。该方法可以直接将ArrayList转换得到的Array进行整体向下转型
    • (6)ArrayList可以存储null值
    • (7)ArrayList每次修改(增加、删除)容器时,都是修改自身的modCount;在生成迭代器时,迭代器会保存该modCount值,迭代器每次获取元素时,会比较自身的modCount与ArrayList的modCount是否相等,来判断容器是否已经被修改,如果被修改了则抛出异常(fast-fail机制)。

成员变量

/*** Default initial capacity.*/
private static final int DEFAULT_CAPACITY = 10;/*** Shared empty array instance used for empty instances.*/
private static final Object[] EMPTY_ELEMENTDATA = {};/*** Shared empty array instance used for default sized empty instances. We* distinguish this from EMPTY_ELEMENTDATA to know how much to inflate when* first element is added.*/
private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};/*** The array buffer into which the elements of the ArrayList are stored.* The capacity of the ArrayList is the length of this array buffer. Any* empty ArrayList with elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA* will be expanded to DEFAULT_CAPACITY when the first element is added.*/
transient Object[] elementData; // non-private to simplify nested class access/*** The size of the ArrayList (the number of elements it contains).** @serial*/
private int size;

构造方法

public ArrayList(int initialCapacity) {if (initialCapacity > 0) {this.elementData = new Object[initialCapacity];} else if (initialCapacity == 0) {this.elementData = EMPTY_ELEMENTDATA;} else {throw new IllegalArgumentException("Illegal Capacity: "+initialCapacity);}
}

添加 add(e)

public boolean add(E e) {ensureCapacityInternal(size + 1);  // Increments modCount!!elementData[size++] = e;return true;
}
  • 即使初始化时指定大小 小于10个,添加元素时会调整大小,保证capacity不会少于10个。
private void ensureCapacityInternal(int minCapacity) {if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);}ensureExplicitCapacity(minCapacity);
}
private void ensureExplicitCapacity(int minCapacity) {modCount++;// overflow-conscious codeif (minCapacity - elementData.length > 0)grow(minCapacity);
}

扩容

private void grow(int minCapacity) {// overflow-conscious codeint oldCapacity = elementData.length;int newCapacity = oldCapacity + (oldCapacity >> 1);if (newCapacity - minCapacity < 0)newCapacity = minCapacity;if (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);// minCapacity is usually close to size, so this is a win:elementData = Arrays.copyOf(elementData, newCapacity);
}
  • Arrays.copyOf底层是System.arrayCopy
public static <T,U> T[] copyOf(U[] original, int newLength, Class<? extends T[]> newType) {@SuppressWarnings("unchecked")T[] copy = ((Object)newType == (Object)Object[].class)? (T[]) new Object[newLength]: (T[]) Array.newInstance(newType.getComponentType(), newLength);System.arraycopy(original, 0, copy, 0,Math.min(original.length, newLength));return copy;
}
public static native void arraycopy(Object src,  int  srcPos,Object dest, int destPos,int length);

添加 add(index,e)

public void add(int index, E element) {rangeCheckForAdd(index);ensureCapacityInternal(size + 1);  // Increments modCount!!System.arraycopy(elementData, index, elementData, index + 1,size - index);elementData[index] = element;size++;
}
private void rangeCheckForAdd(int index) {if (index > size || index < 0)throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

删除 remove(o)

public boolean remove(Object o) {if (o == null) {for (int index = 0; index < size; index++)if (elementData[index] == null) {fastRemove(index);return true;}} else {for (int index = 0; index < size; index++)if (o.equals(elementData[index])) {fastRemove(index);return true;}}return false;
}
private void fastRemove(int index) {modCount++;int numMoved = size - index - 1;if (numMoved > 0)System.arraycopy(elementData, index+1, elementData, index,numMoved);elementData[--size] = null; // clear to let GC do its work
}

删除 remove(index)

public E remove(int index) {rangeCheck(index);modCount++;E oldValue = elementData(index);int numMoved = size - index - 1;if (numMoved > 0)System.arraycopy(elementData, index+1, elementData, index,numMoved);elementData[--size] = null; // clear to let GC do its workreturn oldValue;
}

获取

public E get(int index) {rangeCheck(index);return elementData(index);
}
private void rangeCheck(int index) {if (index >= size)throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}
  • E elementData(int index) {
    return (E) elementData[index];
    }

更新

public E set(int index, E element) {rangeCheck(index);E oldValue = elementData(index);elementData[index] = element;return oldValue;
}

遍历

public Iterator<E> iterator() {return new Itr();
}/*** An optimized version of AbstractList.Itr*/
private class Itr implements Iterator<E> {int cursor;       // index of next element to returnint lastRet = -1; // index of last element returned; -1 if no suchint expectedModCount = modCount;public boolean hasNext() {return cursor != size;}@SuppressWarnings("unchecked")public E next() {checkForComodification();int i = cursor;if (i >= size)throw new NoSuchElementException();Object[] elementData = ArrayList.this.elementData;if (i >= elementData.length)throw new ConcurrentModificationException();cursor = i + 1;return (E) elementData[lastRet = i];}public void remove() {if (lastRet < 0)throw new IllegalStateException();checkForComodification();try {ArrayList.this.remove(lastRet);cursor = lastRet;lastRet = -1;expectedModCount = modCount;} catch (IndexOutOfBoundsException ex) {throw new ConcurrentModificationException();}}@Override@SuppressWarnings("unchecked")public void forEachRemaining(Consumer<? super E> consumer) {Objects.requireNonNull(consumer);final int size = ArrayList.this.size;int i = cursor;if (i >= size) {return;}final Object[] elementData = ArrayList.this.elementData;if (i >= elementData.length) {throw new ConcurrentModificationException();}while (i != size && modCount == expectedModCount) {consumer.accept((E) elementData[i++]);}// update once at end of iteration to reduce heap write trafficcursor = i;lastRet = i - 1;checkForComodification();}final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}
}

包含

public boolean contains(Object o) {return indexOf(o) >= 0;
}
public int indexOf(Object o) {if (o == null) {for (int i = 0; i < size; i++)if (elementData[i]==null)return i;} else {for (int i = 0; i < size; i++)if (o.equals(elementData[i]))return i;}return -1;
}

LinkedList

  • 基于双向链表机制
  • 在插入元素时,须创建一个新的Entry对象,并切换相应元素的前后元素的引用;在查找元素时,须遍历链表;在删除元素时,须遍历链表,找到要删除的元素,然后从链表上将此元素删除即可。
  • 是非线程安全的。
  • 注意:
    • (1)LinkedList有两个构造参数,一个为无參构造,只是新建一个空对象,第二个为有参构造,新建一个空对象,然后把所有元素添加进去。
    • (2)LinkedList的存储单元为一个名为Node的内部类,包含pre指针,next指针,和item元素,实现为双向链表
    • (3)LinkedList的删除、添加操作时间复杂度为O(1),查找时间复杂度为O(n),查找函数有一定优化,容器会先判断查找的元素是离头部较近,还是尾部较近,来决定从头部开始遍历还是尾部开始遍历
    • (4)LinkedList实现了Deque接口,因此也可以作为栈、队列和双端队列来使用。
    • (5)LinkedList可以存储null值

成员变量

  • transient int size = 0;
    transient Node first;
    transient Node last;

构造方法

public LinkedList() {
}

添加 add(e)

public boolean add(E e) {linkLast(e);return true;
}
  • 把一个元素添加到最后一个位置
  • void linkLast(E e) {
    final Node l = last;
    final Node newNode = new Node<>(l, e, null);
    last = newNode;
    if (l == null)
    first = newNode;
    else
    l.next = newNode;
    size++;
    modCount++;
    }

添加 add(index,e)

public void add(int index, E element) {checkPositionIndex(index);if (index == size)linkLast(element);elselinkBefore(element, node(index));
}
- Node<E> node(int index) {
// assert isElementIndex(index);if (index < (size >> 1)) {Node<E> x = first;for (int i = 0; i < index; i++)x = x.next;return x;
} else {Node<E> x = last;for (int i = size - 1; i > index; i--)x = x.prev;return x;
}

}

  • void linkBefore(E e, Node succ) {
    // assert succ != null;
    final Node pred = succ.prev;
    final Node newNode = new Node<>(pred, e, succ);
    succ.prev = newNode;
    if (pred == null)
    first = newNode;
    else
    pred.next = newNode;
    size++;
    modCount++;
    }

删除 remove(o)

public boolean remove(Object o) {if (o == null) {for (Node<E> x = first; x != null; x = x.next) {if (x.item == null) {unlink(x);return true;}}} else {for (Node<E> x = first; x != null; x = x.next) {if (o.equals(x.item)) {unlink(x);return true;}}}return false;
}
  • E unlink(Node x) {
    // assert x != null;
    final E element = x.item;
    final Node next = x.next;
    final Node prev = x.prev;

    if (prev == null) {
    first = next;
    } else {
    prev.next = next;
    x.prev = null;
    }

    if (next == null) {
    last = prev;
    } else {
    next.prev = prev;
    x.next = null;
    }

    x.item = null;
    size–;
    modCount++;
    return element;
    }

删除 remove(index)

public E remove(int index) {checkElementIndex(index);return unlink(node(index));
}

获取

public E get(int index) {checkElementIndex(index);return node(index).item;
}

更新

public E set(int index, E element) {checkElementIndex(index);Node<E> x = node(index);E oldVal = x.item;x.item = element;return oldVal;
}

遍历

public ListIterator<E> listIterator(int index) {checkPositionIndex(index);return new ListItr(index);
}

private class ListItr implements ListIterator<E> {private Node<E> lastReturned;private Node<E> next;private int nextIndex;private int expectedModCount = modCount;ListItr(int index) {// assert isPositionIndex(index);next = (index == size) ? null : node(index);nextIndex = index;}public boolean hasNext() {return nextIndex < size;}public E next() {checkForComodification();if (!hasNext())throw new NoSuchElementException();lastReturned = next;next = next.next;nextIndex++;return lastReturned.item;}public boolean hasPrevious() {return nextIndex > 0;}public E previous() {checkForComodification();if (!hasPrevious())throw new NoSuchElementException();lastReturned = next = (next == null) ? last : next.prev;nextIndex--;return lastReturned.item;}public int nextIndex() {return nextIndex;}public int previousIndex() {return nextIndex - 1;}public void remove() {checkForComodification();if (lastReturned == null)throw new IllegalStateException();Node<E> lastNext = lastReturned.next;unlink(lastReturned);if (next == lastReturned)next = lastNext;elsenextIndex--;lastReturned = null;expectedModCount++;}public void set(E e) {if (lastReturned == null)throw new IllegalStateException();checkForComodification();lastReturned.item = e;}public void add(E e) {checkForComodification();lastReturned = null;if (next == null)linkLast(e);elselinkBefore(e, next);nextIndex++;expectedModCount++;}public void forEachRemaining(Consumer<? super E> action) {Objects.requireNonNull(action);while (modCount == expectedModCount && nextIndex < size) {action.accept(next.item);lastReturned = next;next = next.next;nextIndex++;}checkForComodification();}final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}
}

包含

public boolean contains(Object o) {return indexOf(o) != -1;
}
public int indexOf(Object o) {int index = 0;if (o == null) {for (Node<E> x = first; x != null; x = x.next) {if (x.item == null)return index;index++;}} else {for (Node<E> x = first; x != null; x = x.next) {if (o.equals(x.item))return index;index++;}}return -1;
}

Vector

  • 基于synchronized实现的线程安全的ArrayList,但在插入元素时容量扩充的机制和ArrayList稍有不同,并可通过传入capacityIncrement来控制容量的扩充。

成员变量

  • protected Object[] elementData;
    protected int elementCount;
    protected int capacityIncrement;

构造方法

public Vector(int initialCapacity) {this(initialCapacity, 0);
}
public Vector() {this(10);
}
public Vector(int initialCapacity, int capacityIncrement) {super();if (initialCapacity < 0)throw new IllegalArgumentException("Illegal Capacity: "+initialCapacity);this.elementData = new Object[initialCapacity];this.capacityIncrement = capacityIncrement;
}

添加

public synchronized boolean add(E e) {modCount++;ensureCapacityHelper(elementCount + 1);elementData[elementCount++] = e;return true;
}

删除

public boolean remove(Object o) {return removeElement(o);
}
public synchronized boolean removeElement(Object obj) {modCount++;int i = indexOf(obj);if (i >= 0) {removeElementAt(i);return true;}return false;
}

扩容

private void grow(int minCapacity) {// overflow-conscious codeint oldCapacity = elementData.length;int newCapacity = oldCapacity + ((capacityIncrement > 0) ?capacityIncrement : oldCapacity);if (newCapacity - minCapacity < 0)newCapacity = minCapacity;if (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);elementData = Arrays.copyOf(elementData, newCapacity);
}

获取

public synchronized E get(int index) {if (index >= elementCount)throw new ArrayIndexOutOfBoundsException(index);return elementData(index);
}

更新

public synchronized E set(int index, E element) {if (index >= elementCount)throw new ArrayIndexOutOfBoundsException(index);E oldValue = elementData(index);elementData[index] = element;return oldValue;
}

包含

public boolean contains(Object o) {return indexOf(o, 0) >= 0;
}
public synchronized int indexOf(Object o, int index) {if (o == null) {for (int i = index ; i < elementCount ; i++)if (elementData[i]==null)return i;} else {for (int i = index ; i < elementCount ; i++)if (o.equals(elementData[i]))return i;}return -1;
}

Stack

  • 基于Vector实现,支持LIFO。

类声明

public class Stack<E> extends Vector<E> {}

push

public E push(E item) {addElement(item);return item;
}

pop

public synchronized E pop() {E  obj;int len = size();obj = peek();removeElementAt(len - 1);return obj;
}

peek

public synchronized E peek() {int len = size();if (len == 0)throw new EmptyStackException();return elementAt(len - 1);
}

CopyOnWriteArrayList

  • 是一个线程安全、并且在读操作时无锁的ArrayList。

  • 很多时候,我们的系统应对的都是读多写少的并发场景。CopyOnWriteArrayList容器允许并发读,读操作是无锁的,性能较高。至于写操作,比如向容器中添加一个元素,则首先将当前容器复制一份,然后在新副本上执行写操作,结束之后再将原容器的引用指向新容器。

  • 优点

    • 1)采用读写分离方式,读的效率非常高
    • 2)CopyOnWriteArrayList的迭代器是基于创建时的数据快照的,故数组的增删改不会影响到迭代器
  • 缺点

    • 1)内存占用高,每次执行写操作都要将原容器拷贝一份,数据量大时,对内存压力较大,可能会引起频繁GC
    • 2)只能保证数据的最终一致性,不能保证数据的实时一致性。写和读分别作用在新老不同容器上,在写操作执行过程中,读不会阻塞但读取到的却是老容器的数据。

成员变量

/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;

构造方法

public CopyOnWriteArrayList() {setArray(new Object[0]);
}
  • final void setArray(Object[] a) {
    array = a;
    }

添加(有锁,锁内重新创建数组)

  • final Object[] getArray() {
    return array;
    }
public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock();}
}

存在则添加(有锁,锁内重新创建数组)

  • 先保存一份数组snapshot,如果snapshot中存在,则直接返回。
  • 如果不存在,那么加锁,获取当前数组current,比较snapshot与current,遍历它们共同长度内的元素,如果发现current中某一个元素等于e,那么直接返回(当然current与snapshot相同就不必看了);
  • 之后再遍历current单独的部分,如果发现current中某一个元素等于e,那么直接返回;
  • 此时可以去创建一个长度+1的新数组,将e加入。
public boolean addIfAbsent(E e) {Object[] snapshot = getArray();return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :addIfAbsent(e, snapshot);
}
private boolean addIfAbsent(E e, Object[] snapshot) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] current = getArray();int len = current.length;if (snapshot != current) {// Optimize for lost race to another addXXX operationint common = Math.min(snapshot.length, len);for (int i = 0; i < common; i++)
  • //如果snapshot与current元素不同但current与e相同,那么直接返回(扫描0到common)
    if (current[i] != snapshot[i] && eq(e, current[i]))
    return false;

    • // 如果current中存在e,那么直接返回(扫描commen到len)
      if (indexOf(e, current, common, len) >= 0)
      return false;
      }
      Object[] newElements = Arrays.copyOf(current, len + 1);
      newElements[len] = e;
      setArray(newElements);
      return true;
      } finally {
      lock.unlock();
      }
      }

删除(有锁,锁内重新创建数组)

public E remove(int index) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;E oldValue = get(elements, index);int numMoved = len - index - 1;if (numMoved == 0)setArray(Arrays.copyOf(elements, len - 1));else {Object[] newElements = new Object[len - 1];System.arraycopy(elements, 0, newElements, 0, index);System.arraycopy(elements, index + 1, newElements, index,numMoved);setArray(newElements);}return oldValue;} finally {lock.unlock();}
}

获取(无锁)

public E get(int index) {return get(getArray(), index);
}
private E get(Object[] a, int index) {return (E) a[index];
}

更新(有锁,锁内重新创建数组)

public E set(int index, E element) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();E oldValue = get(elements, index);if (oldValue != element) {int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len);newElements[index] = element;setArray(newElements);} else {
  • // 为了保持“volatile”的语义,任何一个读操作都应该是一个写操作的结果,
  • 也就是读操作看到的数据一定是某个写操作的结果(尽管写操作没有改变数据本身)。
  • 所以这里即使不设置也没有问题,仅仅是为了一个语义上的补充(就如源码中的注释所言)
    // Not quite a no-op; ensures volatile write semantics
    setArray(elements);
    }
    return oldValue;
    } finally {
    lock.unlock();
    }
    }

包含(无锁)

public boolean contains(Object o) {Object[] elements = getArray();return indexOf(o, elements, 0, elements.length) >= 0;
}
private static int indexOf(Object o, Object[] elements,int index, int fence) {if (o == null) {for (int i = index; i < fence; i++)if (elements[i] == null)return i;} else {for (int i = index; i < fence; i++)if (o.equals(elements[i]))return i;}return -1;
}

遍历(遍历的是获取iterator时的数组快照)

public Iterator<E> iterator() {return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {/** Snapshot of the array */private final Object[] snapshot;/** Index of element to be returned by subsequent call to next.  */private int cursor;private COWIterator(Object[] elements, int initialCursor) {cursor = initialCursor;snapshot = elements;}public boolean hasNext() {return cursor < snapshot.length;}public boolean hasPrevious() {return cursor > 0;}@SuppressWarnings("unchecked")public E next() {if (! hasNext())throw new NoSuchElementException();return (E) snapshot[cursor++];}@SuppressWarnings("unchecked")public E previous() {if (! hasPrevious())throw new NoSuchElementException();return (E) snapshot[--cursor];}public int nextIndex() {return cursor;}public int previousIndex() {return cursor-1;}/*** Not supported. Always throws UnsupportedOperationException.* @throws UnsupportedOperationException always; {@code remove}*         is not supported by this iterator.*/public void remove() {throw new UnsupportedOperationException();}/*** Not supported. Always throws UnsupportedOperationException.* @throws UnsupportedOperationException always; {@code set}*         is not supported by this iterator.*/public void set(E e) {throw new UnsupportedOperationException();}/*** Not supported. Always throws UnsupportedOperationException.* @throws UnsupportedOperationException always; {@code add}*         is not supported by this iterator.*/public void add(E e) {throw new UnsupportedOperationException();}@Overridepublic void forEachRemaining(Consumer<? super E> action) {Objects.requireNonNull(action);Object[] elements = snapshot;final int size = elements.length;for (int i = cursor; i < size; i++) {@SuppressWarnings("unchecked") E e = (E) elements[i];action.accept(e);}cursor = size;}
}

List实现类之间的区别

- (1) 对于需要快速插入,删除元素,应该使用LinkedList。
- (2) 对于需要快速随机访问元素,应该使用ArrayList。
- (3) 对于“单线程环境” 或者 “多线程环境,但List仅仅只会被单个线程操作”,此时应该使用非同步的类(如ArrayList)。
  • 对于“多线程环境,且List可能同时被多个线程操作”,此时,应该使用同步的类(如Vector、CopyOnWriteArrayList)。

3.4 Queue

  • 先进先出”(FIFO—first in first out)的线性表

  • LinkedList类实现了Queue接口,因此我们可以把LinkedList当成Queue来用。

  • Java里有一个叫做Stack的类,却没有叫做Queue的类(它是个接口名字)。当需要使用栈时,Java已不推荐使用Stack,而是推荐使用更高效的ArrayDeque;既然Queue只是一个接口,当需要使用队列时也就首选ArrayDeque了(次选是LinkedList)。

  • Deque既可以作为栈使用,也可以作为队列使用。
    Queue Method Equivalent Deque Method 说明
    add(e) addLast(e) 向队尾插入元素,失败则抛出异常
    remove() removeFirst() 获取并删除队首元素,失败则抛出异常

element() getFirst() 获取但不删除队首元素,失败则抛出异常
offer(e) offerLast(e) 向队尾插入元素,失败则返回false
poll() pollFirst() 获取并删除队首元素,失败则返回null
peek() peekFirst() 获取但不删除队首元素,失败则返回null

Stack Method Equivalent Deque Method 说明
push(e) addFirst(e) 向栈顶插入元素,失败则抛出异常
无 offerFirst(e) 向栈顶插入元素,失败则返回false
pop() removeFirst() 获取并删除栈顶元素,失败则抛出异常
无 pollFirst() 获取并删除栈顶元素,失败则返回null
peek() peekFirst() 获取但不删除栈顶元素,失败则抛出异常
无 peekFirst() 获取但不删除栈顶元素,失败则返回null

  • ArrayDeque和LinkedList是Deque的两个通用实现。

1)ArrayDeque(底层是循环数组,有界队列)

  • head指向首端第一个有效元素,tail指向尾端第一个可以插入元素的空位。因为是循环数组,所以head不一定总等于0,tail也不一定总是比head大。

成员变量

transient Object[] elements; // non-private to simplify nested class access
transient int head;
transient int tail;
private static final int MIN_INITIAL_CAPACITY = 8;

构造方法

public ArrayDeque() {elements = new Object[16];
}
public ArrayDeque(int numElements) {allocateElements(numElements);
}
/*** Allocates empty array to hold the given number of elements.** @param numElements  the number of elements to hold*/
private void allocateElements(int numElements) {int initialCapacity = MIN_INITIAL_CAPACITY;// Find the best power of two to hold elements.// Tests "<=" because arrays aren't kept full.if (numElements >= initialCapacity) {initialCapacity = numElements;initialCapacity |= (initialCapacity >>>  1);initialCapacity |= (initialCapacity >>>  2);initialCapacity |= (initialCapacity >>>  4);initialCapacity |= (initialCapacity >>>  8);initialCapacity |= (initialCapacity >>> 16);initialCapacity++;if (initialCapacity < 0)   // Too many elements, must back offinitialCapacity >>>= 1;// Good luck allocating 2 ^ 30 elements}elements = new Object[initialCapacity];
}

扩容

/*** Doubles the capacity of this deque.  Call only when full, i.e.,* when head and tail have wrapped around to become equal.*/
private void doubleCapacity() {assert head == tail;int p = head;int n = elements.length;int r = n - p; // number of elements to the right of pint newCapacity = n << 1;if (newCapacity < 0)throw new IllegalStateException("Sorry, deque too big");Object[] a = new Object[newCapacity];System.arraycopy(elements, p, a, 0, r);System.arraycopy(elements, 0, a, r, p);elements = a;head = 0;tail = n;
}

offer

public boolean offer(E e) {return offerLast(e);
}
public boolean offerLast(E e) {addLast(e);return true;
}
public void addLast(E e) {if (e == null)throw new NullPointerException();elements[tail] = e;if ( (tail = (tail + 1) & (elements.length - 1)) == head)doubleCapacity();
}

poll

public E poll() {return pollFirst();
}
public E pollFirst() {int h = head;@SuppressWarnings("unchecked")E result = (E) elements[h];// Element is null if deque emptyif (result == null)return null;elements[h] = null;     // Must null out slothead = (h + 1) & (elements.length - 1);return result;
}

peek

public E peek() {return peekFirst();
}
public E peekFirst() {// elements[head] is null if deque emptyreturn (E) elements[head];
}

ConcurrentLinkedQueue(底层是链表,基于CAS的非阻塞队列,无界队列)

  • ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(非阻塞)来实现。

  • 1 . 使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。

    1. head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。
    1. 以批处理方式来更新head/tail,从整体上减少入队 / 出队操作的开销。
    1. ConcurrentLinkedQueue的迭代器是弱一致性的,这在并发容器中是比较普遍的现象,主要是指在一个线程在遍历队列结点而另一个线程尝试对某个队列结点进行修改的话不会抛出ConcurrentModificationException,这也就造成在遍历某个尚未被修改的结点时,在next方法返回时可以看到该结点的修改,但在遍历后再对该结点修改时就看不到这种变化。
    1. 在入队时最后一个结点中的next域为null
    1. 队列中的所有未删除结点的item域不能为null且从head都可以在O(N)时间内遍历到
    1. 对于要删除的结点,不是将其引用直接置为空,而是将其的item域先置为null(迭代器在遍历是会跳过item为null的结点)
    1. 允许head和tail滞后更新,也就是上文提到的head/tail并非总是指向队列的头 / 尾节点(这主要是为了减少CAS指令执行的次数,但同时会增加volatile读的次数,但是这种消耗较小)。具体而言就是,当在队列中插入一个元素是,会检测tail和最后一个结点之间的距离是否在两个结点及以上(内部称之为hop);而在出队时,对head的检测就是与队列的第一个结点的距离是否达到两个,有则将head指向第一个结点并将head原来指向的结点的next域指向自己,这样就能断开与队列的联系从而帮助GC
  • head节点并不是总指向第一个结点,tail也并不是总指向最后一个节点。

  • 源码过于复杂,可以先跳过。

成员变量

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

构造方法

public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}
  • Node#CAS操作

  • 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法的操作应该是原子的,因此提供了一种不可中断的方式更新object field。

  • 如果node的next值为cmp,则将其更新为val

  • boolean casNext(Node cmp, Node val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

  • boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

private boolean casHead(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
  • void lazySetNext(Node val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
    }

offer(无锁)

/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never return {@code false}.** @return {@code true} (as specified by {@link Queue#offer})* @throws NullPointerException if the specified element is null*/
public boolean offer(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;
  • // q/p.next/tail.next为null,则说明p是尾节点,则插入
    if (q == null) {
    // CAS插入 p.next = newNode,多线程环境下只有一个线程可以设置成功
  • // 此时 tail.next = newNode
    if (p.casNext(null, newNode)) {
  • // CAS成功说明新节点已经放入链表
  • // 如果p不为t,说明当前线程是之前CAS失败后又重试CAS成功的,tail = newNode
    if (p != t) // hop two nodes at a time
  • casTail(t, newNode); // Failure is OK.
    return true;
    }
    // Lost CAS race to another thread; re-read next
    }
    else if (p == q)
  • //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
    // p = head , t = tail
  • p = (t != (t = tail)) ? t : head;
    else
  • // 对上一次CAS失败的线程而言,t.next/p.next/tail.next/q 不是null了
  • // 副作用是p = q,p和q都指向了尾节点,进入第三次循环
    p = (p != t && t != (t = tail)) ? t : q;
    }
    }

poll(无锁)

public E poll() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {
  • // 保存当前节点的值
    E item = p.item;
    // 当前节点有值则CAS置为null, p.item = null
    if (item != null && p.casItem(item, null)) {

  • // CAS成功代表当前节点已经从链表中移除

  •           if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;} // 当前队列为空时则返回nullelse if ((q = p.next) == null) {updateHead(h, p);return null;} // 自引用了,则重新找新的队列头节点else if (p == q)continue restartFromHead;elsep = q;}
    

    }
    }

  • final void updateHead(Node h, Node p) {
    if (h != p && casHead(h, p))
    h.lazySetNext(h);
    }

peek(无锁)

public E peek() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;if (item != null || (q = p.next) == null) {updateHead(h, p);return item;}else if (p == q)continue restartFromHead;elsep = q;}}
}

size(遍历计算大小,效率低)

public int size() {int count = 0;for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE)break;return count;
}

ConcurrentLinkedDeque(底层是双向链表,基于CAS的非阻塞队列,无界队列)

2)PriorityQueue(底层是数组,逻辑上是小顶堆,无界队列)

  • PriorityQueue底层实现的数据结构是“堆”,堆具有以下两个性质:

  • 任意一个节点的值总是不大于(最大堆)或者不小于(最小堆)其父节点的值;堆是一棵完全二叉树

    • 基于数组实现的二叉堆,对于数组中任意位置的n上元素,其左孩子在[2n+1]位置上,右孩子[2(n+1)]位置,它的父亲则在[(n-1)/2]上,而根的位置则是[0]。

    • 1)时间复杂度:remove()方法和add()方法时间复杂度为O(logn),remove(Object obj)和contains()方法需要O(n)时间复杂度,取队头则需要O(1)时间

    • 2)在初始化阶段会执行建堆函数,最终建立的是最小堆,每次出队和入队操作不能保证队列元素的有序性,只能保证队头元素和新插入元素的有序性,如果需要有序输出队列中的元素,则只要调用Arrays.sort()方法即可

    • 3)可以使用Iterator的迭代器方法输出队列中元素

    • 4)PriorityQueue是非同步的,要实现同步需要调用java.util.concurrent包下的PriorityBlockingQueue类来实现同步

    • 5)在队列中不允许使用null元素

    • 6)PriorityQueue默认是一个小顶堆,然而可以通过传入自定义的Comparator函数来实现大顶堆

  • 替代:用TreeMap复杂度太高,有没有更好的方法。hash方法,但是队列不是定长的,如果改变了大小要rehash代价太大,还有什么方法?用堆实现,那每次get put复杂度是多少(lgN)

成员变量

transient Object[] queue; // non-private to simplify nested class access/*** The number of elements in the priority queue.*/
private int size = 0;/*** The comparator, or null if priority queue uses elements'* natural ordering.*/
private final Comparator<? super E> comparator;/*** The number of times this priority queue has been* <i>structurally modified</i>.  See AbstractList for gory details.*/
transient int modCount = 0; // non-private to simplify nested class access

构造方法

public PriorityQueue() {this(DEFAULT_INITIAL_CAPACITY, null);
}/*** Creates a {@code PriorityQueue} with the specified initial* capacity that orders its elements according to their* {@linkplain Comparable natural ordering}.** @param initialCapacity the initial capacity for this priority queue* @throws IllegalArgumentException if {@code initialCapacity} is less*         than 1*/
public PriorityQueue(int initialCapacity) {this(initialCapacity, null);
}/*** Creates a {@code PriorityQueue} with the default initial capacity and* whose elements are ordered according to the specified comparator.** @param  comparator the comparator that will be used to order this*         priority queue.  If {@code null}, the {@linkplain Comparable*         natural ordering} of the elements will be used.* @since 1.8*/
public PriorityQueue(Comparator<? super E> comparator) {this(DEFAULT_INITIAL_CAPACITY, comparator);
}/*** Creates a {@code PriorityQueue} with the specified initial capacity* that orders its elements according to the specified comparator.** @param  initialCapacity the initial capacity for this priority queue* @param  comparator the comparator that will be used to order this*         priority queue.  If {@code null}, the {@linkplain Comparable*         natural ordering} of the elements will be used.* @throws IllegalArgumentException if {@code initialCapacity} is*         less than 1*/
public PriorityQueue(int initialCapacity,Comparator<? super E> comparator) {// Note: This restriction of at least one is not actually needed,// but continues for 1.5 compatibilityif (initialCapacity < 1)throw new IllegalArgumentException();this.queue = new Object[initialCapacity];this.comparator = comparator;
}

扩容

  • Double size if small; else grow by 50%
private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ?(oldCapacity + 2) :(oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {if (minCapacity < 0) // overflowthrow new OutOfMemoryError();return (minCapacity > MAX_ARRAY_SIZE) ?Integer.MAX_VALUE :MAX_ARRAY_SIZE;
}

offer

public boolean offer(E e) {if (e == null)throw new NullPointerException();modCount++;int i = size;if (i >= queue.length)grow(i + 1);size = i + 1;if (i == 0)queue[0] = e;elsesiftUp(i, e);return true;
}
private void siftUp(int k, E x) {if (comparator != null)siftUpUsingComparator(k, x);elsesiftUpComparable(k, x);
}
private void siftUpUsingComparator(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0)break;queue[k] = e;k = parent;}queue[k] = x;
}
private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (key.compareTo((E) e) >= 0)break;queue[k] = e;k = parent;}queue[k] = key;
}

poll

public E poll() {if (size == 0)return null;int s = --size;modCount++;E result = (E) queue[0];E x = (E) queue[s];queue[s] = null;if (s != 0)siftDown(0, x);return result;
}
private void siftDown(int k, E x) {if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);
}
private void siftDownUsingComparator(int k, E x) {int half = size >>> 1;while (k < half) {int child = (k << 1) + 1;Object c = queue[child];int right = child + 1;if (right < size &&comparator.compare((E) c, (E) queue[right]) > 0)c = queue[child = right];if (comparator.compare(x, (E) c) <= 0)break;queue[k] = c;k = child;}queue[k] = x;
}
private void siftDownComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>)x;int half = size >>> 1;        // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = queue[child];int right = child + 1;if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)c = queue[child = right];if (key.compareTo((E) c) <= 0)break;queue[k] = c;k = child;}queue[k] = key;
}

peek

public E peek() {return (size == 0) ? null : (E) queue[0];
}

3)BlockingQueue

  • 对于许多多线程问题,都可以通过使用一个或多个队列以优雅的方式将其形式化

  • 生产者线程向队列插入元素,消费者线程则取出它们。使用队列,可以安全地从一个线程向另一个线程传递数据。

  • 比如转账

  • 一个线程将转账指令放入队列

  • 一个线程从队列中取出指令执行转账,只有这个线程可以访问银行对象的内部。因此不需要同步

  • 当试图向队列中添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列导致线程阻塞

  • 在协调多个线程之间的合作时,阻塞队列是很有用的。

  • 工作者线程可以周期性地将中间结果放入阻塞队列,其他工作者线程取出中间结果并进一步修改。队列会自动平衡负载,大概第一个线程集比第二个运行的慢,那么第二个线程集在等待结果时会阻塞,反之亦然

    • 1)LinkedBlockingQueue的容量是没有上边界的,是一个双向队列
    • 2)ArrayBlockingQueue在构造时需要指定容量,并且有一个参数来指定是否需要公平策略
    • 3)PriorityBlockingQueue是一个带优先级的队列,元素按照它们的优先级顺序被移走。该队列没有容量上限。
    • 4)DelayQueue包含实现了Delayed接口的对象
    • 5)TransferQueue接口允许生产者线程等待,直到消费者准备就绪可以接收一个元素。如果生产者调用transfer方法,那么这个调用会阻塞,直到插入的元素被消费者取出之后才停止阻塞。
  • LinkedTransferQueue类实现了这个接口

ArrayBlockingQueue(底层是数组,阻塞队列,一把锁两个Condition,有界同步队列)

  • 基于数组、先进先出、线程安全的集合类,特点是可实现指定时间的阻塞读写,并且容量是可限制的。

成员变量

/** The queued items */
final Object[] items;/** items index for next take, poll, peek or remove */
int takeIndex;/** items index for next put, offer, or add */
int putIndex;/** Number of elements in the queue */
int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** Main lock guarding all access */
final ReentrantLock lock;/** Condition for waiting takes */
private final Condition notEmpty;/** Condition for waiting puts */
private final Condition notFull;/*** Shared state for currently active iterators, or null if there* are known not to be any.  Allows queue operations to update* iterator state.*/
transient Itrs itrs = null;

构造方法

public ArrayBlockingQueue(int capacity) {this(capacity, false);
}/*** Creates an {@code ArrayBlockingQueue} with the given (fixed)* capacity and the specified access policy.** @param capacity the capacity of this queue* @param fair if {@code true} then queue accesses for threads blocked*        on insertion or removal, are processed in FIFO order;*        if {@code false} the access order is unspecified.* @throws IllegalArgumentException if {@code capacity < 1}*/
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();
}

put(有锁,队列满则阻塞)

public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}
}
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();
}

take(有锁,队列空则阻塞)

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;
}

offer(有锁,最多阻塞一段时间)

public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}
}

poll(有锁,最多阻塞一段时间)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}
}

peek(有锁)

public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}
  • final E itemAt(int i) {
    return (E) items[i];
    }

遍历(构造迭代器加锁,遍历迭代器也加锁)

public Iterator<E> iterator() {return new Itr();
}
private class Itr implements Iterator<E> {/** Index to look for new nextItem; NONE at end */private int cursor;/** Element to be returned by next call to next(); null if none */private E nextItem;/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */private int nextIndex;/** Last element returned; null if none or not detached. */private E lastItem;/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */private int lastRet;/** Previous value of takeIndex, or DETACHED when detached */private int prevTakeIndex;/** Previous value of iters.cycles */private int prevCycles;/** Special index value indicating "not available" or "undefined" */private static final int NONE = -1;/*** Special index value indicating "removed elsewhere", that is,* removed by some operation other than a call to this.remove().*/private static final int REMOVED = -2;/** Special value for prevTakeIndex indicating "detached mode" */private static final int DETACHED = -3;Itr() {// assert lock.getHoldCount() == 0;lastRet = NONE;final ReentrantLock lock = ArrayBlockingQueue.this.lock;lock.lock();try {if (count == 0) {// assert itrs == null;cursor = NONE;nextIndex = NONE;prevTakeIndex = DETACHED;} else {final int takeIndex = ArrayBlockingQueue.this.takeIndex;prevTakeIndex = takeIndex;nextItem = itemAt(nextIndex = takeIndex);cursor = incCursor(takeIndex);if (itrs == null) {itrs = new Itrs(this);} else {itrs.register(this); // in this orderitrs.doSomeSweeping(false);}prevCycles = itrs.cycles;// assert takeIndex >= 0;// assert prevTakeIndex == takeIndex;// assert nextIndex >= 0;// assert nextItem != null;}} finally {lock.unlock();}}
  • }

LinkedBlockingQueue(底层是链表,阻塞队列,两把锁,各自对应一个Condition,无界同步队列)

  • 另一种BlockingQueue的实现,基于链表,没有容量限制。
  • 由于出队只操作队头,入队只操作队尾,这里巧妙地使用了两把锁,对于put和offer入队操作使用一把锁,对于take和poll出队操作使用一把锁,避免了出队、入队时互相竞争锁的现象,因此LinkedBlockingQueue在高并发读写都多的情况下,性能会较ArrayBlockingQueue好很多,在遍历以及删除的情况下则要两把锁都要锁住。
  • 多CPU情况下可以在同一时刻既消费又生产。

成员变量

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();/*** Head of linked list.* Invariant: head.item == null*/
transient Node<E> head;/*** Tail of linked list.* Invariant: last.next == null*/
private transient Node<E> last;/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

构造方法

public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}/*** Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.** @param capacity the capacity of this queue* @throws IllegalArgumentException if {@code capacity} is not greater*         than zero*/
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);
}

put(加putLock锁,队列满则阻塞)


/*** Inserts the specified element at the tail of this queue, waiting if* necessary for space to become available.** @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/while (count.get() == capacity) {
  • // 阻塞,直至有剩余空间
    notFull.await();
    }
    enqueue(node);
    c = count.getAndIncrement();
    if (c + 1 < capacity)
  • // 还有剩余空间时,唤醒其他生产者
    notFull.signal();
    } finally {
    putLock.unlock();
    }
    if (c == 0)
  • // c是放入当前元素之前队列的容量,现在新添加一个元素,那么唤醒消费者进行消费
  • signalNotEmpty();
    }
private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;
}
private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {
  • // 唤醒消费线程
    notEmpty.signal();
    } finally {
    takeLock.unlock();
    }
    }

take(加takeLock锁,队列空则阻塞)

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {
- // 队列空则阻塞notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)
  • // 还有元素则唤醒其他消费者
    notEmpty.signal();
    } finally {
    takeLock.unlock();
    }
    if (c == capacity)
  • // c是消费当前元素之前队列的容量,现在的容量是c-1,可以继续放入元素,唤醒生产者进行生产
    signalNotFull();
    return x;
    }
private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;
}
private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {
  • // 唤醒生产者
    notFull.signal();
    } finally {
    putLock.unlock();
    }
    }

peek(加takeLock锁)

public E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {Node<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}
}

remove(加两把锁)

/*** Locks to prevent both puts and takes.*/
void fullyLock() {putLock.lock();takeLock.lock();
}/*** Unlocks to allow both puts and takes.*/
void fullyUnlock() {takeLock.unlock();putLock.unlock();
}
public boolean remove(Object o) {if (o == null) return false;fullyLock();try {for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}

遍历(加两把锁)

public Iterator<E> iterator() {return new Itr();
}private class Itr implements Iterator<E> {/** Basic weakly-consistent iterator.  At all times hold the next* item to hand out so that if hasNext() reports true, we will* still have it to return even if lost race with a take etc.*/private Node<E> current;private Node<E> lastRet;private E currentElement;Itr() {fullyLock();try {current = head.next;if (current != null)currentElement = current.item;} finally {fullyUnlock();}}public boolean hasNext() {return current != null;}/*** Returns the next live successor of p, or null if no such.** Unlike other traversal methods, iterators need to handle both:* - dequeued nodes (p.next == p)* - (possibly multiple) interior removed nodes (p.item == null)*/private Node<E> nextNode(Node<E> p) {for (;;) {Node<E> s = p.next;if (s == p)return head.next;if (s == null || s.item != null)return s;p = s;}}public E next() {fullyLock();try {if (current == null)throw new NoSuchElementException();E x = currentElement;lastRet = current;current = nextNode(current);currentElement = (current == null) ? null : current.item;return x;} finally {fullyUnlock();}}public void remove() {if (lastRet == null)throw new IllegalStateException();fullyLock();try {Node<E> node = lastRet;lastRet = null;for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (p == node) {unlink(p, trail);break;}}} finally {fullyUnlock();}}
}

LinkedBlockingDeque(底层是双向链表,阻塞队列,一把锁两个Condition,无界同步队列)

  • LinkedBlockingDeque是一个基于链表的双端阻塞队列。和LinkedBlockingQueue类似,区别在于该类实现了Deque接口,而LinkedBlockingQueue实现了Queue接口。

  • LinkedBlockingDeque内部只有一把锁以及该锁上关联的两个条件,所以可以推断同一时刻只有一个线程可以在队头或者队尾执行入队或出队操作(类似于ArrayBlockingQueue)。可以发现这点和LinkedBlockingQueue不同,LinkedBlockingQueue可以同时有两个线程在两端执行操作。

  • LinkedBlockingDeque和LinkedBlockingQueue的相同点在于:

    1. 基于链表
    1. 容量可选,不设置的话,就是Int的最大值
  • 和LinkedBlockingQueue的不同点在于:

    1. 双端链表和单链表
    1. 不存在哨兵节点
    1. 一把锁+两个条件
  • LinkedBlockingDeque和ArrayBlockingQueue的相同点在于:使用一把锁+两个条件维持队列的同步。

PriorityBlockingQueue(底层是数组,出队时队空则阻塞;无界队列,不存在队满情况,一把锁一个Condition)

  • 支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。

成员变量

private static final int DEFAULT_INITIAL_CAPACITY = 11;/*** The maximum size of array to allocate.* Some VMs reserve some header words in an array.* Attempts to allocate larger arrays may result in* OutOfMemoryError: Requested array size exceeds VM limit*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/*** Priority queue represented as a balanced binary heap: the two* children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The* priority queue is ordered by comparator, or by the elements'* natural ordering, if comparator is null: For each node n in the* heap and each descendant d of n, n <= d.  The element with the* lowest value is in queue[0], assuming the queue is nonempty.*/
private transient Object[] queue;/*** The number of elements in the priority queue.*/
private transient int size;/*** The comparator, or null if priority queue uses elements'* natural ordering.*/
private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/
private final ReentrantLock lock;/*** Condition for blocking when empty*/
private final Condition notEmpty;/*** Spinlock for allocation, acquired via CAS.*/
private transient volatile int allocationSpinLock;/*** A plain PriorityQueue used only for serialization,* to maintain compatibility with previous versions* of this class. Non-null only during serialization/deserialization.*/
private PriorityQueue<E> q;

构造方法

public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);
}/*** Creates a {@code PriorityBlockingQueue} with the specified* initial capacity that orders its elements according to their* {@linkplain Comparable natural ordering}.** @param initialCapacity the initial capacity for this priority queue* @throws IllegalArgumentException if {@code initialCapacity} is less*         than 1*/
public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);
}/*** Creates a {@code PriorityBlockingQueue} with the specified initial* capacity that orders its elements according to the specified* comparator.** @param initialCapacity the initial capacity for this priority queue* @param  comparator the comparator that will be used to order this*         priority queue.  If {@code null}, the {@linkplain Comparable*         natural ordering} of the elements will be used.* @throws IllegalArgumentException if {@code initialCapacity} is less*         than 1*/
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {if (initialCapacity < 1)throw new IllegalArgumentException();this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];
}

扩容(基于CAS+Lock,CAS控制创建新的数组原子执行,Lock控制数组替换原子执行)

private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}
}

put(有锁)

public void put(E e) {offer(e); // never need to block
}
public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;
}

take(有锁)

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;
}
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;E result = (E) array[0];E x = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}
}

peek(有锁)

public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}
}

DelayQueue(底层是PriorityQueue,无界阻塞队列,过期元素方可移除,基于Lock)

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();
  • DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。
  • 每个元素都必须实现Delayed接口
public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);
}
  • getDelay方法返回对象的残留延迟,负值表示延迟结束

  • 元素只有在延迟用完的时候才能从DelayQueue移出。还必须实现Comparable接口。

  • 一个典型场景是重试机制的实现,比如当调用接口失败后,把当前调用信息放入delay=10s的元素,然后把元素放入队列,那么这个队列就是一个重试队列,一个线程通过take方法获取需要重试的接口,take返回则接口进行重试,失败则再次放入队列,同时也可以在元素加上重试次数。

成员变量

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null;private final Condition available = lock.newCondition();

构造方法

public DelayQueue() {}

put

public void put(E e) {offer(e);
}
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;
  • // 通知最先等待的线程
    available.signal();
    }
    return true;
    } finally {
    lock.unlock();
    }
    }

take

  • 获取并移除队列首元素,如果队列没有过期元素则等待。

    • 第一次调用take时候由于队列空,所以调用(2)把当前线程放入available的条件队列等待,当执行offer并且添加的元素就是队首元素时候就会通知最先等待的线程激活,循环重新获取队首元素,这时候first假如不空,则调用getdelay方法看该元素海剩下多少时间就过期了,如果delay<=0则说明已经过期,则直接出队返回。否则看leader是否为null,不为null则说明是其他线程也在执行take则把该线程放入条件队列,否则是当前线程执行的take方法,则调用(5) await直到剩余过期时间到(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,该线程会重新竞争得到锁,重新进入循环。
    • (6)说明当前take返回了元素,如果当前队列还有元素则调用singal激活条件队列里面可能有的等待线程。leader那么为null,那么是第一次调用take获取过期元素的线程,第一次调用的线程调用设置等待时间的await方法等待数据过期,后面调用take的线程则调用await直到signal。
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {
- // 1)获取但不移除队首元素E first = q.peek();if (first == null)
- // 2)无元素,则阻塞 available.await();else {long delay = first.getDelay(NANOSECONDS);
- // 3)有元素,且已经过期,则移除if (delay <= 0)return q.poll();first = null; // don't retain ref while waiting
- // 4)if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();
- // 5)leader = thisThread;try {
  • // 继续阻塞延迟的时间
    available.awaitNanos(delay);
    } finally {
    if (leader == thisThread)
    leader = null;
    }
    }
    }
    }
    } finally {
    if (leader == null && q.peek() != null)
    available.signal();
    lock.unlock();
    }
    }

peek

SynchronousQueue(只存储一个元素,阻塞队列,基于CAS)

  • 实现了BlockingQueue,是一个阻塞队列。

  • 一个只存储一个元素的的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入一直处于阻塞状态,吞吐量高于LinkedBlockingQueue。

  • SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

  • // 如果为 true,则等待线程以 FIFO 的顺序竞争访问;否则顺序是未指定的。

  • // SynchronousQueue sc =new SynchronousQueue<>(true);//fair -

  • SynchronousQueue sc = new SynchronousQueue<>(); // 默认不指定的话是false,不公平的

4)TransferQueue(特殊的BlockingQueue)

  • 生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)
  • 当我们不想生产者过度生产消息时,TransferQueue可能非常有用,可避免发生OutOfMemory错误。在这样的设计中,消费者的消费能力将决定生产者产生消息的速度。
public interface TransferQueue<E> extends BlockingQueue<E> {
    /**
  •  * 立即转交一个元素给消费者,如果此时队列没有消费者,那就false
    
  •  */
    
  • boolean tryTransfer(E e);
    
    /**
  •  * 转交一个元素给消费者,如果此时队列没有消费者,那就阻塞
    
  •  */
    
  • void transfer(E e) throws InterruptedException;
    
    /**
  •  * 带超时的tryTransfer
    
  •  */
    
  • boolean tryTransfer(E e, long timeout, TimeUnit unit)
    
  •     throws InterruptedException;
    
    /**
  •  * 是否有消费者等待接收数据,瞬时状态,不一定准
    
  •  */
    
  • boolean hasWaitingConsumer();
    
    /**
  •  * 返回还有多少个等待的消费者,跟上面那个一样,都是一种瞬时状态,不一定准
    
  •  */
    
  • int getWaitingConsumerCount();
    
  • }

LinkedTransferQueue(底层是链表,阻塞队列,无界同步队列)

  • LinkedTransferQueue实现了TransferQueue接口,这个接口继承了BlockingQueue。之前BlockingQueue是队列满时再入队会阻塞,而这个接口实现的功能是队列不满时也可以阻塞,实现一种有阻塞的入队功能。
  • LinkedTransferQueue实际上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。

5)Queue实现类之间的区别

  • 非线程安全的:ArrayDeque、LinkedList、PriorityQueue
  • 线程安全的:ConcurrentLinkedQueue、ConcurrentLinkedDeque、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue
  • 线程安全的又分为阻塞队列和非阻塞队列,阻塞队列提供了put、take等会阻塞当前线程的方法,比如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue,也有offer、poll等阻塞一段时间候返回的方法;
  • 非阻塞队列是使用CAS机制保证offer、poll等可以线程安全地入队出队,并且不需要加锁,不会阻塞当前线程,比如ConcurrentLinkedQueue、ConcurrentLinkedDeque。

ArrayBlockingQueue和LinkedBlockingQueue 区别

    1. 队列中锁的实现不同
  • ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
    
  • LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock
    
    1. 底层实现不同
  • 前者基于数组,后者基于链表
    1. 队列边界不同
  • ArrayBlockingQueue实现的队列中必须指定队列的大小,是有界队列
    
  • LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE,是无界队列
    

【面试专栏】第三篇:Java基础:集合篇-List、Queue相关推荐

  1. 【面试专栏】第五篇:Java基础:集合篇-LinkedHashMap、ConcurrentHashMap、TreeMap

  2. JAVA中整型常量的长度,Java基础入门篇(三)——Java常量、变量,

    Java基础入门篇(三)--Java常量.变量, 一.Java常量 (一)什么是常量 常量指的是在程序中固定不变的值,是不能改变的数据.在Java中,常量包括整型常量.浮点型常量.布尔常量.字符常量等 ...

  3. 超详细的Java面试题总结(二)之Java基础知识篇

    系列文章: 超详细的Java面试题总结(一)之Java基本知识 超详细的Java面试题总结(二)之Java基础知识篇 超详细的Java面试题总结(三)之Java集合篇常见问题 超详细的Java面试题总 ...

  4. java基础—集合框架

    java基础-集合框架 JDK1.2开始引入了集合框架的概念,以弥补java中只有数组这种容器的单一问题,这些框架多数由接口构成,另外也包含了一些对于接口实现的类,其中这些接口的最上层接口为java. ...

  5. Java新手小白入门篇 Java基础(一)

    Java新手小白入门篇 Java基础 Java新手小白入门篇 Java基础(知识点体系汇总) Java新手小白入门篇 Java基础(一) Java新手小白入门篇 Java基础(二) Java新手小白入 ...

  6. Java从小白到大牛第1篇 Java基础-关东升-专题视频课程

    Java从小白到大牛第1篇 Java基础-3042人已学习 课程介绍         本视频是智捷课堂推出的一套"Java语言学习立体教程"的视频第一部分,读者以及观看群是初级小白 ...

  7. U3D_Shader编程(第二篇:基础夯实篇)

    <U3D_Shader编程> ##<U3D_Shader编程>发布说明: ++++Shader一个高大上的领域,不管怎么样,我来了. ++++立钻哥哥从2018年开始正式对Sh ...

  8. 测试开发工程师面试总结(一)——Java基础篇

    本文面向对象:测试开发工程师(服务端自动化方向). 随手百度一下都能找到**岗位面试总结,但是有关测开岗位的面试总结却寥寥无几.总体原因可能是这两个: 1 测试行业整体水平参差不齐,导致不同公司面试的 ...

  9. JAVA基础+集合+多线程+JVM

    1. Java 基础 1.1. 面向对象和面向过程的区别 面向过程性能比面向对象高. 因为类调用时需要实例化,开销比较大,比较消耗资源,所以当性能是最重要的考量因素的时候 等一般采用面向过程开发.但是 ...

最新文章

  1. 联想电脑 Realtek RTL8821CE 无线网卡 驱动安装 16.04/18.04
  2. 举例分析Linux动态库和静态库
  3. mysql 主主+ Keepalived 高可用
  4. python3.6 django部署_Centos7 django+uwsgi+nginx+python3.6.8部署
  5. 【第二期】那些设计漂亮、有创意的电路板!
  6. 【Vue】—处理边界情况
  7. 「三分钟系列05」3分钟看懂并发与并行
  8. jQuery学习笔记01
  9. 【转】使用 Element-UI 的 Loading 组件-以服务的方式调用
  10. 实现了某一个接口的匿名类的例子_“全栈2019”Java多线程第三章:创建多线程之实现Runnable接口...
  11. ABAP 出库单打印 产品 A搭A A搭B显示方式
  12. python自动交易app_股票自动交易Python下单接口
  13. 实时数据采集-免费实时数据采集软件
  14. import oracle utility_oracle executing oracle import utility,please wait终极解决方案
  15. 并行是什么意思?与并发的区别是什么?
  16. 某汽车零部件制造厂商
  17. 显示模块模式 — Revealing Module Pattern
  18. sox处理mp3_音频处理利器--SoX
  19. mysql evict_SpringBoot+Mybatis+MySQL实现读写分离
  20. 鸿蒙断更术辰东,唐家三少说出《圣墟》断更真相,这次不怪辰东,网文将大范围断更...

热门文章

  1. dell10代cpu装linux,戴尔10代cpu装win7系统及bios设置|戴尔十代cpu台式机装win7
  2. 去掉首尾字符java_Java去除字符串首尾特定字符
  3. ubuntu 进不去图形界面,如何重新安装驱动
  4. 支付二维码整合 - 三码合一支持支付宝、QQ、微信
  5. 42表盘直径是从哪测量_手表表盘尺寸怎么量
  6. 知网caj怎么转成可编辑的Word?手机可以转吗?
  7. Translatium 19.2.1 中文版 优秀的在线翻译工具
  8. ffmpeg实现画中画
  9. java 判断生日和当前时间 对比
  10. iOS播放器、Flutter高仿书旗小说、卡片动画、二维码扫码、菜单弹窗效果等源码