Java多线程系列--【JUC集合08】- LinkedBlockingQueue
参考:http://www.cnblogs.com/skywang12345/p/java_threads_category.html
概要
本章介绍JUC包中的LinkedBlockingQueue。内容包括:
LinkedBlockingQueue介绍
LinkedBlockingQueue原理和数据结构
LinkedBlockingQueue函数列表
LinkedBlockingQueue源码分析(JDK1.7.0_40版本)
LinkedBlockingQueue示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3503458.html
LinkedBlockingQueue介绍
LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
此外,LinkedBlockingQueue还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
LinkedBlockingQueue原理和数据结构
LinkedBlockingQueue的数据结构,如下图所示:
说明:
1. LinkedBlockingQueue继承于AbstractQueue,它本质上是一个FIFO(先进先出)的队列。
2. LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
3. LinkedBlockingQueue是通过单链表实现的。
(01) head是链表的表头。取出数据时,都是从表头head处插入。
(02) last是链表的表尾。新增数据时,都是从表尾last处插入。
(03) count是链表的实际大小,即当前链表中包含的节点个数。
(04) capacity是列表的容量,它是在创建链表时指定的。
(05) putLock是插入锁,takeLock是取出锁;notEmpty是“非空条件”,notFull是“未满条件”。通过它们对链表进行并发控制。
LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁。对于插入操作,通过“插入锁putLock”进行同步;对于取出操作,通过“取出锁takeLock”进行同步。
此外,插入锁putLock和“非满条件notFull”相关联,取出锁takeLock和“非空条件notEmpty”相关联。通过notFull和notEmpty更细腻的控制锁。
-- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。
关于ReentrantLock 和 Condition等更多的内容,可以参考:
(01) Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock
(02) Java多线程系列--“JUC锁”03之 公平锁(一)
(03) Java多线程系列--“JUC锁”04之 公平锁(二)
(04) Java多线程系列--“JUC锁”05之 非公平锁
(05) Java多线程系列--“JUC锁”06之 Condition条件
LinkedBlockingQueue函数列表
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。 LinkedBlockingQueue() // 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。 LinkedBlockingQueue(Collection<? extends E> c) // 创建一个具有给定(固定)容量的 LinkedBlockingQueue。 LinkedBlockingQueue(int capacity)// 从队列彻底移除所有元素。 void clear() // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。 int drainTo(Collection<? super E> c) // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。 int drainTo(Collection<? super E> c, int maxElements) // 返回在队列中的元素上按适当顺序进行迭代的迭代器。 Iterator<E> iterator() // 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。 boolean offer(E e) // 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。 boolean offer(E e, long timeout, TimeUnit unit) // 获取但不移除此队列的头;如果此队列为空,则返回 null。 E peek() // 获取并移除此队列的头,如果此队列为空,则返回 null。 E poll() // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。 E poll(long timeout, TimeUnit unit) // 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。 void put(E e) // 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。 int remainingCapacity() // 从此队列移除指定元素的单个实例(如果存在)。 boolean remove(Object o) // 返回队列中的元素个数。 int size() // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 E take() // 返回按适当顺序包含此队列中所有元素的数组。 Object[] toArray() // 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。 <T> T[] toArray(T[] a) // 返回此 collection 的字符串表示形式。 String toString()
LinkedBlockingQueue源码分析(JDK1.7.0_40版本)
LinkedBlockingQueue.java的完整源码如下:
1 /*
2 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
3 *
4 *
5 *
6 *
7 *
8 *
9 *
10 *
11 *
12 *
13 *
14 *
15 *
16 *
17 *
18 *
19 *
20 *
21 *
22 *
23 */
24
25 /*
26 *
27 *
28 *
29 *
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.locks.ReentrantLock;
41 import java.util.AbstractQueue;
42 import java.util.Collection;
43 import java.util.Iterator;
44 import java.util.NoSuchElementException;
45
46 /**
47 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
48 * linked nodes.
49 * This queue orders elements FIFO (first-in-first-out).
50 * The <em>head</em> of the queue is that element that has been on the
51 * queue the longest time.
52 * The <em>tail</em> of the queue is that element that has been on the
53 * queue the shortest time. New elements
54 * are inserted at the tail of the queue, and the queue retrieval
55 * operations obtain elements at the head of the queue.
56 * Linked queues typically have higher throughput than array-based queues but
57 * less predictable performance in most concurrent applications.
58 *
59 * <p> The optional capacity bound constructor argument serves as a
60 * way to prevent excessive queue expansion. The capacity, if unspecified,
61 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
62 * dynamically created upon each insertion unless this would bring the
63 * queue above capacity.
64 *
65 * <p>This class and its iterator implement all of the
66 * <em>optional</em> methods of the {@link Collection} and {@link
67 * Iterator} interfaces.
68 *
69 * <p>This class is a member of the
70 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
71 * Java Collections Framework</a>.
72 *
73 * @since 1.5
74 * @author Doug Lea
75 * @param <E> the type of elements held in this collection
76 *
77 */
78 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
79 implements BlockingQueue<E>, java.io.Serializable {
80 private static final long serialVersionUID = -6903933977591709194L;
81
82 /*
83 * A variant of the "two lock queue" algorithm. The putLock gates
84 * entry to put (and offer), and has an associated condition for
85 * waiting puts. Similarly for the takeLock. The "count" field
86 * that they both rely on is maintained as an atomic to avoid
87 * needing to get both locks in most cases. Also, to minimize need
88 * for puts to get takeLock and vice-versa, cascading notifies are
89 * used. When a put notices that it has enabled at least one take,
90 * it signals taker. That taker in turn signals others if more
91 * items have been entered since the signal. And symmetrically for
92 * takes signalling puts. Operations such as remove(Object) and
93 * iterators acquire both locks.
94 *
95 * Visibility between writers and readers is provided as follows:
96 *
97 * Whenever an element is enqueued, the putLock is acquired and
98 * count updated. A subsequent reader guarantees visibility to the
99 * enqueued Node by either acquiring the putLock (via fullyLock)
100 * or by acquiring the takeLock, and then reading n = count.get();
101 * this gives visibility to the first n items.
102 *
103 * To implement weakly consistent iterators, it appears we need to
104 * keep all Nodes GC-reachable from a predecessor dequeued Node.
105 * That would cause two problems:
106 * - allow a rogue Iterator to cause unbounded memory retention
107 * - cause cross-generational linking of old Nodes to new Nodes if
108 * a Node was tenured while live, which generational GCs have a
109 * hard time dealing with, causing repeated major collections.
110 * However, only non-deleted Nodes need to be reachable from
111 * dequeued Nodes, and reachability does not necessarily have to
112 * be of the kind understood by the GC. We use the trick of
113 * linking a Node that has just been dequeued to itself. Such a
114 * self-link implicitly means to advance to head.next.
115 */
116
117 /**
118 * Linked list node class
119 */
120 static class Node<E> {
121 E item;
122
123 /**
124 * One of:
125 * - the real successor Node
126 * - this Node, meaning the successor is head.next
127 * - null, meaning there is no successor (this is the last node)
128 */
129 Node<E> next;
130
131 Node(E x) { item = x; }
132 }
133
134 /** The capacity bound, or Integer.MAX_VALUE if none */
135 private final int capacity;
136
137 /** Current number of elements */
138 private final AtomicInteger count = new AtomicInteger(0);
139
140 /**
141 * Head of linked list.
142 * Invariant: head.item == null
143 */
144 private transient Node<E> head;
145
146 /**
147 * Tail of linked list.
148 * Invariant: last.next == null
149 */
150 private transient Node<E> last;
151
152 /** Lock held by take, poll, etc */
153 private final ReentrantLock takeLock = new ReentrantLock();
154
155 /** Wait queue for waiting takes */
156 private final Condition notEmpty = takeLock.newCondition();
157
158 /** Lock held by put, offer, etc */
159 private final ReentrantLock putLock = new ReentrantLock();
160
161 /** Wait queue for waiting puts */
162 private final Condition notFull = putLock.newCondition();
163
164 /**
165 * Signals a waiting take. Called only from put/offer (which do not
166 * otherwise ordinarily lock takeLock.)
167 */
168 private void signalNotEmpty() {
169 final ReentrantLock takeLock = this.takeLock;
170 takeLock.lock();
171 try {
172 notEmpty.signal();
173 } finally {
174 takeLock.unlock();
175 }
176 }
177
178 /**
179 * Signals a waiting put. Called only from take/poll.
180 */
181 private void signalNotFull() {
182 final ReentrantLock putLock = this.putLock;
183 putLock.lock();
184 try {
185 notFull.signal();
186 } finally {
187 putLock.unlock();
188 }
189 }
190
191 /**
192 * Links node at end of queue.
193 *
194 * @param node the node
195 */
196 private void enqueue(Node<E> node) {
197 // assert putLock.isHeldByCurrentThread();
198 // assert last.next == null;
199 last = last.next = node;
200 }
201
202 /**
203 * Removes a node from head of queue.
204 *
205 * @return the node
206 */
207 private E dequeue() {
208 // assert takeLock.isHeldByCurrentThread();
209 // assert head.item == null;
210 Node<E> h = head;
211 Node<E> first = h.next;
212 h.next = h; // help GC
213 head = first;
214 E x = first.item;
215 first.item = null;
216 return x;
217 }
218
219 /**
220 * Lock to prevent both puts and takes.
221 */
222 void fullyLock() {
223 putLock.lock();
224 takeLock.lock();
225 }
226
227 /**
228 * Unlock to allow both puts and takes.
229 */
230 void fullyUnlock() {
231 takeLock.unlock();
232 putLock.unlock();
233 }
234
235 // /**
236 // * Tells whether both locks are held by current thread.
237 // */
238 // boolean isFullyLocked() {
239 // return (putLock.isHeldByCurrentThread() &&
240 // takeLock.isHeldByCurrentThread());
241 // }
242
243 /**
244 * Creates a {@code LinkedBlockingQueue} with a capacity of
245 * {@link Integer#MAX_VALUE}.
246 */
247 public LinkedBlockingQueue() {
248 this(Integer.MAX_VALUE);
249 }
250
251 /**
252 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
253 *
254 * @param capacity the capacity of this queue
255 * @throws IllegalArgumentException if {@code capacity} is not greater
256 * than zero
257 */
258 public LinkedBlockingQueue(int capacity) {
259 if (capacity <= 0) throw new IllegalArgumentException();
260 this.capacity = capacity;
261 last = head = new Node<E>(null);
262 }
263
264 /**
265 * Creates a {@code LinkedBlockingQueue} with a capacity of
266 * {@link Integer#MAX_VALUE}, initially containing the elements of the
267 * given collection,
268 * added in traversal order of the collection's iterator.
269 *
270 * @param c the collection of elements to initially contain
271 * @throws NullPointerException if the specified collection or any
272 * of its elements are null
273 */
274 public LinkedBlockingQueue(Collection<? extends E> c) {
275 this(Integer.MAX_VALUE);
276 final ReentrantLock putLock = this.putLock;
277 putLock.lock(); // Never contended, but necessary for visibility
278 try {
279 int n = 0;
280 for (E e : c) {
281 if (e == null)
282 throw new NullPointerException();
283 if (n == capacity)
284 throw new IllegalStateException("Queue full");
285 enqueue(new Node<E>(e));
286 ++n;
287 }
288 count.set(n);
289 } finally {
290 putLock.unlock();
291 }
292 }
293
294
295 // this doc comment is overridden to remove the reference to collections
296 // greater in size than Integer.MAX_VALUE
297 /**
298 * Returns the number of elements in this queue.
299 *
300 * @return the number of elements in this queue
301 */
302 public int size() {
303 return count.get();
304 }
305
306 // this doc comment is a modified copy of the inherited doc comment,
307 // without the reference to unlimited queues.
308 /**
309 * Returns the number of additional elements that this queue can ideally
310 * (in the absence of memory or resource constraints) accept without
311 * blocking. This is always equal to the initial capacity of this queue
312 * less the current {@code size} of this queue.
313 *
314 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
315 * an element will succeed by inspecting {@code remainingCapacity}
316 * because it may be the case that another thread is about to
317 * insert or remove an element.
318 */
319 public int remainingCapacity() {
320 return capacity - count.get();
321 }
322
323 /**
324 * Inserts the specified element at the tail of this queue, waiting if
325 * necessary for space to become available.
326 *
327 * @throws InterruptedException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
329 */
330 public void put(E e) throws InterruptedException {
331 if (e == null) throw new NullPointerException();
332 // Note: convention in all put/take/etc is to preset local var
333 // holding count negative to indicate failure unless set.
334 int c = -1;
335 Node<E> node = new Node(e);
336 final ReentrantLock putLock = this.putLock;
337 final AtomicInteger count = this.count;
338 putLock.lockInterruptibly();
339 try {
340 /*
341 * Note that count is used in wait guard even though it is
342 * not protected by lock. This works because count can
343 * only decrease at this point (all other puts are shut
344 * out by lock), and we (or some other waiting put) are
345 * signalled if it ever changes from capacity. Similarly
346 * for all other uses of count in other wait guards.
347 */
348 while (count.get() == capacity) {
349 notFull.await();
350 }
351 enqueue(node);
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
354 notFull.signal();
355 } finally {
356 putLock.unlock();
357 }
358 if (c == 0)
359 signalNotEmpty();
360 }
361
362 /**
363 * Inserts the specified element at the tail of this queue, waiting if
364 * necessary up to the specified wait time for space to become available.
365 *
366 * @return {@code true} if successful, or {@code false} if
367 * the specified waiting time elapses before space is available.
368 * @throws InterruptedException {@inheritDoc}
369 * @throws NullPointerException {@inheritDoc}
370 */
371 public boolean offer(E e, long timeout, TimeUnit unit)
372 throws InterruptedException {
373
374 if (e == null) throw new NullPointerException();
375 long nanos = unit.toNanos(timeout);
376 int c = -1;
377 final ReentrantLock putLock = this.putLock;
378 final AtomicInteger count = this.count;
379 putLock.lockInterruptibly();
380 try {
381 while (count.get() == capacity) {
382 if (nanos <= 0)
383 return false;
384 nanos = notFull.awaitNanos(nanos);
385 }
386 enqueue(new Node<E>(e));
387 c = count.getAndIncrement();
388 if (c + 1 < capacity)
389 notFull.signal();
390 } finally {
391 putLock.unlock();
392 }
393 if (c == 0)
394 signalNotEmpty();
395 return true;
396 }
397
398 /**
399 * Inserts the specified element at the tail of this queue if it is
400 * possible to do so immediately without exceeding the queue's capacity,
401 * returning {@code true} upon success and {@code false} if this queue
402 * is full.
403 * When using a capacity-restricted queue, this method is generally
404 * preferable to method {@link BlockingQueue#add add}, which can fail to
405 * insert an element only by throwing an exception.
406 *
407 * @throws NullPointerException if the specified element is null
408 */
409 public boolean offer(E e) {
410 if (e == null) throw new NullPointerException();
411 final AtomicInteger count = this.count;
412 if (count.get() == capacity)
413 return false;
414 int c = -1;
415 Node<E> node = new Node(e);
416 final ReentrantLock putLock = this.putLock;
417 putLock.lock();
418 try {
419 if (count.get() < capacity) {
420 enqueue(node);
421 c = count.getAndIncrement();
422 if (c + 1 < capacity)
423 notFull.signal();
424 }
425 } finally {
426 putLock.unlock();
427 }
428 if (c == 0)
429 signalNotEmpty();
430 return c >= 0;
431 }
432
433
434 public E take() throws InterruptedException {
435 E x;
436 int c = -1;
437 final AtomicInteger count = this.count;
438 final ReentrantLock takeLock = this.takeLock;
439 takeLock.lockInterruptibly();
440 try {
441 while (count.get() == 0) {
442 notEmpty.await();
443 }
444 x = dequeue();
445 c = count.getAndDecrement();
446 if (c > 1)
447 notEmpty.signal();
448 } finally {
449 takeLock.unlock();
450 }
451 if (c == capacity)
452 signalNotFull();
453 return x;
454 }
455
456 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
457 E x = null;
458 int c = -1;
459 long nanos = unit.toNanos(timeout);
460 final AtomicInteger count = this.count;
461 final ReentrantLock takeLock = this.takeLock;
462 takeLock.lockInterruptibly();
463 try {
464 while (count.get() == 0) {
465 if (nanos <= 0)
466 return null;
467 nanos = notEmpty.awaitNanos(nanos);
468 }
469 x = dequeue();
470 c = count.getAndDecrement();
471 if (c > 1)
472 notEmpty.signal();
473 } finally {
474 takeLock.unlock();
475 }
476 if (c == capacity)
477 signalNotFull();
478 return x;
479 }
480
481 public E poll() {
482 final AtomicInteger count = this.count;
483 if (count.get() == 0)
484 return null;
485 E x = null;
486 int c = -1;
487 final ReentrantLock takeLock = this.takeLock;
488 takeLock.lock();
489 try {
490 if (count.get() > 0) {
491 x = dequeue();
492 c = count.getAndDecrement();
493 if (c > 1)
494 notEmpty.signal();
495 }
496 } finally {
497 takeLock.unlock();
498 }
499 if (c == capacity)
500 signalNotFull();
501 return x;
502 }
503
504 public E peek() {
505 if (count.get() == 0)
506 return null;
507 final ReentrantLock takeLock = this.takeLock;
508 takeLock.lock();
509 try {
510 Node<E> first = head.next;
511 if (first == null)
512 return null;
513 else
514 return first.item;
515 } finally {
516 takeLock.unlock();
517 }
518 }
519
520 /**
521 * Unlinks interior Node p with predecessor trail.
522 */
523 void unlink(Node<E> p, Node<E> trail) {
524 // assert isFullyLocked();
525 // p.next is not changed, to allow iterators that are
526 // traversing p to maintain their weak-consistency guarantee.
527 p.item = null;
528 trail.next = p.next;
529 if (last == p)
530 last = trail;
531 if (count.getAndDecrement() == capacity)
532 notFull.signal();
533 }
534
535 /**
536 * Removes a single instance of the specified element from this queue,
537 * if it is present. More formally, removes an element {@code e} such
538 * that {@code o.equals(e)}, if this queue contains one or more such
539 * elements.
540 * Returns {@code true} if this queue contained the specified element
541 * (or equivalently, if this queue changed as a result of the call).
542 *
543 * @param o element to be removed from this queue, if present
544 * @return {@code true} if this queue changed as a result of the call
545 */
546 public boolean remove(Object o) {
547 if (o == null) return false;
548 fullyLock();
549 try {
550 for (Node<E> trail = head, p = trail.next;
551 p != null;
552 trail = p, p = p.next) {
553 if (o.equals(p.item)) {
554 unlink(p, trail);
555 return true;
556 }
557 }
558 return false;
559 } finally {
560 fullyUnlock();
561 }
562 }
563
564 /**
565 * Returns {@code true} if this queue contains the specified element.
566 * More formally, returns {@code true} if and only if this queue contains
567 * at least one element {@code e} such that {@code o.equals(e)}.
568 *
569 * @param o object to be checked for containment in this queue
570 * @return {@code true} if this queue contains the specified element
571 */
572 public boolean contains(Object o) {
573 if (o == null) return false;
574 fullyLock();
575 try {
576 for (Node<E> p = head.next; p != null; p = p.next)
577 if (o.equals(p.item))
578 return true;
579 return false;
580 } finally {
581 fullyUnlock();
582 }
583 }
584
585 /**
586 * Returns an array containing all of the elements in this queue, in
587 * proper sequence.
588 *
589 * <p>The returned array will be "safe" in that no references to it are
590 * maintained by this queue. (In other words, this method must allocate
591 * a new array). The caller is thus free to modify the returned array.
592 *
593 * <p>This method acts as bridge between array-based and collection-based
594 * APIs.
595 *
596 * @return an array containing all of the elements in this queue
597 */
598 public Object[] toArray() {
599 fullyLock();
600 try {
601 int size = count.get();
602 Object[] a = new Object[size];
603 int k = 0;
604 for (Node<E> p = head.next; p != null; p = p.next)
605 a[k++] = p.item;
606 return a;
607 } finally {
608 fullyUnlock();
609 }
610 }
611
612 /**
613 * Returns an array containing all of the elements in this queue, in
614 * proper sequence; the runtime type of the returned array is that of
615 * the specified array. If the queue fits in the specified array, it
616 * is returned therein. Otherwise, a new array is allocated with the
617 * runtime type of the specified array and the size of this queue.
618 *
619 * <p>If this queue fits in the specified array with room to spare
620 * (i.e., the array has more elements than this queue), the element in
621 * the array immediately following the end of the queue is set to
622 * {@code null}.
623 *
624 * <p>Like the {@link #toArray()} method, this method acts as bridge between
625 * array-based and collection-based APIs. Further, this method allows
626 * precise control over the runtime type of the output array, and may,
627 * under certain circumstances, be used to save allocation costs.
628 *
629 * <p>Suppose {@code x} is a queue known to contain only strings.
630 * The following code can be used to dump the queue into a newly
631 * allocated array of {@code String}:
632 *
633 * <pre>
634 * String[] y = x.toArray(new String[0]);</pre>
635 *
636 * Note that {@code toArray(new Object[0])} is identical in function to
637 * {@code toArray()}.
638 *
639 * @param a the array into which the elements of the queue are to
640 * be stored, if it is big enough; otherwise, a new array of the
641 * same runtime type is allocated for this purpose
642 * @return an array containing all of the elements in this queue
643 * @throws ArrayStoreException if the runtime type of the specified array
644 * is not a supertype of the runtime type of every element in
645 * this queue
646 * @throws NullPointerException if the specified array is null
647 */
648 @SuppressWarnings("unchecked")
649 public <T> T[] toArray(T[] a) {
650 fullyLock();
651 try {
652 int size = count.get();
653 if (a.length < size)
654 a = (T[])java.lang.reflect.Array.newInstance
655 (a.getClass().getComponentType(), size);
656
657 int k = 0;
658 for (Node<E> p = head.next; p != null; p = p.next)
659 a[k++] = (T)p.item;
660 if (a.length > k)
661 a[k] = null;
662 return a;
663 } finally {
664 fullyUnlock();
665 }
666 }
667
668 public String toString() {
669 fullyLock();
670 try {
671 Node<E> p = head.next;
672 if (p == null)
673 return "[]";
674
675 StringBuilder sb = new StringBuilder();
676 sb.append('[');
677 for (;;) {
678 E e = p.item;
679 sb.append(e == this ? "(this Collection)" : e);
680 p = p.next;
681 if (p == null)
682 return sb.append(']').toString();
683 sb.append(',').append(' ');
684 }
685 } finally {
686 fullyUnlock();
687 }
688 }
689
690 /**
691 * Atomically removes all of the elements from this queue.
692 * The queue will be empty after this call returns.
693 */
694 public void clear() {
695 fullyLock();
696 try {
697 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
698 h.next = h;
699 p.item = null;
700 }
701 head = last;
702 // assert head.item == null && head.next == null;
703 if (count.getAndSet(0) == capacity)
704 notFull.signal();
705 } finally {
706 fullyUnlock();
707 }
708 }
709
710 /**
711 * @throws UnsupportedOperationException {@inheritDoc}
712 * @throws ClassCastException {@inheritDoc}
713 * @throws NullPointerException {@inheritDoc}
714 * @throws IllegalArgumentException {@inheritDoc}
715 */
716 public int drainTo(Collection<? super E> c) {
717 return drainTo(c, Integer.MAX_VALUE);
718 }
719
720 /**
721 * @throws UnsupportedOperationException {@inheritDoc}
722 * @throws ClassCastException {@inheritDoc}
723 * @throws NullPointerException {@inheritDoc}
724 * @throws IllegalArgumentException {@inheritDoc}
725 */
726 public int drainTo(Collection<? super E> c, int maxElements) {
727 if (c == null)
728 throw new NullPointerException();
729 if (c == this)
730 throw new IllegalArgumentException();
731 boolean signalNotFull = false;
732 final ReentrantLock takeLock = this.takeLock;
733 takeLock.lock();
734 try {
735 int n = Math.min(maxElements, count.get());
736 // count.get provides visibility to first n Nodes
737 Node<E> h = head;
738 int i = 0;
739 try {
740 while (i < n) {
741 Node<E> p = h.next;
742 c.add(p.item);
743 p.item = null;
744 h.next = h;
745 h = p;
746 ++i;
747 }
748 return n;
749 } finally {
750 // Restore invariants even if c.add() threw
751 if (i > 0) {
752 // assert h.item == null;
753 head = h;
754 signalNotFull = (count.getAndAdd(-i) == capacity);
755 }
756 }
757 } finally {
758 takeLock.unlock();
759 if (signalNotFull)
760 signalNotFull();
761 }
762 }
763
764 /**
765 * Returns an iterator over the elements in this queue in proper sequence.
766 * The elements will be returned in order from first (head) to last (tail).
767 *
768 * <p>The returned iterator is a "weakly consistent" iterator that
769 * will never throw {@link java.util.ConcurrentModificationException
770 * ConcurrentModificationException}, and guarantees to traverse
771 * elements as they existed upon construction of the iterator, and
772 * may (but is not guaranteed to) reflect any modifications
773 * subsequent to construction.
774 *
775 * @return an iterator over the elements in this queue in proper sequence
776 */
777 public Iterator<E> iterator() {
778 return new Itr();
779 }
780
781 private class Itr implements Iterator<E> {
782 /*
783 * Basic weakly-consistent iterator. At all times hold the next
784 * item to hand out so that if hasNext() reports true, we will
785 * still have it to return even if lost race with a take etc.
786 */
787 private Node<E> current;
788 private Node<E> lastRet;
789 private E currentElement;
790
791 Itr() {
792 fullyLock();
793 try {
794 current = head.next;
795 if (current != null)
796 currentElement = current.item;
797 } finally {
798 fullyUnlock();
799 }
800 }
801
802 public boolean hasNext() {
803 return current != null;
804 }
805
806 /**
807 * Returns the next live successor of p, or null if no such.
808 *
809 * Unlike other traversal methods, iterators need to handle both:
810 * - dequeued nodes (p.next == p)
811 * - (possibly multiple) interior removed nodes (p.item == null)
812 */
813 private Node<E> nextNode(Node<E> p) {
814 for (;;) {
815 Node<E> s = p.next;
816 if (s == p)
817 return head.next;
818 if (s == null || s.item != null)
819 return s;
820 p = s;
821 }
822 }
823
824 public E next() {
825 fullyLock();
826 try {
827 if (current == null)
828 throw new NoSuchElementException();
829 E x = currentElement;
830 lastRet = current;
831 current = nextNode(current);
832 currentElement = (current == null) ? null : current.item;
833 return x;
834 } finally {
835 fullyUnlock();
836 }
837 }
838
839 public void remove() {
840 if (lastRet == null)
841 throw new IllegalStateException();
842 fullyLock();
843 try {
844 Node<E> node = lastRet;
845 lastRet = null;
846 for (Node<E> trail = head, p = trail.next;
847 p != null;
848 trail = p, p = p.next) {
849 if (p == node) {
850 unlink(p, trail);
851 break;
852 }
853 }
854 } finally {
855 fullyUnlock();
856 }
857 }
858 }
859
860 /**
861 * Save the state to a stream (that is, serialize it).
862 *
863 * @serialData The capacity is emitted (int), followed by all of
864 * its elements (each an {@code Object}) in the proper order,
865 * followed by a null
866 * @param s the stream
867 */
868 private void writeObject(java.io.ObjectOutputStream s)
869 throws java.io.IOException {
870
871 fullyLock();
872 try {
873 // Write out any hidden stuff, plus capacity
874 s.defaultWriteObject();
875
876 // Write out all elements in the proper order.
877 for (Node<E> p = head.next; p != null; p = p.next)
878 s.writeObject(p.item);
879
880 // Use trailing null as sentinel
881 s.writeObject(null);
882 } finally {
883 fullyUnlock();
884 }
885 }
886
887 /**
888 * Reconstitute this queue instance from a stream (that is,
889 * deserialize it).
890 *
891 * @param s the stream
892 */
893 private void readObject(java.io.ObjectInputStream s)
894 throws java.io.IOException, ClassNotFoundException {
895 // Read in capacity, and any hidden stuff
896 s.defaultReadObject();
897
898 count.set(0);
899 last = head = new Node<E>(null);
900
901 // Read in all elements and place in queue
902 for (;;) {
903 @SuppressWarnings("unchecked")
904 E item = (E)s.readObject();
905 if (item == null)
906 break;
907 add(item);
908 }
909 }
910 }
下面从LinkedBlockingQueue的创建,添加,删除,遍历这几个方面对它进行分析。
1. 创建
下面以LinkedBlockingQueue(int capacity)来进行说明。
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); }
说明:
(01) capacity是“链式阻塞队列”的容量。
(02) head和last是“链式阻塞队列”的首节点和尾节点。它们在LinkedBlockingQueue中的声明如下:
// 容量 private final int capacity; // 当前数量 private final AtomicInteger count = new AtomicInteger(0); private transient Node<E> head; // 链表的表头 private transient Node<E> last; // 链表的表尾 // 用于控制“删除元素”的互斥锁takeLock 和 锁对应的“非空条件”notEmpty private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 用于控制“添加元素”的互斥锁putLock 和 锁对应的“非满条件”notFull private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
链表的节点定义如下:
static class Node<E> {E item; // 数据Node<E> next; // 下一个节点的指针 Node(E x) { item = x; } }
2. 添加
下面以offer(E e)为例,对LinkedBlockingQueue的添加方法进行说明。
public boolean offer(E e) {if (e == null) throw new NullPointerException();// 如果“队列已满”,则返回false,表示插入失败。final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;// 新建“节点e”Node<E> node = new Node(e);final ReentrantLock putLock = this.putLock;// 获取“插入锁putLock” putLock.lock();try {// 再次对“队列是不是满”的进行判断。// 若“队列未满”,则插入节点。if (count.get() < capacity) {// 插入节点 enqueue(node);// 将“当前节点数量”+1,并返回“原始的数量”c = count.getAndIncrement();// 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。if (c + 1 < capacity)notFull.signal();}} finally {// 释放“插入锁putLock” putLock.unlock();}// 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程if (c == 0)signalNotEmpty();return c >= 0; }
说明:offer()的作用很简单,就是将元素E添加到队列的末尾。
enqueue()的源码如下:
private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node; }
enqueue()的作用是将node添加到队列末尾,并设置node为新的尾节点!
signalNotEmpty()的源码如下:
private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();} }
signalNotEmpty()的作用是唤醒notEmpty上的等待线程。
3. 取出
下面以take()为例,对LinkedBlockingQueue的取出方法进行说明。
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常 takeLock.lockInterruptibly();try {// 若“队列为空”,则一直等待。while (count.get() == 0) {notEmpty.await();}// 取出元素x = dequeue();// 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {// 释放“取出锁” takeLock.unlock();}// 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。if (c == capacity)signalNotFull();return x; }
说明:take()的作用是取出并返回队列的头。若队列为空,则一直等待。
dequeue()的源码如下:
private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x; }
dequeue()的作用就是删除队列的头节点,并将表头指向“原头节点的下一个节点”。
signalNotFull()的源码如下:
private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();} }
signalNotFull()的作用就是唤醒notFull上的等待线程。
4. 遍历
下面对LinkedBlockingQueue的遍历方法进行说明。
public Iterator<E> iterator() {return new Itr(); }
iterator()实际上是返回一个Iter对象。
Itr类的定义如下:
private class Itr implements Iterator<E> {// 当前节点private Node<E> current;// 上一次返回的节点private Node<E> lastRet;// 当前节点对应的值private E currentElement;Itr() {// 同时获取“插入锁putLock” 和 “取出锁takeLock” fullyLock();try {// 设置“当前元素”为“队列表头的下一节点”,即为队列的第一个有效节点current = head.next;if (current != null)currentElement = current.item;} finally {// 释放“插入锁putLock” 和 “取出锁takeLock” fullyUnlock();}}// 返回“下一个节点是否为null”public boolean hasNext() {return current != null;}private Node<E> nextNode(Node<E> p) {for (;;) {Node<E> s = p.next;if (s == p)return head.next;if (s == null || s.item != null)return s;p = s;}}// 返回下一个节点public E next() {fullyLock();try {if (current == null)throw new NoSuchElementException();E x = currentElement;lastRet = current;current = nextNode(current);currentElement = (current == null) ? null : current.item;return x;} finally {fullyUnlock();}}// 删除下一个节点public void remove() {if (lastRet == null)throw new IllegalStateException();fullyLock();try {Node<E> node = lastRet;lastRet = null;for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (p == node) {unlink(p, trail);break;}}} finally {fullyUnlock();}} }
LinkedBlockingQueue示例
1 import java.util.*; 2 import java.util.concurrent.*; 3 4 /* 5 * LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。 6 * 7 * 下面是“多个线程同时操作并且遍历queue”的示例 8 * (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。 9 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。 10 * 11 * @author skywang 12 */ 13 public class LinkedBlockingQueueDemo1 { 14 15 // TODO: queue是LinkedList对象时,程序会出错。 16 //private static Queue<String> queue = new LinkedList<String>(); 17 private static Queue<String> queue = new LinkedBlockingQueue<String>(); 18 public static void main(String[] args) { 19 20 // 同时启动两个线程对queue进行操作! 21 new MyThread("ta").start(); 22 new MyThread("tb").start(); 23 } 24 25 private static void printAll() { 26 String value; 27 Iterator iter = queue.iterator(); 28 while(iter.hasNext()) { 29 value = (String)iter.next(); 30 System.out.print(value+", "); 31 } 32 System.out.println(); 33 } 34 35 private static class MyThread extends Thread { 36 MyThread(String name) { 37 super(name); 38 } 39 @Override 40 public void run() { 41 int i = 0; 42 while (i++ < 6) { 43 // “线程名” + "-" + "序号" 44 String val = Thread.currentThread().getName()+i; 45 queue.add(val); 46 // 通过“Iterator”遍历queue。 47 printAll(); 48 } 49 } 50 } 51 }
(某一次)运行结果:
tb1, ta1, tb1, ta1, ta2, tb1, ta1, ta2, ta3, tb1, ta1, ta2, ta3, ta4, tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5, ta4, tb1, ta5, ta1, ta6, ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2, ta5, ta6, tb2, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,
结果说明:
示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingQueue进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingQueue中;接着,遍历并输出LinkedBlockingQueue中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。
当queue是LinkedBlockingQueue对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。
概要
本章介绍JUC包中的LinkedBlockingQueue。内容包括:
LinkedBlockingQueue介绍
LinkedBlockingQueue原理和数据结构
LinkedBlockingQueue函数列表
LinkedBlockingQueue源码分析(JDK1.7.0_40版本)
LinkedBlockingQueue示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3503458.html
LinkedBlockingQueue介绍
LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
此外,LinkedBlockingQueue还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
LinkedBlockingQueue原理和数据结构
LinkedBlockingQueue的数据结构,如下图所示:
说明:
1. LinkedBlockingQueue继承于AbstractQueue,它本质上是一个FIFO(先进先出)的队列。
2. LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
3. LinkedBlockingQueue是通过单链表实现的。
(01) head是链表的表头。取出数据时,都是从表头head处插入。
(02) last是链表的表尾。新增数据时,都是从表尾last处插入。
(03) count是链表的实际大小,即当前链表中包含的节点个数。
(04) capacity是列表的容量,它是在创建链表时指定的。
(05) putLock是插入锁,takeLock是取出锁;notEmpty是“非空条件”,notFull是“未满条件”。通过它们对链表进行并发控制。
LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁。对于插入操作,通过“插入锁putLock”进行同步;对于取出操作,通过“取出锁takeLock”进行同步。
此外,插入锁putLock和“非满条件notFull”相关联,取出锁takeLock和“非空条件notEmpty”相关联。通过notFull和notEmpty更细腻的控制锁。
-- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。
关于ReentrantLock 和 Condition等更多的内容,可以参考:
(01) Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock
(02) Java多线程系列--“JUC锁”03之 公平锁(一)
(03) Java多线程系列--“JUC锁”04之 公平锁(二)
(04) Java多线程系列--“JUC锁”05之 非公平锁
(05) Java多线程系列--“JUC锁”06之 Condition条件
LinkedBlockingQueue函数列表
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。 LinkedBlockingQueue() // 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。 LinkedBlockingQueue(Collection<? extends E> c) // 创建一个具有给定(固定)容量的 LinkedBlockingQueue。 LinkedBlockingQueue(int capacity)// 从队列彻底移除所有元素。 void clear() // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。 int drainTo(Collection<? super E> c) // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。 int drainTo(Collection<? super E> c, int maxElements) // 返回在队列中的元素上按适当顺序进行迭代的迭代器。 Iterator<E> iterator() // 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。 boolean offer(E e) // 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。 boolean offer(E e, long timeout, TimeUnit unit) // 获取但不移除此队列的头;如果此队列为空,则返回 null。 E peek() // 获取并移除此队列的头,如果此队列为空,则返回 null。 E poll() // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。 E poll(long timeout, TimeUnit unit) // 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。 void put(E e) // 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。 int remainingCapacity() // 从此队列移除指定元素的单个实例(如果存在)。 boolean remove(Object o) // 返回队列中的元素个数。 int size() // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 E take() // 返回按适当顺序包含此队列中所有元素的数组。 Object[] toArray() // 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。 <T> T[] toArray(T[] a) // 返回此 collection 的字符串表示形式。 String toString()
LinkedBlockingQueue源码分析(JDK1.7.0_40版本)
LinkedBlockingQueue.java的完整源码如下:
1 /* 2 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. 3 * 4 * 5 * 6 * 7 * 8 * 9 * 10 * 11 * 12 * 13 * 14 * 15 * 16 * 17 * 18 * 19 * 20 * 21 * 22 * 23 */ 24 25 /* 26 * 27 * 28 * 29 * 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.atomic.AtomicInteger; 39 import java.util.concurrent.locks.Condition; 40 import java.util.concurrent.locks.ReentrantLock; 41 import java.util.AbstractQueue; 42 import java.util.Collection; 43 import java.util.Iterator; 44 import java.util.NoSuchElementException; 45 46 /** 47 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on 48 * linked nodes. 49 * This queue orders elements FIFO (first-in-first-out). 50 * The <em>head</em> of the queue is that element that has been on the 51 * queue the longest time. 52 * The <em>tail</em> of the queue is that element that has been on the 53 * queue the shortest time. New elements 54 * are inserted at the tail of the queue, and the queue retrieval 55 * operations obtain elements at the head of the queue. 56 * Linked queues typically have higher throughput than array-based queues but 57 * less predictable performance in most concurrent applications. 58 * 59 * <p> The optional capacity bound constructor argument serves as a 60 * way to prevent excessive queue expansion. The capacity, if unspecified, 61 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are 62 * dynamically created upon each insertion unless this would bring the 63 * queue above capacity. 64 * 65 * <p>This class and its iterator implement all of the 66 * <em>optional</em> methods of the {@link Collection} and {@link 67 * Iterator} interfaces. 68 * 69 * <p>This class is a member of the 70 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 71 * Java Collections Framework</a>. 72 * 73 * @since 1.5 74 * @author Doug Lea 75 * @param <E> the type of elements held in this collection 76 * 77 */ 78 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 79 implements BlockingQueue<E>, java.io.Serializable { 80 private static final long serialVersionUID = -6903933977591709194L; 81 82 /* 83 * A variant of the "two lock queue" algorithm. The putLock gates 84 * entry to put (and offer), and has an associated condition for 85 * waiting puts. Similarly for the takeLock. The "count" field 86 * that they both rely on is maintained as an atomic to avoid 87 * needing to get both locks in most cases. Also, to minimize need 88 * for puts to get takeLock and vice-versa, cascading notifies are 89 * used. When a put notices that it has enabled at least one take, 90 * it signals taker. That taker in turn signals others if more 91 * items have been entered since the signal. And symmetrically for 92 * takes signalling puts. Operations such as remove(Object) and 93 * iterators acquire both locks. 94 * 95 * Visibility between writers and readers is provided as follows: 96 * 97 * Whenever an element is enqueued, the putLock is acquired and 98 * count updated. A subsequent reader guarantees visibility to the 99 * enqueued Node by either acquiring the putLock (via fullyLock) 100 * or by acquiring the takeLock, and then reading n = count.get(); 101 * this gives visibility to the first n items. 102 * 103 * To implement weakly consistent iterators, it appears we need to 104 * keep all Nodes GC-reachable from a predecessor dequeued Node. 105 * That would cause two problems: 106 * - allow a rogue Iterator to cause unbounded memory retention 107 * - cause cross-generational linking of old Nodes to new Nodes if 108 * a Node was tenured while live, which generational GCs have a 109 * hard time dealing with, causing repeated major collections. 110 * However, only non-deleted Nodes need to be reachable from 111 * dequeued Nodes, and reachability does not necessarily have to 112 * be of the kind understood by the GC. We use the trick of 113 * linking a Node that has just been dequeued to itself. Such a 114 * self-link implicitly means to advance to head.next. 115 */ 116 117 /** 118 * Linked list node class 119 */ 120 static class Node<E> { 121 E item; 122 123 /** 124 * One of: 125 * - the real successor Node 126 * - this Node, meaning the successor is head.next 127 * - null, meaning there is no successor (this is the last node) 128 */ 129 Node<E> next; 130 131 Node(E x) { item = x; } 132 } 133 134 /** The capacity bound, or Integer.MAX_VALUE if none */ 135 private final int capacity; 136 137 /** Current number of elements */ 138 private final AtomicInteger count = new AtomicInteger(0); 139 140 /** 141 * Head of linked list. 142 * Invariant: head.item == null 143 */ 144 private transient Node<E> head; 145 146 /** 147 * Tail of linked list. 148 * Invariant: last.next == null 149 */ 150 private transient Node<E> last; 151 152 /** Lock held by take, poll, etc */ 153 private final ReentrantLock takeLock = new ReentrantLock(); 154 155 /** Wait queue for waiting takes */ 156 private final Condition notEmpty = takeLock.newCondition(); 157 158 /** Lock held by put, offer, etc */ 159 private final ReentrantLock putLock = new ReentrantLock(); 160 161 /** Wait queue for waiting puts */ 162 private final Condition notFull = putLock.newCondition(); 163 164 /** 165 * Signals a waiting take. Called only from put/offer (which do not 166 * otherwise ordinarily lock takeLock.) 167 */ 168 private void signalNotEmpty() { 169 final ReentrantLock takeLock = this.takeLock; 170 takeLock.lock(); 171 try { 172 notEmpty.signal(); 173 } finally { 174 takeLock.unlock(); 175 } 176 } 177 178 /** 179 * Signals a waiting put. Called only from take/poll. 180 */ 181 private void signalNotFull() { 182 final ReentrantLock putLock = this.putLock; 183 putLock.lock(); 184 try { 185 notFull.signal(); 186 } finally { 187 putLock.unlock(); 188 } 189 } 190 191 /** 192 * Links node at end of queue. 193 * 194 * @param node the node 195 */ 196 private void enqueue(Node<E> node) { 197 // assert putLock.isHeldByCurrentThread(); 198 // assert last.next == null; 199 last = last.next = node; 200 } 201 202 /** 203 * Removes a node from head of queue. 204 * 205 * @return the node 206 */ 207 private E dequeue() { 208 // assert takeLock.isHeldByCurrentThread(); 209 // assert head.item == null; 210 Node<E> h = head; 211 Node<E> first = h.next; 212 h.next = h; // help GC 213 head = first; 214 E x = first.item; 215 first.item = null; 216 return x; 217 } 218 219 /** 220 * Lock to prevent both puts and takes. 221 */ 222 void fullyLock() { 223 putLock.lock(); 224 takeLock.lock(); 225 } 226 227 /** 228 * Unlock to allow both puts and takes. 229 */ 230 void fullyUnlock() { 231 takeLock.unlock(); 232 putLock.unlock(); 233 } 234 235 // /** 236 // * Tells whether both locks are held by current thread. 237 // */ 238 // boolean isFullyLocked() { 239 // return (putLock.isHeldByCurrentThread() && 240 // takeLock.isHeldByCurrentThread()); 241 // } 242 243 /** 244 * Creates a {@code LinkedBlockingQueue} with a capacity of 245 * {@link Integer#MAX_VALUE}. 246 */ 247 public LinkedBlockingQueue() { 248 this(Integer.MAX_VALUE); 249 } 250 251 /** 252 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. 253 * 254 * @param capacity the capacity of this queue 255 * @throws IllegalArgumentException if {@code capacity} is not greater 256 * than zero 257 */ 258 public LinkedBlockingQueue(int capacity) { 259 if (capacity <= 0) throw new IllegalArgumentException(); 260 this.capacity = capacity; 261 last = head = new Node<E>(null); 262 } 263 264 /** 265 * Creates a {@code LinkedBlockingQueue} with a capacity of 266 * {@link Integer#MAX_VALUE}, initially containing the elements of the 267 * given collection, 268 * added in traversal order of the collection's iterator. 269 * 270 * @param c the collection of elements to initially contain 271 * @throws NullPointerException if the specified collection or any 272 * of its elements are null 273 */ 274 public LinkedBlockingQueue(Collection<? extends E> c) { 275 this(Integer.MAX_VALUE); 276 final ReentrantLock putLock = this.putLock; 277 putLock.lock(); // Never contended, but necessary for visibility 278 try { 279 int n = 0; 280 for (E e : c) { 281 if (e == null) 282 throw new NullPointerException(); 283 if (n == capacity) 284 throw new IllegalStateException("Queue full"); 285 enqueue(new Node<E>(e)); 286 ++n; 287 } 288 count.set(n); 289 } finally { 290 putLock.unlock(); 291 } 292 } 293 294 295 // this doc comment is overridden to remove the reference to collections 296 // greater in size than Integer.MAX_VALUE 297 /** 298 * Returns the number of elements in this queue. 299 * 300 * @return the number of elements in this queue 301 */ 302 public int size() { 303 return count.get(); 304 } 305 306 // this doc comment is a modified copy of the inherited doc comment, 307 // without the reference to unlimited queues. 308 /** 309 * Returns the number of additional elements that this queue can ideally 310 * (in the absence of memory or resource constraints) accept without 311 * blocking. This is always equal to the initial capacity of this queue 312 * less the current {@code size} of this queue. 313 * 314 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 315 * an element will succeed by inspecting {@code remainingCapacity} 316 * because it may be the case that another thread is about to 317 * insert or remove an element. 318 */ 319 public int remainingCapacity() { 320 return capacity - count.get(); 321 } 322 323 /** 324 * Inserts the specified element at the tail of this queue, waiting if 325 * necessary for space to become available. 326 * 327 * @throws InterruptedException {@inheritDoc} 328 * @throws NullPointerException {@inheritDoc} 329 */ 330 public void put(E e) throws InterruptedException { 331 if (e == null) throw new NullPointerException(); 332 // Note: convention in all put/take/etc is to preset local var 333 // holding count negative to indicate failure unless set. 334 int c = -1; 335 Node<E> node = new Node(e); 336 final ReentrantLock putLock = this.putLock; 337 final AtomicInteger count = this.count; 338 putLock.lockInterruptibly(); 339 try { 340 /* 341 * Note that count is used in wait guard even though it is 342 * not protected by lock. This works because count can 343 * only decrease at this point (all other puts are shut 344 * out by lock), and we (or some other waiting put) are 345 * signalled if it ever changes from capacity. Similarly 346 * for all other uses of count in other wait guards. 347 */ 348 while (count.get() == capacity) { 349 notFull.await(); 350 } 351 enqueue(node); 352 c = count.getAndIncrement(); 353 if (c + 1 < capacity) 354 notFull.signal(); 355 } finally { 356 putLock.unlock(); 357 } 358 if (c == 0) 359 signalNotEmpty(); 360 } 361 362 /** 363 * Inserts the specified element at the tail of this queue, waiting if 364 * necessary up to the specified wait time for space to become available. 365 * 366 * @return {@code true} if successful, or {@code false} if 367 * the specified waiting time elapses before space is available. 368 * @throws InterruptedException {@inheritDoc} 369 * @throws NullPointerException {@inheritDoc} 370 */ 371 public boolean offer(E e, long timeout, TimeUnit unit) 372 throws InterruptedException { 373 374 if (e == null) throw new NullPointerException(); 375 long nanos = unit.toNanos(timeout); 376 int c = -1; 377 final ReentrantLock putLock = this.putLock; 378 final AtomicInteger count = this.count; 379 putLock.lockInterruptibly(); 380 try { 381 while (count.get() == capacity) { 382 if (nanos <= 0) 383 return false; 384 nanos = notFull.awaitNanos(nanos); 385 } 386 enqueue(new Node<E>(e)); 387 c = count.getAndIncrement(); 388 if (c + 1 < capacity) 389 notFull.signal(); 390 } finally { 391 putLock.unlock(); 392 } 393 if (c == 0) 394 signalNotEmpty(); 395 return true; 396 } 397 398 /** 399 * Inserts the specified element at the tail of this queue if it is 400 * possible to do so immediately without exceeding the queue's capacity, 401 * returning {@code true} upon success and {@code false} if this queue 402 * is full. 403 * When using a capacity-restricted queue, this method is generally 404 * preferable to method {@link BlockingQueue#add add}, which can fail to 405 * insert an element only by throwing an exception. 406 * 407 * @throws NullPointerException if the specified element is null 408 */ 409 public boolean offer(E e) { 410 if (e == null) throw new NullPointerException(); 411 final AtomicInteger count = this.count; 412 if (count.get() == capacity) 413 return false; 414 int c = -1; 415 Node<E> node = new Node(e); 416 final ReentrantLock putLock = this.putLock; 417 putLock.lock(); 418 try { 419 if (count.get() < capacity) { 420 enqueue(node); 421 c = count.getAndIncrement(); 422 if (c + 1 < capacity) 423 notFull.signal(); 424 } 425 } finally { 426 putLock.unlock(); 427 } 428 if (c == 0) 429 signalNotEmpty(); 430 return c >= 0; 431 } 432 433 434 public E take() throws InterruptedException { 435 E x; 436 int c = -1; 437 final AtomicInteger count = this.count; 438 final ReentrantLock takeLock = this.takeLock; 439 takeLock.lockInterruptibly(); 440 try { 441 while (count.get() == 0) { 442 notEmpty.await(); 443 } 444 x = dequeue(); 445 c = count.getAndDecrement(); 446 if (c > 1) 447 notEmpty.signal(); 448 } finally { 449 takeLock.unlock(); 450 } 451 if (c == capacity) 452 signalNotFull(); 453 return x; 454 } 455 456 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 457 E x = null; 458 int c = -1; 459 long nanos = unit.toNanos(timeout); 460 final AtomicInteger count = this.count; 461 final ReentrantLock takeLock = this.takeLock; 462 takeLock.lockInterruptibly(); 463 try { 464 while (count.get() == 0) { 465 if (nanos <= 0) 466 return null; 467 nanos = notEmpty.awaitNanos(nanos); 468 } 469 x = dequeue(); 470 c = count.getAndDecrement(); 471 if (c > 1) 472 notEmpty.signal(); 473 } finally { 474 takeLock.unlock(); 475 } 476 if (c == capacity) 477 signalNotFull(); 478 return x; 479 } 480 481 public E poll() { 482 final AtomicInteger count = this.count; 483 if (count.get() == 0) 484 return null; 485 E x = null; 486 int c = -1; 487 final ReentrantLock takeLock = this.takeLock; 488 takeLock.lock(); 489 try { 490 if (count.get() > 0) { 491 x = dequeue(); 492 c = count.getAndDecrement(); 493 if (c > 1) 494 notEmpty.signal(); 495 } 496 } finally { 497 takeLock.unlock(); 498 } 499 if (c == capacity) 500 signalNotFull(); 501 return x; 502 } 503 504 public E peek() { 505 if (count.get() == 0) 506 return null; 507 final ReentrantLock takeLock = this.takeLock; 508 takeLock.lock(); 509 try { 510 Node<E> first = head.next; 511 if (first == null) 512 return null; 513 else 514 return first.item; 515 } finally { 516 takeLock.unlock(); 517 } 518 } 519 520 /** 521 * Unlinks interior Node p with predecessor trail. 522 */ 523 void unlink(Node<E> p, Node<E> trail) { 524 // assert isFullyLocked(); 525 // p.next is not changed, to allow iterators that are 526 // traversing p to maintain their weak-consistency guarantee. 527 p.item = null; 528 trail.next = p.next; 529 if (last == p) 530 last = trail; 531 if (count.getAndDecrement() == capacity) 532 notFull.signal(); 533 } 534 535 /** 536 * Removes a single instance of the specified element from this queue, 537 * if it is present. More formally, removes an element {@code e} such 538 * that {@code o.equals(e)}, if this queue contains one or more such 539 * elements. 540 * Returns {@code true} if this queue contained the specified element 541 * (or equivalently, if this queue changed as a result of the call). 542 * 543 * @param o element to be removed from this queue, if present 544 * @return {@code true} if this queue changed as a result of the call 545 */ 546 public boolean remove(Object o) { 547 if (o == null) return false; 548 fullyLock(); 549 try { 550 for (Node<E> trail = head, p = trail.next; 551 p != null; 552 trail = p, p = p.next) { 553 if (o.equals(p.item)) { 554 unlink(p, trail); 555 return true; 556 } 557 } 558 return false; 559 } finally { 560 fullyUnlock(); 561 } 562 } 563 564 /** 565 * Returns {@code true} if this queue contains the specified element. 566 * More formally, returns {@code true} if and only if this queue contains 567 * at least one element {@code e} such that {@code o.equals(e)}. 568 * 569 * @param o object to be checked for containment in this queue 570 * @return {@code true} if this queue contains the specified element 571 */ 572 public boolean contains(Object o) { 573 if (o == null) return false; 574 fullyLock(); 575 try { 576 for (Node<E> p = head.next; p != null; p = p.next) 577 if (o.equals(p.item)) 578 return true; 579 return false; 580 } finally { 581 fullyUnlock(); 582 } 583 } 584 585 /** 586 * Returns an array containing all of the elements in this queue, in 587 * proper sequence. 588 * 589 * <p>The returned array will be "safe" in that no references to it are 590 * maintained by this queue. (In other words, this method must allocate 591 * a new array). The caller is thus free to modify the returned array. 592 * 593 * <p>This method acts as bridge between array-based and collection-based 594 * APIs. 595 * 596 * @return an array containing all of the elements in this queue 597 */ 598 public Object[] toArray() { 599 fullyLock(); 600 try { 601 int size = count.get(); 602 Object[] a = new Object[size]; 603 int k = 0; 604 for (Node<E> p = head.next; p != null; p = p.next) 605 a[k++] = p.item; 606 return a; 607 } finally { 608 fullyUnlock(); 609 } 610 } 611 612 /** 613 * Returns an array containing all of the elements in this queue, in 614 * proper sequence; the runtime type of the returned array is that of 615 * the specified array. If the queue fits in the specified array, it 616 * is returned therein. Otherwise, a new array is allocated with the 617 * runtime type of the specified array and the size of this queue. 618 * 619 * <p>If this queue fits in the specified array with room to spare 620 * (i.e., the array has more elements than this queue), the element in 621 * the array immediately following the end of the queue is set to 622 * {@code null}. 623 * 624 * <p>Like the {@link #toArray()} method, this method acts as bridge between 625 * array-based and collection-based APIs. Further, this method allows 626 * precise control over the runtime type of the output array, and may, 627 * under certain circumstances, be used to save allocation costs. 628 * 629 * <p>Suppose {@code x} is a queue known to contain only strings. 630 * The following code can be used to dump the queue into a newly 631 * allocated array of {@code String}: 632 * 633 * <pre> 634 * String[] y = x.toArray(new String[0]);</pre> 635 * 636 * Note that {@code toArray(new Object[0])} is identical in function to 637 * {@code toArray()}. 638 * 639 * @param a the array into which the elements of the queue are to 640 * be stored, if it is big enough; otherwise, a new array of the 641 * same runtime type is allocated for this purpose 642 * @return an array containing all of the elements in this queue 643 * @throws ArrayStoreException if the runtime type of the specified array 644 * is not a supertype of the runtime type of every element in 645 * this queue 646 * @throws NullPointerException if the specified array is null 647 */ 648 @SuppressWarnings("unchecked") 649 public <T> T[] toArray(T[] a) { 650 fullyLock(); 651 try { 652 int size = count.get(); 653 if (a.length < size) 654 a = (T[])java.lang.reflect.Array.newInstance 655 (a.getClass().getComponentType(), size); 656 657 int k = 0; 658 for (Node<E> p = head.next; p != null; p = p.next) 659 a[k++] = (T)p.item; 660 if (a.length > k) 661 a[k] = null; 662 return a; 663 } finally { 664 fullyUnlock(); 665 } 666 } 667 668 public String toString() { 669 fullyLock(); 670 try { 671 Node<E> p = head.next; 672 if (p == null) 673 return "[]"; 674 675 StringBuilder sb = new StringBuilder(); 676 sb.append('['); 677 for (;;) { 678 E e = p.item; 679 sb.append(e == this ? "(this Collection)" : e); 680 p = p.next; 681 if (p == null) 682 return sb.append(']').toString(); 683 sb.append(',').append(' '); 684 } 685 } finally { 686 fullyUnlock(); 687 } 688 } 689 690 /** 691 * Atomically removes all of the elements from this queue. 692 * The queue will be empty after this call returns. 693 */ 694 public void clear() { 695 fullyLock(); 696 try { 697 for (Node<E> p, h = head; (p = h.next) != null; h = p) { 698 h.next = h; 699 p.item = null; 700 } 701 head = last; 702 // assert head.item == null && head.next == null; 703 if (count.getAndSet(0) == capacity) 704 notFull.signal(); 705 } finally { 706 fullyUnlock(); 707 } 708 } 709 710 /** 711 * @throws UnsupportedOperationException {@inheritDoc} 712 * @throws ClassCastException {@inheritDoc} 713 * @throws NullPointerException {@inheritDoc} 714 * @throws IllegalArgumentException {@inheritDoc} 715 */ 716 public int drainTo(Collection<? super E> c) { 717 return drainTo(c, Integer.MAX_VALUE); 718 } 719 720 /** 721 * @throws UnsupportedOperationException {@inheritDoc} 722 * @throws ClassCastException {@inheritDoc} 723 * @throws NullPointerException {@inheritDoc} 724 * @throws IllegalArgumentException {@inheritDoc} 725 */ 726 public int drainTo(Collection<? super E> c, int maxElements) { 727 if (c == null) 728 throw new NullPointerException(); 729 if (c == this) 730 throw new IllegalArgumentException(); 731 boolean signalNotFull = false; 732 final ReentrantLock takeLock = this.takeLock; 733 takeLock.lock(); 734 try { 735 int n = Math.min(maxElements, count.get()); 736 // count.get provides visibility to first n Nodes 737 Node<E> h = head; 738 int i = 0; 739 try { 740 while (i < n) { 741 Node<E> p = h.next; 742 c.add(p.item); 743 p.item = null; 744 h.next = h; 745 h = p; 746 ++i; 747 } 748 return n; 749 } finally { 750 // Restore invariants even if c.add() threw 751 if (i > 0) { 752 // assert h.item == null; 753 head = h; 754 signalNotFull = (count.getAndAdd(-i) == capacity); 755 } 756 } 757 } finally { 758 takeLock.unlock(); 759 if (signalNotFull) 760 signalNotFull(); 761 } 762 } 763 764 /** 765 * Returns an iterator over the elements in this queue in proper sequence. 766 * The elements will be returned in order from first (head) to last (tail). 767 * 768 * <p>The returned iterator is a "weakly consistent" iterator that 769 * will never throw {@link java.util.ConcurrentModificationException 770 * ConcurrentModificationException}, and guarantees to traverse 771 * elements as they existed upon construction of the iterator, and 772 * may (but is not guaranteed to) reflect any modifications 773 * subsequent to construction. 774 * 775 * @return an iterator over the elements in this queue in proper sequence 776 */ 777 public Iterator<E> iterator() { 778 return new Itr(); 779 } 780 781 private class Itr implements Iterator<E> { 782 /* 783 * Basic weakly-consistent iterator. At all times hold the next 784 * item to hand out so that if hasNext() reports true, we will 785 * still have it to return even if lost race with a take etc. 786 */ 787 private Node<E> current; 788 private Node<E> lastRet; 789 private E currentElement; 790 791 Itr() { 792 fullyLock(); 793 try { 794 current = head.next; 795 if (current != null) 796 currentElement = current.item; 797 } finally { 798 fullyUnlock(); 799 } 800 } 801 802 public boolean hasNext() { 803 return current != null; 804 } 805 806 /** 807 * Returns the next live successor of p, or null if no such. 808 * 809 * Unlike other traversal methods, iterators need to handle both: 810 * - dequeued nodes (p.next == p) 811 * - (possibly multiple) interior removed nodes (p.item == null) 812 */ 813 private Node<E> nextNode(Node<E> p) { 814 for (;;) { 815 Node<E> s = p.next; 816 if (s == p) 817 return head.next; 818 if (s == null || s.item != null) 819 return s; 820 p = s; 821 } 822 } 823 824 public E next() { 825 fullyLock(); 826 try { 827 if (current == null) 828 throw new NoSuchElementException(); 829 E x = currentElement; 830 lastRet = current; 831 current = nextNode(current); 832 currentElement = (current == null) ? null : current.item; 833 return x; 834 } finally { 835 fullyUnlock(); 836 } 837 } 838 839 public void remove() { 840 if (lastRet == null) 841 throw new IllegalStateException(); 842 fullyLock(); 843 try { 844 Node<E> node = lastRet; 845 lastRet = null; 846 for (Node<E> trail = head, p = trail.next; 847 p != null; 848 trail = p, p = p.next) { 849 if (p == node) { 850 unlink(p, trail); 851 break; 852 } 853 } 854 } finally { 855 fullyUnlock(); 856 } 857 } 858 } 859 860 /** 861 * Save the state to a stream (that is, serialize it). 862 * 863 * @serialData The capacity is emitted (int), followed by all of 864 * its elements (each an {@code Object}) in the proper order, 865 * followed by a null 866 * @param s the stream 867 */ 868 private void writeObject(java.io.ObjectOutputStream s) 869 throws java.io.IOException { 870 871 fullyLock(); 872 try { 873 // Write out any hidden stuff, plus capacity 874 s.defaultWriteObject(); 875 876 // Write out all elements in the proper order. 877 for (Node<E> p = head.next; p != null; p = p.next) 878 s.writeObject(p.item); 879 880 // Use trailing null as sentinel 881 s.writeObject(null); 882 } finally { 883 fullyUnlock(); 884 } 885 } 886 887 /** 888 * Reconstitute this queue instance from a stream (that is, 889 * deserialize it). 890 * 891 * @param s the stream 892 */ 893 private void readObject(java.io.ObjectInputStream s) 894 throws java.io.IOException, ClassNotFoundException { 895 // Read in capacity, and any hidden stuff 896 s.defaultReadObject(); 897 898 count.set(0); 899 last = head = new Node<E>(null); 900 901 // Read in all elements and place in queue 902 for (;;) { 903 @SuppressWarnings("unchecked") 904 E item = (E)s.readObject(); 905 if (item == null) 906 break; 907 add(item); 908 } 909 } 910 }
下面从LinkedBlockingQueue的创建,添加,删除,遍历这几个方面对它进行分析。
1. 创建
下面以LinkedBlockingQueue(int capacity)来进行说明。
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); }
说明:
(01) capacity是“链式阻塞队列”的容量。
(02) head和last是“链式阻塞队列”的首节点和尾节点。它们在LinkedBlockingQueue中的声明如下:
// 容量 private final int capacity; // 当前数量 private final AtomicInteger count = new AtomicInteger(0); private transient Node<E> head; // 链表的表头 private transient Node<E> last; // 链表的表尾 // 用于控制“删除元素”的互斥锁takeLock 和 锁对应的“非空条件”notEmpty private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 用于控制“添加元素”的互斥锁putLock 和 锁对应的“非满条件”notFull private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
链表的节点定义如下:
static class Node<E> {E item; // 数据Node<E> next; // 下一个节点的指针 Node(E x) { item = x; } }
2. 添加
下面以offer(E e)为例,对LinkedBlockingQueue的添加方法进行说明。
public boolean offer(E e) {if (e == null) throw new NullPointerException();// 如果“队列已满”,则返回false,表示插入失败。final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;// 新建“节点e”Node<E> node = new Node(e);final ReentrantLock putLock = this.putLock;// 获取“插入锁putLock” putLock.lock();try {// 再次对“队列是不是满”的进行判断。// 若“队列未满”,则插入节点。if (count.get() < capacity) {// 插入节点 enqueue(node);// 将“当前节点数量”+1,并返回“原始的数量”c = count.getAndIncrement();// 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。if (c + 1 < capacity)notFull.signal();}} finally {// 释放“插入锁putLock” putLock.unlock();}// 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程if (c == 0)signalNotEmpty();return c >= 0; }
说明:offer()的作用很简单,就是将元素E添加到队列的末尾。
enqueue()的源码如下:
private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node; }
enqueue()的作用是将node添加到队列末尾,并设置node为新的尾节点!
signalNotEmpty()的源码如下:
private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();} }
signalNotEmpty()的作用是唤醒notEmpty上的等待线程。
3. 取出
下面以take()为例,对LinkedBlockingQueue的取出方法进行说明。
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常 takeLock.lockInterruptibly();try {// 若“队列为空”,则一直等待。while (count.get() == 0) {notEmpty.await();}// 取出元素x = dequeue();// 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {// 释放“取出锁” takeLock.unlock();}// 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。if (c == capacity)signalNotFull();return x; }
说明:take()的作用是取出并返回队列的头。若队列为空,则一直等待。
dequeue()的源码如下:
private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x; }
dequeue()的作用就是删除队列的头节点,并将表头指向“原头节点的下一个节点”。
signalNotFull()的源码如下:
private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();} }
signalNotFull()的作用就是唤醒notFull上的等待线程。
4. 遍历
下面对LinkedBlockingQueue的遍历方法进行说明。
public Iterator<E> iterator() {return new Itr(); }
iterator()实际上是返回一个Iter对象。
Itr类的定义如下:
private class Itr implements Iterator<E> {// 当前节点private Node<E> current;// 上一次返回的节点private Node<E> lastRet;// 当前节点对应的值private E currentElement;Itr() {// 同时获取“插入锁putLock” 和 “取出锁takeLock” fullyLock();try {// 设置“当前元素”为“队列表头的下一节点”,即为队列的第一个有效节点current = head.next;if (current != null)currentElement = current.item;} finally {// 释放“插入锁putLock” 和 “取出锁takeLock” fullyUnlock();}}// 返回“下一个节点是否为null”public boolean hasNext() {return current != null;}private Node<E> nextNode(Node<E> p) {for (;;) {Node<E> s = p.next;if (s == p)return head.next;if (s == null || s.item != null)return s;p = s;}}// 返回下一个节点public E next() {fullyLock();try {if (current == null)throw new NoSuchElementException();E x = currentElement;lastRet = current;current = nextNode(current);currentElement = (current == null) ? null : current.item;return x;} finally {fullyUnlock();}}// 删除下一个节点public void remove() {if (lastRet == null)throw new IllegalStateException();fullyLock();try {Node<E> node = lastRet;lastRet = null;for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (p == node) {unlink(p, trail);break;}}} finally {fullyUnlock();}} }
LinkedBlockingQueue示例
1 import java.util.*; 2 import java.util.concurrent.*; 3 4 /* 5 * LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。 6 * 7 * 下面是“多个线程同时操作并且遍历queue”的示例 8 * (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。 9 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。 10 * 11 * @author skywang 12 */ 13 public class LinkedBlockingQueueDemo1 { 14 15 // TODO: queue是LinkedList对象时,程序会出错。 16 //private static Queue<String> queue = new LinkedList<String>(); 17 private static Queue<String> queue = new LinkedBlockingQueue<String>(); 18 public static void main(String[] args) { 19 20 // 同时启动两个线程对queue进行操作! 21 new MyThread("ta").start(); 22 new MyThread("tb").start(); 23 } 24 25 private static void printAll() { 26 String value; 27 Iterator iter = queue.iterator(); 28 while(iter.hasNext()) { 29 value = (String)iter.next(); 30 System.out.print(value+", "); 31 } 32 System.out.println(); 33 } 34 35 private static class MyThread extends Thread { 36 MyThread(String name) { 37 super(name); 38 } 39 @Override 40 public void run() { 41 int i = 0; 42 while (i++ < 6) { 43 // “线程名” + "-" + "序号" 44 String val = Thread.currentThread().getName()+i; 45 queue.add(val); 46 // 通过“Iterator”遍历queue。 47 printAll(); 48 } 49 } 50 } 51 }
(某一次)运行结果:
tb1, ta1, tb1, ta1, ta2, tb1, ta1, ta2, ta3, tb1, ta1, ta2, ta3, ta4, tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5, ta4, tb1, ta5, ta1, ta6, ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2, ta5, ta6, tb2, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,
结果说明:
示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingQueue进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingQueue中;接着,遍历并输出LinkedBlockingQueue中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。
当queue是LinkedBlockingQueue对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。
Java多线程系列--【JUC集合08】- LinkedBlockingQueue相关推荐
- Java多线程系列---“JUC锁”01之 框架
本章,我们介绍锁的架构:后面的章节将会对它们逐个进行分析介绍.目录如下: 01. Java多线程系列--"JUC锁"01之 框架 02. Java多线程系列--"JUC锁 ...
- Java多线程系列--“JUC原子类”01之 框架
2019独角兽企业重金招聘Python工程师标准>>> Java多线程系列--"JUC原子类"01之 框架 根据修改的数据类型,可以将JUC包中的原子操作类可以分 ...
- Java多线程系列--“JUC原子类”03之 AtomicLongArray原子类
概要 AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray这3个数组类型的原子类的原理和用法相似.本章以AtomicLongArray对数 ...
- Java多线程系列--“JUC线程池”06之 Callable和Future
转载自 Java多线程系列--"JUC线程池"06之 Callable和Future Callable 和 Future 简介 Callable 和 Future 是比较有趣的一 ...
- Java多线程系列--“JUC锁”05之 非公平锁
转载自:http://www.cnblogs.com/skywang12345/p/3496651.html点击打开链接 概要 前面两章分析了"公平锁的获取和释放机制",这一章开始 ...
- Java多线程系列--“JUC锁”03之 公平锁(一)
概要 本章对"公平锁"的获取锁机制进行介绍(本文的公平锁指的是互斥锁的公平锁),内容包括: 基本概念 ReentrantLock数据结构 参考代码 获取公平锁(基于JDK1.7.0 ...
- java futher多线程_Java多线程系列--“JUC集合”05之 ConcurrentSkipListMap
概要 本章对Java.util.concurrent包中的ConcurrentSkipListMap类进行详细的介绍.内容包括: ConcurrentSkipListMap介绍 ConcurrentS ...
- Java多线程系列 JUC线程池05 线程池原理解析(四)
转载 http://www.cnblogs.com/skywang12345/p/3544116.html https://blog.csdn.net/programmer_at/article/d ...
- Java多线程系列---“JUC原子类”02之 框架
转自:http://www.cnblogs.com/skywang12345/p/3514589.html 根据修改的数据类型,可以将JUC包中的原子操作类可以分为4类. 1. 基本类型: Atomi ...
- Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例
CyclicBarrier简介 CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).因为该 barrier 在释放等 ...
最新文章
- 如何规划令人流连忘返的网站?
- Linux安装gcc时碰到的有关问题解决(解决gcc依赖有关问题)
- 【Windows 逆向】OD 调试器工具 ( 显示模块窗口 | 显示记录窗口 | 显示内存窗口 | 显示线程 | 显示句柄 | 显示 CPU | 多窗口界面 )
- 计算机组成原理延迟时间ty,计算机组成原理之数值的机器运算培训教程方案.ppt...
- centos压缩和解压缩
- Docker 深入理解概念
- Linux声卡驱动框图
- java装箱与拆箱_【转】java 自动装箱与拆箱
- 如何得到DataTable的列名
- OMNeT 例程 Tictoc18 学习笔记
- 元转万元单位换算_元要以万元为单位要怎么换算
- MT6573默认锁屏界面修改
- web前端学习34(表格标签 小说排行榜案例)
- dataworks/odps上传资源,注册函数,下载资源
- 格兰富Grundfos CM3-2 A-R-A-E-AVBE F-A-A-N
- 使用 Hexo 快速免费搭建个人网站
- 大数据推动磁带浴火重生 归档市场已超越云
- 英语 —语法— 句子成分
- 为何如今在主板上找不到北桥了?简述主板芯片组发展史
- ${} 与 #{} 区别
热门文章
- 计算机主板 上电顺序,BIOS很熟悉,电脑开机BIOS开机自检顺序你知道吗?
- python怎么让矩阵内所有元素自己平方_python numpy库中矩阵用法指南
- android日程源代码,android日程表实现---仿滴答清单
- “智能营销新图景”梅花网大展华院数据主题演讲
- iOS开发中Certificates,IdentifiersProfiles各种证书配置文件总结
- 关于iphone备份在c盘之后无法恢复备份的问题
- 使用CSS3实现一个正方体相册
- win7计算机的用户名和密码,win7文件共享访问需要输入用户名和密码如何解决
- Git修改用户名和密码
- 女人如何获取安全感?