您的位置:首页 > 产品设计 > UI/UE

Java多线程系列--【JUC集合09】- LinkedBlockingDeque

2018-01-27 12:58 701 查看
参考:http://www.cnblogs.com/skywang12345/p/java_threads_category.html


概要

本章介绍JUC包中的LinkedBlockingDeque。内容包括:

LinkedBlockingDeque介绍

LinkedBlockingDeque原理和数据结构

LinkedBlockingDeque函数列表

LinkedBlockingDeque源码分析(JDK1.7.0_40版本)

LinkedBlockingDeque示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3503480.html


LinkedBlockingDeque介绍

LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。

此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。


LinkedBlockingDeque原理和数据结构

LinkedBlockingDeque的数据结构,如下图所示:





说明

1. LinkedBlockingDeque继承于AbstractQueue,它本质上是一个支持FIFO和FILO的双向的队列。

2. LinkedBlockingDeque实现了BlockingDeque接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。

3. LinkedBlockingDeque是通过双向链表实现的。

3.1 first是双向链表的表头。

3.2 last是双向链表的表尾。

3.3 count是LinkedBlockingDeque的实际大小,即双向链表中当前节点个数。

3.4 capacity是LinkedBlockingDeque的容量,它是在创建LinkedBlockingDeque时指定的。

3.5 lock是控制对LinkedBlockingDeque的互斥锁,当多个线程竞争同时访问LinkedBlockingDeque时,某线程获取到了互斥锁lock,其它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取cpu执行权。

3.6 notEmpty和notFull分别是“非空条件”和“未满条件”。通过它们能够更加细腻进行并发控制。

-- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。
-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。


关于ReentrantLock 和 Condition等更多的内容,可以参考:

(01) Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock

(02) Java多线程系列--“JUC锁”03之 公平锁(一)

(03) Java多线程系列--“JUC锁”04之 公平锁(二)

(04) Java多线程系列--“JUC锁”05之 非公平锁

(05) Java多线程系列--“JUC锁”06之 Condition条件


LinkedBlockingDeque函数列表

// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含给定 collection 的元素,以该 collection 迭代器的遍历顺序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)

// 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾。
boolean add(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 从此双端队列移除所有元素。
void clear()
// 如果此双端队列包含指定的元素,则返回 true。
boolean contains(Object o)
// 返回在此双端队列的元素上以逆向连续顺序进行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 获取但不移除此双端队列表示的队列的头部。
E element()
// 获取,但不移除此双端队列的第一个元素。
E getFirst()
// 获取,但不移除此双端队列的最后一个元素。
E getLast()
// 返回在此双端队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offer(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将在指定的等待时间内一直等待可用空间。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerFirst(E e)
// 将指定的元素插入此双端队列的开头,必要时将在指定的等待时间内等待可用空间。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerLast(E e)
// 将指定的元素插入此双端队列的末尾,必要时将在指定的等待时间内等待可用空间。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 获取但不移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E peek()
// 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E peekFirst()
// 获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E peekLast()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E poll()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。
E poll(long timeout, TimeUnit unit)
// 获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E pollFirst()
// 获取并移除此双端队列的第一个元素,必要时将在指定的等待时间等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E pollLast()
// 获取并移除此双端队列的最后一个元素,必要时将在指定的等待时间内等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 从此双端队列所表示的堆栈中弹出一个元素。
E pop()
// 将元素推入此双端队列表示的栈。
void push(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将一直等待可用空间。
void put(E e)
// 将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。
void putFirst(E e)
// 将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。
void putLast(E e)
// 返回理想情况下(没有内存和资源约束)此双端队列可不受阻塞地接受的额外元素数。
int remainingCapacity()
// 获取并移除此双端队列表示的队列的头部。
E remove()
// 从此双端队列移除第一次出现的指定元素。
boolean remove(Object o)
// 获取并移除此双端队列第一个元素。
E removeFirst()
// 从此双端队列移除第一次出现的指定元素。
boolean removeFirstOccurrence(Object o)
// 获取并移除此双端队列的最后一个元素。
E removeLast()
// 从此双端队列移除最后一次出现的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此双端队列中的元素数。
int size()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素。
E take()
// 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素。
E takeFirst()
// 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素。
E takeLast()
// 返回以恰当顺序(从第一个元素到最后一个元素)包含此双端队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此双端队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()



LinkedBlockingDeque源码分析(JDK1.7.0_40版本)

LinkedBlockingDeque.java的完整源码如下:



1 /*
2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
3  *
4  *
5  *
6  *
7  *
8  *
9  *
10  *
11  *
12  *
13  *
14  *
15  *
16  *
17  *
18  *
19  *
20  *
21  *
22  *
23  */
24
25 /*
26  *
27  *
28  *
29  *
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.concurrent.locks.Condition;
43 import java.util.concurrent.locks.ReentrantLock;
44
45 /**
46  * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
47  * linked nodes.
48  *
49  * <p> The optional capacity bound constructor argument serves as a
50  * way to prevent excessive expansion. The capacity, if unspecified,
51  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
52  * dynamically created upon each insertion unless this would bring the
53  * deque above capacity.
54  *
55  * <p>Most operations run in constant time (ignoring time spent
56  * blocking).  Exceptions include {@link #remove(Object) remove},
57  * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
58  * #removeLastOccurrence removeLastOccurrence}, {@link #contains
59  * contains}, {@link #iterator iterator.remove()}, and the bulk
60  * operations, all of which run in linear time.
61  *
62  * <p>This class and its iterator implement all of the
63  * <em>optional</em> methods of the {@link Collection} and {@link
64  * Iterator} interfaces.
65  *
66  * <p>This class is a member of the
67  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
68  * Java Collections Framework</a>.
69  *
70  * @since 1.6
71  * @author  Doug Lea
72  * @param <E> the type of elements held in this collection
73  */
74 public class LinkedBlockingDeque<E>
75     extends AbstractQueue<E>
76     implements BlockingDeque<E>,  java.io.Serializable {
77
78     /*
79      * Implemented as a simple doubly-linked list protected by a
80      * single lock and using conditions to manage blocking.
81      *
82      * To implement weakly consistent iterators, it appears we need to
83      * keep all Nodes GC-reachable from a predecessor dequeued Node.
84      * That would cause two problems:
85      * - allow a rogue Iterator to cause unbounded memory retention
86      * - cause cross-generational linking of old Nodes to new Nodes if
87      *   a Node was tenured while live, which generational GCs have a
88      *   hard time dealing with, causing repeated major collections.
89      * However, only non-deleted Nodes need to be reachable from
90      * dequeued Nodes, and reachability does not necessarily have to
91      * be of the kind understood by the GC.  We use the trick of
92      * linking a Node that has just been dequeued to itself.  Such a
93      * self-link implicitly means to jump to "first" (for next links)
94      * or "last" (for prev links).
95      */
96
97     /*
98      * We have "diamond" multiple interface/abstract class inheritance
99      * here, and that introduces ambiguities. Often we want the
100      * BlockingDeque javadoc combined with the AbstractQueue
101      * implementation, so a lot of method specs are duplicated here.
102      */
103
104     private static final long serialVersionUID = -387911632671998426L;
105
106     /** Doubly-linked list node class */
107     static final class Node<E> {
108         /**
109          * The item, or null if this node has been removed.
110          */
111         E item;
112
113         /**
114          * One of:
115          * - the real predecessor Node
116          * - this Node, meaning the predecessor is tail
117          * - null, meaning there is no predecessor
118          */
119         Node<E> prev;
120
121         /**
122          * One of:
123          * - the real successor Node
124          * - this Node, meaning the successor is head
125          * - null, meaning there is no successor
126          */
127         Node<E> next;
128
129         Node(E x) {
130             item = x;
131         }
132     }
133
134     /**
135      * Pointer to first node.
136      * Invariant: (first == null && last == null) ||
137      *            (first.prev == null && first.item != null)
138      */
139     transient Node<E> first;
140
141     /**
142      * Pointer to last node.
143      * Invariant: (first == null && last == null) ||
144      *            (last.next == null && last.item != null)
145      */
146     transient Node<E> last;
147
148     /** Number of items in the deque */
149     private transient int count;
150
151     /** Maximum number of items in the deque */
152     private final int capacity;
153
154     /** Main lock guarding all access */
155     final ReentrantLock lock = new ReentrantLock();
156
157     /** Condition for waiting takes */
158     private final Condition notEmpty = lock.newCondition();
159
160     /** Condition for waiting puts */
161     private final Condition notFull = lock.newCondition();
162
163     /**
164      * Creates a {@code LinkedBlockingDeque} with a capacity of
165      * {@link Integer#MAX_VALUE}.
166      */
167     public LinkedBlockingDeque() {
168         this(Integer.MAX_VALUE);
169     }
170
171     /**
172      * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
173      *
174      * @param capacity the capacity of this deque
175      * @throws IllegalArgumentException if {@code capacity} is less than 1
176      */
177     public LinkedBlockingDeque(int capacity) {
178         if (capacity <= 0) throw new IllegalArgumentException();
179         this.capacity = capacity;
180     }
181
182     /**
183      * Creates a {@code LinkedBlockingDeque} with a capacity of
184      * {@link Integer#MAX_VALUE}, initially containing the elements of
185      * the given collection, added in traversal order of the
186      * collection's iterator.
187      *
188      * @param c the collection of elements to initially contain
189      * @throws NullPointerException if the specified collection or any
190      *         of its elements are null
191      */
192     public LinkedBlockingDeque(Collection<? extends E> c) {
193         this(Integer.MAX_VALUE);
194         final ReentrantLock lock = this.lock;
195         lock.lock(); // Never contended, but necessary for visibility
196         try {
197             for (E e : c) {
198                 if (e == null)
199                     throw new NullPointerException();
200                 if (!linkLast(new Node<E>(e)))
201                     throw new IllegalStateException("Deque full");
202             }
203         } finally {
204             lock.unlock();
205         }
206     }
207
208
209     // Basic linking and unlinking operations, called only while holding lock
210
211     /**
212      * Links node as first element, or returns false if full.
213      */
214     private boolean linkFirst(Node<E> node) {
215         // assert lock.isHeldByCurrentThread();
216         if (count >= capacity)
217             return false;
218         Node<E> f = first;
219         node.next = f;
220         first = node;
221         if (last == null)
222             last = node;
223         else
224             f.prev = node;
225         ++count;
226         notEmpty.signal();
227         return true;
228     }
229
230     /**
231      * Links node as last element, or returns false if full.
232      */
233     private boolean linkLast(Node<E> node) {
234         // assert lock.isHeldByCurrentThread();
235         if (count >= capacity)
236             return false;
237         Node<E> l = last;
238         node.prev = l;
239         last = node;
240         if (first == null)
241             first = node;
242         else
243             l.next = node;
244         ++count;
245         notEmpty.signal();
246         return true;
247     }
248
249     /**
250      * Removes and returns first element, or null if empty.
251      */
252     private E unlinkFirst() {
253         // assert lock.isHeldByCurrentThread();
254         Node<E> f = first;
255         if (f == null)
256             return null;
257         Node<E> n = f.next;
258         E item = f.item;
259         f.item = null;
260         f.next = f; // help GC
261         first = n;
262         if (n == null)
263             last = null;
264         else
265             n.prev = null;
266         --count;
267         notFull.signal();
268         return item;
269     }
270
271     /**
272      * Removes and returns last element, or null if empty.
273      */
274     private E unlinkLast() {
275         // assert lock.isHeldByCurrentThread();
276         Node<E> l = last;
277         if (l == null)
278             return null;
279         Node<E> p = l.prev;
280         E item = l.item;
281         l.item = null;
282         l.prev = l; // help GC
283         last = p;
284         if (p == null)
285             first = null;
286         else
287             p.next = null;
288         --count;
289         notFull.signal();
290         return item;
291     }
292
293     /**
294      * Unlinks x.
295      */
296     void unlink(Node<E> x) {
297         // assert lock.isHeldByCurrentThread();
298         Node<E> p = x.prev;
299         Node<E> n = x.next;
300         if (p == null) {
301             unlinkFirst();
302         } else if (n == null) {
303             unlinkLast();
304         } else {
305             p.next = n;
306             n.prev = p;
307             x.item = null;
308             // Don't mess with x's links.  They may still be in use by
309             // an iterator.
310             --count;
311             notFull.signal();
312         }
313     }
314
315     // BlockingDeque methods
316
317     /**
318      * @throws IllegalStateException {@inheritDoc}
319      * @throws NullPointerException  {@inheritDoc}
320      */
321     public void addFirst(E e) {
322         if (!offerFirst(e))
323             throw new IllegalStateException("Deque full");
324     }
325
326     /**
327      * @throws IllegalStateException {@inheritDoc}
328      * @throws NullPointerException  {@inheritDoc}
329      */
330     public void addLast(E e) {
331         if (!offerLast(e))
332             throw new IllegalStateException("Deque full");
333     }
334
335     /**
336      * @throws NullPointerException {@inheritDoc}
337      */
338     public boolean offerFirst(E e) {
339         if (e == null) throw new NullPointerException();
340         Node<E> node = new Node<E>(e);
341         final ReentrantLock lock = this.lock;
342         lock.lock();
343         try {
344             return linkFirst(node);
345         } finally {
346             lock.unlock();
347         }
348     }
349
350     /**
351      * @throws NullPointerException {@inheritDoc}
352      */
353     public boolean offerLast(E e) {
354         if (e == null) throw new NullPointerException();
355         Node<E> node = new Node<E>(e);
356         final ReentrantLock lock = this.lock;
357         lock.lock();
358         try {
359             return linkLast(node);
360         } finally {
361             lock.unlock();
362         }
363     }
364
365     /**
366      * @throws NullPointerException {@inheritDoc}
367      * @throws InterruptedException {@inheritDoc}
368      */
369     public void putFirst(E e) throws InterruptedException {
370         if (e == null) throw new NullPointerException();
371         Node<E> node = new Node<E>(e);
372         final ReentrantLock lock = this.lock;
373         lock.lock();
374         try {
375             while (!linkFirst(node))
376                 notFull.await();
377         } finally {
378             lock.unlock();
379         }
380     }
381
382     /**
383      * @throws NullPointerException {@inheritDoc}
384      * @throws InterruptedException {@inheritDoc}
385      */
386     public void putLast(E e) throws InterruptedException {
387         if (e == null) throw new NullPointerException();
388         Node<E> node = new Node<E>(e);
389         final ReentrantLock lock = this.lock;
390         lock.lock();
391         try {
392             while (!linkLast(node))
393                 notFull.await();
394         } finally {
395             lock.unlock();
396         }
397     }
398
399     /**
400      * @throws NullPointerException {@inheritDoc}
401      * @throws InterruptedException {@inheritDoc}
402      */
403     public boolean offerFirst(E e, long timeout, TimeUnit unit)
404         throws InterruptedException {
405         if (e == null) throw new NullPointerException();
406         Node<E> node = new Node<E>(e);
407         long nanos = unit.toNanos(timeout);
408         final ReentrantLock lock = this.lock;
409         lock.lockInterruptibly();
410         try {
411             while (!linkFirst(node)) {
412                 if (nanos <= 0)
413                     return false;
414                 nanos = notFull.awaitNanos(nanos);
415             }
416             return true;
417         } finally {
418             lock.unlock();
419         }
420     }
421
422     /**
423      * @throws NullPointerException {@inheritDoc}
424      * @throws InterruptedException {@inheritDoc}
425      */
426     public boolean offerLast(E e, long timeout, TimeUnit unit)
427         throws InterruptedException {
428         if (e == null) throw new NullPointerException();
429         Node<E> node = new Node<E>(e);
430         long nanos = unit.toNanos(timeout);
431         final ReentrantLock lock = this.lock;
432         lock.lockInterruptibly();
433         try {
434             while (!linkLast(node)) {
435                 if (nanos <= 0)
436                     return false;
437                 nanos = notFull.awaitNanos(nanos);
438             }
439             return true;
440         } finally {
441             lock.unlock();
442         }
443     }
444
445     /**
446      * @throws NoSuchElementException {@inheritDoc}
447      */
448     public E removeFirst() {
449         E x = pollFirst();
450         if (x == null) throw new NoSuchElementException();
451         return x;
452     }
453
454     /**
455      * @throws NoSuchElementException {@inheritDoc}
456      */
457     public E removeLast() {
458         E x = pollLast();
459         if (x == null) throw new NoSuchElementException();
460         return x;
461     }
462
463     public E pollFirst() {
464         final ReentrantLock lock = this.lock;
465         lock.lock();
466         try {
467             return unlinkFirst();
468         } finally {
469             lock.unlock();
470         }
471     }
472
473     public E pollLast() {
474         final ReentrantLock lock = this.lock;
475         lock.lock();
476         try {
477             return unlinkLast();
478         } finally {
479             lock.unlock();
480         }
481     }
482
483     public E takeFirst() throws InterruptedException {
484         final ReentrantLock lock = this.lock;
485         lock.lock();
486         try {
487             E x;
488             while ( (x = unlinkFirst()) == null)
489                 notEmpty.await();
490             return x;
491         } finally {
492             lock.unlock();
493         }
494     }
495
496     public E takeLast() throws InterruptedException {
497         final ReentrantLock lock = this.lock;
498         lock.lock();
499         try {
500             E x;
501             while ( (x = unlinkLast()) == null)
502                 notEmpty.await();
503             return x;
504         } finally {
505             lock.unlock();
506         }
507     }
508
509     public E pollFirst(long timeout, TimeUnit unit)
510         throws InterruptedException {
511         long nanos = unit.toNanos(timeout);
512         final ReentrantLock lock = this.lock;
513         lock.lockInterruptibly();
514         try {
515             E x;
516             while ( (x = unlinkFirst()) == null) {
517                 if (nanos <= 0)
518                     return null;
519                 nanos = notEmpty.awaitNanos(nanos);
520             }
521             return x;
522         } finally {
523             lock.unlock();
524         }
525     }
526
527     public E pollLast(long timeout, TimeUnit unit)
528         throws InterruptedException {
529         long nanos = unit.toNanos(timeout);
530         final ReentrantLock lock = this.lock;
531         lock.lockInterruptibly();
532         try {
533             E x;
534             while ( (x = unlinkLast()) == null) {
535                 if (nanos <= 0)
536                     return null;
537                 nanos = notEmpty.awaitNanos(nanos);
538             }
539             return x;
540         } finally {
541             lock.unlock();
542         }
543     }
544
545     /**
546      * @throws NoSuchElementException {@inheritDoc}
547      */
548     public E getFirst() {
549         E x = peekFirst();
550         if (x == null) throw new NoSuchElementException();
551         return x;
552     }
553
554     /**
555      * @throws NoSuchElementException {@inheritDoc}
556      */
557     public E getLast() {
558         E x = peekLast();
559         if (x == null) throw new NoSuchElementException();
560         return x;
561     }
562
563     public E peekFirst() {
564         final ReentrantLock lock = this.lock;
565         lock.lock();
566         try {
567             return (first == null) ? null : first.item;
568         } finally {
569             lock.unlock();
570         }
571     }
572
573     public E peekLast() {
574         final ReentrantLock lock = this.lock;
575         lock.lock();
576         try {
577             return (last == null) ? null : last.item;
578         } finally {
579             lock.unlock();
580         }
581     }
582
583     public boolean removeFirstOccurrence(Object o) {
584         if (o == null) return false;
585         final ReentrantLock lock = this.lock;
586         lock.lock();
587         try {
588             for (Node<E> p = first; p != null; p = p.next) {
589                 if (o.equals(p.item)) {
590                     unlink(p);
591                     return true;
592                 }
593             }
594             return false;
595         } finally {
596             lock.unlock();
597         }
598     }
599
600     public boolean removeLastOccurrence(Object o) {
601         if (o == null) return false;
602         final ReentrantLock lock = this.lock;
603         lock.lock();
604         try {
605             for (Node<E> p = last; p != null; p = p.prev) {
606                 if (o.equals(p.item)) {
607                     unlink(p);
608                     return true;
609                 }
610             }
611             return false;
612         } finally {
613             lock.unlock();
614         }
615     }
616
617     // BlockingQueue methods
618
619     /**
620      * Inserts the specified element at the end of this deque unless it would
621      * violate capacity restrictions.  When using a capacity-restricted deque,
622      * it is generally preferable to use method {@link #offer(Object) offer}.
623      *
624      * <p>This method is equivalent to {@link #addLast}.
625      *
626      * @throws IllegalStateException if the element cannot be added at this
627      *         time due to capacity restrictions
628      * @throws NullPointerException if the specified element is null
629      */
630     public boolean add(E e) {
631         addLast(e);
632         return true;
633     }
634
635     /**
636      * @throws NullPointerException if the specified element is null
637      */
638     public boolean offer(E e) {
639         return offerLast(e);
640     }
641
642     /**
643      * @throws NullPointerException {@inheritDoc}
644      * @throws InterruptedException {@inheritDoc}
645      */
646     public void put(E e) throws InterruptedException {
647         putLast(e);
648     }
649
650     /**
651      * @throws NullPointerException {@inheritDoc}
652      * @throws InterruptedException {@inheritDoc}
653      */
654     public boolean offer(E e, long timeout, TimeUnit unit)
655         throws InterruptedException {
656         return offerLast(e, timeout, unit);
657     }
658
659     /**
660      * Retrieves and removes the head of the queue represented by this deque.
661      * This method differs from {@link #poll poll} only in that it throws an
662      * exception if this deque is empty.
663      *
664      * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
665      *
666      * @return the head of the queue represented by this deque
667      * @throws NoSuchElementException if this deque is empty
668      */
669     public E remove() {
670         return removeFirst();
671     }
672
673     public E poll() {
674         return pollFirst();
675     }
676
677     public E take() throws InterruptedException {
678         return takeFirst();
679     }
680
681     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
682         return pollFirst(timeout, unit);
683     }
684
685     /**
686      * Retrieves, but does not remove, the head of the queue represented by
687      * this deque.  This method differs from {@link #peek peek} only in that
688      * it throws an exception if this deque is empty.
689      *
690      * <p>This method is equivalent to {@link #getFirst() getFirst}.
691      *
692      * @return the head of the queue represented by this deque
693      * @throws NoSuchElementException if this deque is empty
694      */
695     public E element() {
696         return getFirst();
697     }
698
699     public E peek() {
700         return peekFirst();
701     }
702
703     /**
704      * Returns the number of additional elements that this deque can ideally
705      * (in the absence of memory or resource constraints) accept without
706      * blocking. This is always equal to the initial capacity of this deque
707      * less the current {@code size} of this deque.
708      *
709      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
710      * an element will succeed by inspecting {@code remainingCapacity}
711      * because it may be the case that another thread is about to
712      * insert or remove an element.
713      */
714     public int remainingCapacity() {
715         final ReentrantLock lock = this.lock;
716         lock.lock();
717         try {
718             return capacity - count;
719         } finally {
720             lock.unlock();
721         }
722     }
723
724     /**
725      * @throws UnsupportedOperationException {@inheritDoc}
726      * @throws ClassCastException            {@inheritDoc}
727      * @throws NullPointerException          {@inheritDoc}
728      * @throws IllegalArgumentException      {@inheritDoc}
729      */
730     public int drainTo(Collection<? super E> c) {
731         return drainTo(c, Integer.MAX_VALUE);
732     }
733
734     /**
735      * @throws UnsupportedOperationException {@inheritDoc}
736      * @throws ClassCastException            {@inheritDoc}
737      * @throws NullPointerException          {@inheritDoc}
738      * @throws IllegalArgumentException      {@inheritDoc}
739      */
740     public int drainTo(Collection<? super E> c, int maxElements) {
741         if (c == null)
742             throw new NullPointerException();
743         if (c == this)
744             throw new IllegalArgumentException();
745         final ReentrantLock lock = this.lock;
746         lock.lock();
747         try {
748             int n = Math.min(maxElements, count);
749             for (int i = 0; i < n; i++) {
750                 c.add(first.item);   // In this order, in case add() throws.
751                 unlinkFirst();
752             }
753             return n;
754         } finally {
755             lock.unlock();
756         }
757     }
758
759     // Stack methods
760
761     /**
762      * @throws IllegalStateException {@inheritDoc}
763      * @throws NullPointerException  {@inheritDoc}
764      */
765     public void push(E e) {
766         addFirst(e);
767     }
768
769     /**
770      * @throws NoSuchElementException {@inheritDoc}
771      */
772     public E pop() {
773         return removeFirst();
774     }
775
776     // Collection methods
777
778     /**
779      * Removes the first occurrence of the specified element from this deque.
780      * If the deque does not contain the element, it is unchanged.
781      * More formally, removes the first element {@code e} such that
782      * {@code o.equals(e)} (if such an element exists).
783      * Returns {@code true} if this deque contained the specified element
784      * (or equivalently, if this deque changed as a result of the call).
785      *
786      * <p>This method is equivalent to
787      * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
788      *
789      * @param o element to be removed from this deque, if present
790      * @return {@code true} if this deque changed as a result of the call
791      */
792     public boolean remove(Object o) {
793         return removeFirstOccurrence(o);
794     }
795
796     /**
797      * Returns the number of elements in this deque.
798      *
799      * @return the number of elements in this deque
800      */
801     public int size() {
802         final ReentrantLock lock = this.lock;
803         lock.lock();
804         try {
805             return count;
806         } finally {
807             lock.unlock();
808         }
809     }
810
811     /**
812      * Returns {@code true} if this deque contains the specified element.
813      * More formally, returns {@code true} if and only if this deque contains
814      * at least one element {@code e} such that {@code o.equals(e)}.
815      *
816      * @param o object to be checked for containment in this deque
817      * @return {@code true} if this deque contains the specified element
818      */
819     public boolean contains(Object o) {
820         if (o == null) return false;
821         final ReentrantLock lock = this.lock;
822         lock.lock();
823         try {
824             for (Node<E> p = first; p != null; p = p.next)
825                 if (o.equals(p.item))
826                     return true;
827             return false;
828         } finally {
829             lock.unlock();
830         }
831     }
832
833     /*
834      * TODO: Add support for more efficient bulk operations.
835      *
836      * We don't want to acquire the lock for every iteration, but we
837      * also want other threads a chance to interact with the
838      * collection, especially when count is close to capacity.
839      */
840
841 //     /**
842 //      * Adds all of the elements in the specified collection to this
843 //      * queue.  Attempts to addAll of a queue to itself result in
844 //      * {@code IllegalArgumentException}. Further, the behavior of
845 //      * this operation is undefined if the specified collection is
846 //      * modified while the operation is in progress.
847 //      *
848 //      * @param c collection containing elements to be added to this queue
849 //      * @return {@code true} if this queue changed as a result of the call
850 //      * @throws ClassCastException            {@inheritDoc}
851 //      * @throws NullPointerException          {@inheritDoc}
852 //      * @throws IllegalArgumentException      {@inheritDoc}
853 //      * @throws IllegalStateException         {@inheritDoc}
854 //      * @see #add(Object)
855 //      */
856 //     public boolean addAll(Collection<? extends E> c) {
857 //         if (c == null)
858 //             throw new NullPointerException();
859 //         if (c == this)
860 //             throw new IllegalArgumentException();
861 //         final ReentrantLock lock = this.lock;
862 //         lock.lock();
863 //         try {
864 //             boolean modified = false;
865 //             for (E e : c)
866 //                 if (linkLast(e))
867 //                     modified = true;
868 //             return modified;
869 //         } finally {
870 //             lock.unlock();
871 //         }
872 //     }
873
874     /**
875      * Returns an array containing all of the elements in this deque, in
876      * proper sequence (from first to last element).
877      *
878      * <p>The returned array will be "safe" in that no references to it are
879      * maintained by this deque.  (In other words, this method must allocate
880      * a new array).  The caller is thus free to modify the returned array.
881      *
882      * <p>This method acts as bridge between array-based and collection-based
883      * APIs.
884      *
885      * @return an array containing all of the elements in this deque
886      */
887     @SuppressWarnings("unchecked")
888     public Object[] toArray() {
889         final ReentrantLock lock = this.lock;
890         lock.lock();
891         try {
892             Object[] a = new Object[count];
893             int k = 0;
894             for (Node<E> p = first; p != null; p = p.next)
895                 a[k++] = p.item;
896             return a;
897         } finally {
898             lock.unlock();
899         }
900     }
901
902     /**
903      * Returns an array containing all of the elements in this deque, in
904      * proper sequence; the runtime type of the returned array is that of
905      * the specified array.  If the deque fits in the specified array, it
906      * is returned therein.  Otherwise, a new array is allocated with the
907      * runtime type of the specified array and the size of this deque.
908      *
909      * <p>If this deque fits in the specified array with room to spare
910      * (i.e., the array has more elements than this deque), the element in
911      * the array immediately following the end of the deque is set to
912      * {@code null}.
913      *
914      * <p>Like the {@link #toArray()} method, this method acts as bridge between
915      * array-based and collection-based APIs.  Further, this method allows
916      * precise control over the runtime type of the output array, and may,
917      * under certain circumstances, be used to save allocation costs.
918      *
919      * <p>Suppose {@code x} is a deque known to contain only strings.
920      * The following code can be used to dump the deque into a newly
921      * allocated array of {@code String}:
922      *
923      * <pre>
924      *     String[] y = x.toArray(new String[0]);</pre>
925      *
926      * Note that {@code toArray(new Object[0])} is identical in function to
927      * {@code toArray()}.
928      *
929      * @param a the array into which the elements of the deque are to
930      *          be stored, if it is big enough; otherwise, a new array of the
931      *          same runtime type is allocated for this purpose
932      * @return an array containing all of the elements in this deque
933      * @throws ArrayStoreException if the runtime type of the specified array
934      *         is not a supertype of the runtime type of every element in
935      *         this deque
936      * @throws NullPointerException if the specified array is null
937      */
938     @SuppressWarnings("unchecked")
939     public <T> T[] toArray(T[] a) {
940         final ReentrantLock lock = this.lock;
941         lock.lock();
942         try {
943             if (a.length < count)
944                 a = (T[])java.lang.reflect.Array.newInstance
945                     (a.getClass().getComponentType(), count);
946
947             int k = 0;
948             for (Node<E> p = first; p != null; p = p.next)
949                 a[k++] = (T)p.item;
950             if (a.length > k)
951                 a[k] = null;
952             return a;
953         } finally {
954             lock.unlock();
955         }
956     }
957
958     public String toString() {
959         final ReentrantLock lock = this.lock;
960         lock.lock();
961         try {
962             Node<E> p = first;
963             if (p == null)
964                 return "[]";
965
966             StringBuilder sb = new StringBuilder();
967             sb.append('[');
968             for (;;) {
969                 E e = p.item;
970                 sb.append(e == this ? "(this Collection)" : e);
971                 p = p.next;
972                 if (p == null)
973                     return sb.append(']').toString();
974                 sb.append(',').append(' ');
975             }
976         } finally {
977             lock.unlock();
978         }
979     }
980
981     /**
982      * Atomically removes all of the elements from this deque.
983      * The deque will be empty after this call returns.
984      */
985     public void clear() {
986         final ReentrantLock lock = this.lock;
987         lock.lock();
988         try {
989             for (Node<E> f = first; f != null; ) {
990                 f.item = null;
991                 Node<E> n = f.next;
992                 f.prev = null;
993                 f.next = null;
994                 f = n;
995             }
996             first = last = null;
997             count = 0;
998             notFull.signalAll();
999         } finally {
1000             lock.unlock();
1001         }
1002     }
1003
1004     /**
1005      * Returns an iterator over the elements in this deque in proper sequence.
1006      * The elements will be returned in order from first (head) to last (tail).
1007      *
1008      * <p>The returned iterator is a "weakly consistent" iterator that
1009      * will never throw {@link java.util.ConcurrentModificationException
1010      * ConcurrentModificationException}, and guarantees to traverse
1011      * elements as they existed upon construction of the iterator, and
1012      * may (but is not guaranteed to) reflect any modifications
1013      * subsequent to construction.
1014      *
1015      * @return an iterator over the elements in this deque in proper sequence
1016      */
1017     public Iterator<E> iterator() {
1018         return new Itr();
1019     }
1020
1021     /**
1022      * Returns an iterator over the elements in this deque in reverse
1023      * sequential order.  The elements will be returned in order from
1024      * last (tail) to first (head).
1025      *
1026      * <p>The returned iterator is a "weakly consistent" iterator that
1027      * will never throw {@link java.util.ConcurrentModificationException
1028      * ConcurrentModificationException}, and guarantees to traverse
1029      * elements as they existed upon construction of the iterator, and
1030      * may (but is not guaranteed to) reflect any modifications
1031      * subsequent to construction.
1032      *
1033      * @return an iterator over the elements in this deque in reverse order
1034      */
1035     public Iterator<E> descendingIterator() {
1036         return new DescendingItr();
1037     }
1038
1039     /**
1040      * Base class for Iterators for LinkedBlockingDeque
1041      */
1042     private abstract class AbstractItr implements Iterator<E> {
1043         /**
1044          * The next node to return in next()
1045          */
1046          Node<E> next;
1047
1048         /**
1049          * nextItem holds on to item fields because once we claim that
1050          * an element exists in hasNext(), we must return item read
1051          * under lock (in advance()) even if it was in the process of
1052          * being removed when hasNext() was called.
1053          */
1054         E nextItem;
1055
1056         /**
1057          * Node returned by most recent call to next. Needed by remove.
1058          * Reset to null if this element is deleted by a call to remove.
1059          */
1060         private Node<E> lastRet;
1061
1062         abstract Node<E> firstNode();
1063         abstract Node<E> nextNode(Node<E> n);
1064
1065         AbstractItr() {
1066             // set to initial position
1067             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1068             lock.lock();
1069             try {
1070                 next = firstNode();
1071                 nextItem = (next == null) ? null : next.item;
1072             } finally {
1073                 lock.unlock();
1074             }
1075         }
1076
1077         /**
1078          * Returns the successor node of the given non-null, but
1079          * possibly previously deleted, node.
1080          */
1081         private Node<E> succ(Node<E> n) {
1082             // Chains of deleted nodes ending in null or self-links
1083             // are possible if multiple interior nodes are removed.
1084             for (;;) {
1085                 Node<E> s = nextNode(n);
1086                 if (s == null)
1087                     return null;
1088                 else if (s.item != null)
1089                     return s;
1090                 else if (s == n)
1091                     return firstNode();
1092                 else
1093                     n = s;
1094             }
1095         }
1096
1097         /**
1098          * Advances next.
1099          */
1100         void advance() {
1101             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1102             lock.lock();
1103             try {
1104                 // assert next != null;
1105                 next = succ(next);
1106                 nextItem = (next == null) ? null : next.item;
1107             } finally {
1108                 lock.unlock();
1109             }
1110         }
1111
1112         public boolean hasNext() {
1113             return next != null;
1114         }
1115
1116         public E next() {
1117             if (next == null)
1118                 throw new NoSuchElementException();
1119             lastRet = next;
1120             E x = nextItem;
1121             advance();
1122             return x;
1123         }
1124
1125         public void remove() {
1126             Node<E> n = lastRet;
1127             if (n == null)
1128                 throw new IllegalStateException();
1129             lastRet = null;
1130             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1131             lock.lock();
1132             try {
1133                 if (n.item != null)
1134                     unlink(n);
1135             } finally {
1136                 lock.unlock();
1137             }
1138         }
1139     }
1140
1141     /** Forward iterator */
1142     private class Itr extends AbstractItr {
1143         Node<E> firstNode() { return first; }
1144         Node<E> nextNode(Node<E> n) { return n.next; }
1145     }
1146
1147     /** Descending iterator */
1148     private class DescendingItr extends AbstractItr {
1149         Node<E> firstNode() { return last; }
1150         Node<E> nextNode(Node<E> n) { return n.prev; }
1151     }
1152
1153     /**
1154      * Save the state of this deque to a stream (that is, serialize it).
1155      *
1156      * @serialData The capacity (int), followed by elements (each an
1157      * {@code Object}) in the proper order, followed by a null
1158      * @param s the stream
1159      */
1160     private void writeObject(java.io.ObjectOutputStream s)
1161         throws java.io.IOException {
1162         final ReentrantLock lock = this.lock;
1163         lock.lock();
1164         try {
1165             // Write out capacity and any hidden stuff
1166             s.defaultWriteObject();
1167             // Write out all elements in the proper order.
1168             for (Node<E> p = first; p != null; p = p.next)
1169                 s.writeObject(p.item);
1170             // Use trailing null as sentinel
1171             s.writeObject(null);
1172         } finally {
1173             lock.unlock();
1174         }
1175     }
1176
1177     /**
1178      * Reconstitute this deque from a stream (that is,
1179      * deserialize it).
1180      * @param s the stream
1181      */
1182     private void readObject(java.io.ObjectInputStream s)
1183         throws java.io.IOException, ClassNotFoundException {
1184         s.defaultReadObject();
1185         count = 0;
1186         first = null;
1187         last = null;
1188         // Read in all elements and place in queue
1189         for (;;) {
1190             @SuppressWarnings("unchecked")
1191             E item = (E)s.readObject();
1192             if (item == null)
1193                 break;
1194             add(item);
1195         }
1196     }
1197
1198 }


下面从ArrayBlockingQueue的创建,添加,取出,遍历这几个方面对LinkedBlockingDeque进行分析

1. 创建

下面以LinkedBlockingDeque(int capacity)来进行说明。

public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}


说明:capacity是“链式阻塞队列”的容量。

LinkedBlockingDeque中相关的数据结果定义如下:

// “双向队列”的表头
transient Node<E> first;
// “双向队列”的表尾
transient Node<E> last;
// 节点数量
private transient int count;
// 容量
private final int capacity;
// 互斥锁 , 互斥锁对应的“非空条件notEmpty”, 互斥锁对应的“未满条件notFull”
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();


说明:lock是互斥锁,用于控制多线程对LinkedBlockingDeque中元素的互斥访问;而notEmpty和notFull是与lock绑定的条件,它们用于实现对多线程更精确的控制。

双向链表的节点Node的定义如下:

static final class Node<E> {
E item;       // 数据
Node<E> prev; // 前一节点
Node<E> next; // 后一节点

Node(E x) { item = x; }
}


2. 添加

下面以offer(E e)为例,对LinkedBlockingDeque的添加方法进行说明。

public boolean offer(E e) {
return offerLast(e);
}


offer()实际上是调用offerLast()将元素添加到队列的末尾。

offerLast()的源码如下:

public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
// 新建节点
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 将“新节点”添加到双向链表的末尾
return linkLast(node);
} finally {
// 释放锁
lock.unlock();
}
}


说明:offerLast()的作用,是新建节点并将该节点插入到双向链表的末尾。它在插入节点前,会获取锁;操作完毕,再释放锁。

linkLast()的源码如下:

private boolean linkLast(Node<E> node) {
// 如果“双向链表的节点数量” > “容量”,则返回false,表示插入失败。
if (count >= capacity)
return false;
// 将“node添加到链表末尾”,并设置node为新的尾节点
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
// 将“节点数量”+1
++count;
// 插入节点之后,唤醒notEmpty上的等待线程。
notEmpty.signal();
return true;
}


说明:linkLast()的作用,是将节点插入到双向队列的末尾;插入节点之后,唤醒notEmpty上的等待线程。

3. 删除

下面以take()为例,对LinkedBlockingDeque的取出方法进行说明。

public E take() throws InterruptedException {
return takeFirst();
}


take()实际上是调用takeFirst()队列的第一个元素。

takeFirst()的源码如下:

public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
E x;
// 若“队列为空”,则一直等待。否则,通过unlinkFirst()删除第一个节点。
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
// 释放锁
lock.unlock();
}
}


说明:takeFirst()的作用,是删除双向链表的第一个节点,并返回节点对应的值。它在插入节点前,会获取锁;操作完毕,再释放锁。

unlinkFirst()的源码如下:

private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
// 删除并更新“第一个节点”
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
// 将“节点数量”-1
--count;
// 删除节点之后,唤醒notFull上的等待线程。
notFull.signal();
return item;
}


说明:unlinkFirst()的作用,是将双向队列的第一个节点删除;删除节点之后,唤醒notFull上的等待线程。

4. 遍历

下面对LinkedBlockingDeque的遍历方法进行说明。

public Iterator<E> iterator() {
return new Itr();
}


iterator()实际上是返回一个Iter对象。

Itr类的定义如下:

private class Itr extends AbstractItr {
// “双向队列”的表头
Node<E> firstNode() { return first; }
// 获取“节点n的下一个节点”
Node<E> nextNode(Node<E> n) { return n.next; }
}


Itr继承于AbstractItr,而AbstractItr的定义如下:

private abstract class AbstractItr implements Iterator<E> {
// next是下一次调用next()会返回的节点。
Node<E> next;
// nextItem是next()返回节点对应的数据。
E nextItem;
// 上一次next()返回的节点。
private Node<E> lastRet;
// 返回第一个节点
abstract Node<E> firstNode();
// 返回下一个节点
abstract Node<E> nextNode(Node<E> n);

AbstractItr() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
// 获取“LinkedBlockingDeque的互斥锁”
lock.lock();
try {
// 获取“双向队列”的表头
next = firstNode();
// 获取表头对应的数据
nextItem = (next == null) ? null : next.item;
} finally {
// 释放“LinkedBlockingDeque的互斥锁”
lock.unlock();
}
}

// 获取n的后继节点
private Node<E> succ(Node<E> n) {
// Chains of deleted nodes ending in null or self-links
// are possible if multiple interior nodes are removed.
for (;;) {
Node<E> s = nextNode(n);
if (s == null)
return null;
else if (s.item != null)
return s;
else if (s == n)
return firstNode();
else
n = s;
}
}

// 更新next和nextItem。
void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
// assert next != null;
next = succ(next);
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}

// 返回“下一个节点是否为null”
public boolean hasNext() {
return next != null;
}

// 返回下一个节点
public E next() {
if (next == null)
throw new NoSuchElementException();
lastRet = next;
E x = nextItem;
advance();
return x;
}

// 删除下一个节点
public void remove() {
Node<E> n = lastRet;
if (n == null)
throw new IllegalStateException();
lastRet = null;
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
if (n.item != null)
unlink(n);
} finally {
lock.unlock();
}
}
}



LinkedBlockingDeque示例

1 import java.util.*;
2 import java.util.concurrent.*;
3
4 /*
5  *   LinkedBlockingDeque是“线程安全”的队列,而LinkedList是非线程安全的。
6  *
7  *   下面是“多个线程同时操作并且遍历queue”的示例
8  *   (01) 当queue是LinkedBlockingDeque对象时,程序能正常运行。
9  *   (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
10  *
11  * @author skywang
12  */
13 public class LinkedBlockingDequeDemo1 {
14
15     // TODO: queue是LinkedList对象时,程序会出错。
16     //private static Queue<String> queue = new LinkedList<String>();
17     private static Queue<String> queue = new LinkedBlockingDeque<String>();
18     public static void main(String[] args) {
19
20         // 同时启动两个线程对queue进行操作!
21         new MyThread("ta").start();
22         new MyThread("tb").start();
23     }
24
25     private static void printAll() {
26         String value;
27         Iterator iter = queue.iterator();
28         while(iter.hasNext()) {
29             value = (String)iter.next();
30             System.out.print(value+", ");
31         }
32         System.out.println();
33     }
34
35     private static class MyThread extends Thread {
36         MyThread(String name) {
37             super(name);
38         }
39         @Override
40         public void run() {
41                 int i = 0;
42             while (i++ < 6) {
43                 // “线程名” + "-" + "序号"
44                 String val = Thread.currentThread().getName()+i;
45                 queue.add(val);
46                 // 通过“Iterator”遍历queue。
47                 printAll();
48             }
49         }
50     }
51 }


(某一次)运行结果

ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, tb2, tb2, ta2,
ta2,
ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, tb3, tb3, ta3,
ta3, ta1,
tb1, ta1, tb2, tb1, ta2, tb2, tb3, ta2, ta3, tb3, tb4, ta3, ta4,
tb4, ta1, ta4, tb1, tb5,
tb2, ta1, ta2, tb1, tb3, tb2, ta3, ta2, tb4, tb3, ta4, ta3, tb5, tb4, ta5,
ta4, ta1, tb5, tb1, ta5, tb2, tb6,
ta2, ta1, tb3, tb1, ta3, tb2, tb4, ta2, ta4, tb3, tb5, ta3, ta5, tb4, tb6, ta4, ta6,
tb5, ta5, tb6, ta6,


结果说明:示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingDeque进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingDeque中;接着,遍历并输出LinkedBlockingDeque中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。

当queue是LinkedBlockingDeque对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。


概要

本章介绍JUC包中的LinkedBlockingDeque。内容包括:

LinkedBlockingDeque介绍

LinkedBlockingDeque原理和数据结构

LinkedBlockingDeque函数列表

LinkedBlockingDeque源码分析(JDK1.7.0_40版本)

LinkedBlockingDeque示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3503480.html


LinkedBlockingDeque介绍

LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。

此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。


LinkedBlockingDeque原理和数据结构

LinkedBlockingDeque的数据结构,如下图所示:





说明

1. LinkedBlockingDeque继承于AbstractQueue,它本质上是一个支持FIFO和FILO的双向的队列。

2. LinkedBlockingDeque实现了BlockingDeque接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。

3. LinkedBlockingDeque是通过双向链表实现的。

3.1 first是双向链表的表头。

3.2 last是双向链表的表尾。

3.3 count是LinkedBlockingDeque的实际大小,即双向链表中当前节点个数。

3.4 capacity是LinkedBlockingDeque的容量,它是在创建LinkedBlockingDeque时指定的。

3.5 lock是控制对LinkedBlockingDeque的互斥锁,当多个线程竞争同时访问LinkedBlockingDeque时,某线程获取到了互斥锁lock,其它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取cpu执行权。

3.6 notEmpty和notFull分别是“非空条件”和“未满条件”。通过它们能够更加细腻进行并发控制。

-- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。
-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。


关于ReentrantLock 和 Condition等更多的内容,可以参考:

(01) Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock

(02) Java多线程系列--“JUC锁”03之 公平锁(一)

(03) Java多线程系列--“JUC锁”04之 公平锁(二)

(04) Java多线程系列--“JUC锁”05之 非公平锁

(05) Java多线程系列--“JUC锁”06之 Condition条件


LinkedBlockingDeque函数列表

// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含给定 collection 的元素,以该 collection 迭代器的遍历顺序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)

// 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾。
boolean add(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 从此双端队列移除所有元素。
void clear()
// 如果此双端队列包含指定的元素,则返回 true。
boolean contains(Object o)
// 返回在此双端队列的元素上以逆向连续顺序进行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 获取但不移除此双端队列表示的队列的头部。
E element()
// 获取,但不移除此双端队列的第一个元素。
E getFirst()
// 获取,但不移除此双端队列的最后一个元素。
E getLast()
// 返回在此双端队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offer(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将在指定的等待时间内一直等待可用空间。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerFirst(E e)
// 将指定的元素插入此双端队列的开头,必要时将在指定的等待时间内等待可用空间。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerLast(E e)
// 将指定的元素插入此双端队列的末尾,必要时将在指定的等待时间内等待可用空间。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 获取但不移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E peek()
// 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E peekFirst()
// 获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E peekLast()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E poll()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。
E poll(long timeout, TimeUnit unit)
// 获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E pollFirst()
// 获取并移除此双端队列的第一个元素,必要时将在指定的等待时间等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E pollLast()
// 获取并移除此双端队列的最后一个元素,必要时将在指定的等待时间内等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 从此双端队列所表示的堆栈中弹出一个元素。
E pop()
// 将元素推入此双端队列表示的栈。
void push(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将一直等待可用空间。
void put(E e)
// 将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。
void putFirst(E e)
// 将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。
void putLast(E e)
// 返回理想情况下(没有内存和资源约束)此双端队列可不受阻塞地接受的额外元素数。
int remainingCapacity()
// 获取并移除此双端队列表示的队列的头部。
E remove()
// 从此双端队列移除第一次出现的指定元素。
boolean remove(Object o)
// 获取并移除此双端队列第一个元素。
E removeFirst()
// 从此双端队列移除第一次出现的指定元素。
boolean removeFirstOccurrence(Object o)
// 获取并移除此双端队列的最后一个元素。
E removeLast()
// 从此双端队列移除最后一次出现的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此双端队列中的元素数。
int size()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素。
E take()
// 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素。
E takeFirst()
// 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素。
E takeLast()
// 返回以恰当顺序(从第一个元素到最后一个元素)包含此双端队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此双端队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()



LinkedBlockingDeque源码分析(JDK1.7.0_40版本)

LinkedBlockingDeque.java的完整源码如下:



1 /*
2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
3  *
4  *
5  *
6  *
7  *
8  *
9  *
10  *
11  *
12  *
13  *
14  *
15  *
16  *
17  *
18  *
19  *
20  *
21  *
22  *
23  */
24
25 /*
26  *
27  *
28  *
29  *
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/ 34  */
35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.concurrent.locks.Condition;
43 import java.util.concurrent.locks.ReentrantLock;
44
45 /**
46  * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
47  * linked nodes.
48  *
49  * <p> The optional capacity bound constructor argument serves as a
50  * way to prevent excessive expansion. The capacity, if unspecified,
51  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
52  * dynamically created upon each insertion unless this would bring the
53  * deque above capacity.
54  *
55  * <p>Most operations run in constant time (ignoring time spent
56  * blocking).  Exceptions include {@link #remove(Object) remove},
57  * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
58  * #removeLastOccurrence removeLastOccurrence}, {@link #contains
59  * contains}, {@link #iterator iterator.remove()}, and the bulk
60  * operations, all of which run in linear time.
61  *
62  * <p>This class and its iterator implement all of the
63  * <em>optional</em> methods of the {@link Collection} and {@link
64  * Iterator} interfaces.
65  *
66  * <p>This class is a member of the
67  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
68  * Java Collections Framework</a>.
69  *
70  * @since 1.6
71  * @author  Doug Lea
72  * @param <E> the type of elements held in this collection
73  */
74 public class LinkedBlockingDeque<E>
75     extends AbstractQueue<E>
76     implements BlockingDeque<E>,  java.io.Serializable {
77
78     /*
79      * Implemented as a simple doubly-linked list protected by a
80      * single lock and using conditions to manage blocking.
81      *
82      * To implement weakly consistent iterators, it appears we need to
83      * keep all Nodes GC-reachable from a predecessor dequeued Node.
84      * That would cause two problems:
85      * - allow a rogue Iterator to cause unbounded memory retention
86      * - cause cross-generational linking of old Nodes to new Nodes if
87      *   a Node was tenured while live, which generational GCs have a
88      *   hard time dealing with, causing repeated major collections.
89      * However, only non-deleted Nodes need to be reachable from
90      * dequeued Nodes, and reachability does not necessarily have to
91      * be of the kind understood by the GC.  We use the trick of
92      * linking a Node that has just been dequeued to itself.  Such a
93      * self-link implicitly means to jump to "first" (for next links)
94      * or "last" (for prev links).
95      */
96
97     /*
98      * We have "diamond" multiple interface/abstract class inheritance
99      * here, and that introduces ambiguities. Often we want the
100      * BlockingDeque javadoc combined with the AbstractQueue
101      * implementation, so a lot of method specs are duplicated here.
102      */
103
104     private static final long serialVersionUID = -387911632671998426L;
105
106     /** Doubly-linked list node class */
107     static final class Node<E> {
108         /**
109          * The item, or null if this node has been removed.
110          */
111         E item;
112
113         /**
114          * One of:
115          * - the real predecessor Node
116          * - this Node, meaning the predecessor is tail
117          * - null, meaning there is no predecessor
118          */
119         Node<E> prev;
120
121         /**
122          * One of:
123          * - the real successor Node
124          * - this Node, meaning the successor is head
125          * - null, meaning there is no successor
126          */
127         Node<E> next;
128
129         Node(E x) {
130             item = x;
131         }
132     }
133
134     /**
135      * Pointer to first node.
136      * Invariant: (first == null && last == null) ||
137      *            (first.prev == null && first.item != null)
138      */
139     transient Node<E> first;
140
141     /**
142      * Pointer to last node.
143      * Invariant: (first == null && last == null) ||
144      *            (last.next == null && last.item != null)
145      */
146     transient Node<E> last;
147
148     /** Number of items in the deque */
149     private transient int count;
150
151     /** Maximum number of items in the deque */
152     private final int capacity;
153
154     /** Main lock guarding all access */
155     final ReentrantLock lock = new ReentrantLock();
156
157     /** Condition for waiting takes */
158     private final Condition notEmpty = lock.newCondition();
159
160     /** Condition for waiting puts */
161     private final Condition notFull = lock.newCondition();
162
163     /**
164      * Creates a {@code LinkedBlockingDeque} with a capacity of
165      * {@link Integer#MAX_VALUE}.
166      */
167     public LinkedBlockingDeque() {
168         this(Integer.MAX_VALUE);
169     }
170
171     /**
172      * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
173      *
174      * @param capacity the capacity of this deque
175      * @throws IllegalArgumentException if {@code capacity} is less than 1
176      */
177     public LinkedBlockingDeque(int capacity) {
178         if (capacity <= 0) throw new IllegalArgumentException();
179         this.capacity = capacity;
180     }
181
182     /**
183      * Creates a {@code LinkedBlockingDeque} with a capacity of
184      * {@link Integer#MAX_VALUE}, initially containing the elements of
185      * the given collection, added in traversal order of the
186      * collection's iterator.
187      *
188      * @param c the collection of elements to initially contain
189      * @throws NullPointerException if the specified collection or any
190      *         of its elements are null
191      */
192     public LinkedBlockingDeque(Collection<? extends E> c) {
193         this(Integer.MAX_VALUE);
194         final ReentrantLock lock = this.lock;
195         lock.lock(); // Never contended, but necessary for visibility
196         try {
197             for (E e : c) {
198                 if (e == null)
199                     throw new NullPointerException();
200                 if (!linkLast(new Node<E>(e)))
201                     throw new IllegalStateException("Deque full");
202             }
203         } finally {
204             lock.unlock();
205         }
206     }
207
208
209     // Basic linking and unlinking operations, called only while holding lock
210
211     /**
212      * Links node as first element, or returns false if full.
213      */
214     private boolean linkFirst(Node<E> node) {
215         // assert lock.isHeldByCurrentThread();
216         if (count >= capacity)
217             return false;
218         Node<E> f = first;
219         node.next = f;
220         first = node;
221         if (last == null)
222             last = node;
223         else
224             f.prev = node;
225         ++count;
226         notEmpty.signal();
227         return true;
228     }
229
230     /**
231      * Links node as last element, or returns false if full.
232      */
233     private boolean linkLast(Node<E> node) {
234         // assert lock.isHeldByCurrentThread();
235         if (count >= capacity)
236             return false;
237         Node<E> l = last;
238         node.prev = l;
239         last = node;
240         if (first == null)
241             first = node;
242         else
243             l.next = node;
244         ++count;
245         notEmpty.signal();
246         return true;
247     }
248
249     /**
250      * Removes and returns first element, or null if empty.
251      */
252     private E unlinkFirst() {
253         // assert lock.isHeldByCurrentThread();
254         Node<E> f = first;
255         if (f == null)
256             return null;
257         Node<E> n = f.next;
258         E item = f.item;
259         f.item = null;
260         f.next = f; // help GC
261         first = n;
262         if (n == null)
263             last = null;
264         else
265             n.prev = null;
266         --count;
267         notFull.signal();
268         return item;
269     }
270
271     /**
272      * Removes and returns last element, or null if empty.
273      */
274     private E unlinkLast() {
275         // assert lock.isHeldByCurrentThread();
276         Node<E> l = last;
277         if (l == null)
278             return null;
279         Node<E> p = l.prev;
280         E item = l.item;
281         l.item = null;
282         l.prev = l; // help GC
283         last = p;
284         if (p == null)
285             first = null;
286         else
287             p.next = null;
288         --count;
289         notFull.signal();
290         return item;
291     }
292
293     /**
294      * Unlinks x.
295      */
296     void unlink(Node<E> x) {
297         // assert lock.isHeldByCurrentThread();
298         Node<E> p = x.prev;
299         Node<E> n = x.next;
300         if (p == null) {
301             unlinkFirst();
302         } else if (n == null) {
303             unlinkLast();
304         } else {
305             p.next = n;
306             n.prev = p;
307             x.item = null;
308             // Don't mess with x's links.  They may still be in use by
309             // an iterator.
310             --count;
311             notFull.signal();
312         }
313     }
314
315     // BlockingDeque methods
316
317     /**
318      * @throws IllegalStateException {@inheritDoc}
319      * @throws NullPointerException  {@inheritDoc}
320      */
321     public void addFirst(E e) {
322         if (!offerFirst(e))
323             throw new IllegalStateException("Deque full");
324     }
325
326     /**
327      * @throws IllegalStateException {@inheritDoc}
328      * @throws NullPointerException  {@inheritDoc}
329      */
330     public void addLast(E e) {
331         if (!offerLast(e))
332             throw new IllegalStateException("Deque full");
333     }
334
335     /**
336      * @throws NullPointerException {@inheritDoc}
337      */
338     public boolean offerFirst(E e) {
339         if (e == null) throw new NullPointerException();
340         Node<E> node = new Node<E>(e);
341         final ReentrantLock lock = this.lock;
342         lock.lock();
343         try {
344             return linkFirst(node);
345         } finally {
346             lock.unlock();
347         }
348     }
349
350     /**
351      * @throws NullPointerException {@inheritDoc}
352      */
353     public boolean offerLast(E e) {
354         if (e == null) throw new NullPointerException();
355         Node<E> node = new Node<E>(e);
356         final ReentrantLock lock = this.lock;
357         lock.lock();
358         try {
359             return linkLast(node);
360         } finally {
361             lock.unlock();
362         }
363     }
364
365     /**
366      * @throws NullPointerException {@inheritDoc}
367      * @throws InterruptedException {@inheritDoc}
368      */
369     public void putFirst(E e) throws InterruptedException {
370         if (e == null) throw new NullPointerException();
371         Node<E> node = new Node<E>(e);
372         final ReentrantLock lock = this.lock;
373         lock.lock();
374         try {
375             while (!linkFirst(node))
376                 notFull.await();
377         } finally {
378             lock.unlock();
379         }
380     }
381
382     /**
383      * @throws NullPointerException {@inheritDoc}
384      * @throws InterruptedException {@inheritDoc}
385      */
386     public void putLast(E e) throws InterruptedException {
387         if (e == null) throw new NullPointerException();
388         Node<E> node = new Node<E>(e);
389         final ReentrantLock lock = this.lock;
390         lock.lock();
391         try {
392             while (!linkLast(node))
393                 notFull.await();
394         } finally {
395             lock.unlock();
396         }
397     }
398
399     /**
400      * @throws NullPointerException {@inheritDoc}
401      * @throws InterruptedException {@inheritDoc}
402      */
403     public boolean offerFirst(E e, long timeout, TimeUnit unit)
404         throws InterruptedException {
405         if (e == null) throw new NullPointerException();
406         Node<E> node = new Node<E>(e);
407         long nanos = unit.toNanos(timeout);
408         final ReentrantLock lock = this.lock;
409         lock.lockInterruptibly();
410         try {
411             while (!linkFirst(node)) {
412                 if (nanos <= 0)
413                     return false;
414                 nanos = notFull.awaitNanos(nanos);
415             }
416             return true;
417         } finally {
418             lock.unlock();
419         }
420     }
421
422     /**
423      * @throws NullPointerException {@inheritDoc}
424      * @throws InterruptedException {@inheritDoc}
425      */
426     public boolean offerLast(E e, long timeout, TimeUnit unit)
427         throws InterruptedException {
428         if (e == null) throw new NullPointerException();
429         Node<E> node = new Node<E>(e);
430         long nanos = unit.toNanos(timeout);
431         final ReentrantLock lock = this.lock;
432         lock.lockInterruptibly();
433         try {
434             while (!linkLast(node)) {
435                 if (nanos <= 0)
436                     return false;
437                 nanos = notFull.awaitNanos(nanos);
438             }
439             return true;
440         } finally {
441             lock.unlock();
442         }
443     }
444
445     /**
446      * @throws NoSuchElementException {@inheritDoc}
447      */
448     public E removeFirst() {
449         E x = pollFirst();
450         if (x == null) throw new NoSuchElementException();
451         return x;
452     }
453
454     /**
455      * @throws NoSuchElementException {@inheritDoc}
456      */
457     public E removeLast() {
458         E x = pollLast();
459         if (x == null) throw new NoSuchElementException();
460         return x;
461     }
462
463     public E pollFirst() {
464         final ReentrantLock lock = this.lock;
465         lock.lock();
466         try {
467             return unlinkFirst();
468         } finally {
469             lock.unlock();
470         }
471     }
472
473     public E pollLast() {
474         final ReentrantLock lock = this.lock;
475         lock.lock();
476         try {
477             return unlinkLast();
478         } finally {
479             lock.unlock();
480         }
481     }
482
483     public E takeFirst() throws InterruptedException {
484         final ReentrantLock lock = this.lock;
485         lock.lock();
486         try {
487             E x;
488             while ( (x = unlinkFirst()) == null)
489                 notEmpty.await();
490             return x;
491         } finally {
492             lock.unlock();
493         }
494     }
495
496     public E takeLast() throws InterruptedException {
497         final ReentrantLock lock = this.lock;
498         lock.lock();
499         try {
500             E x;
501             while ( (x = unlinkLast()) == null)
502                 notEmpty.await();
503             return x;
504         } finally {
505             lock.unlock();
506         }
507     }
508
509     public E pollFirst(long timeout, TimeUnit unit)
510         throws InterruptedException {
511         long nanos = unit.toNanos(timeout);
512         final ReentrantLock lock = this.lock;
513         lock.lockInterruptibly();
514         try {
515             E x;
516             while ( (x = unlinkFirst()) == null) {
517                 if (nanos <= 0)
518                     return null;
519                 nanos = notEmpty.awaitNanos(nanos);
520             }
521             return x;
522         } finally {
523             lock.unlock();
524         }
525     }
526
527     public E pollLast(long timeout, TimeUnit unit)
528         throws InterruptedException {
529         long nanos = unit.toNanos(timeout);
530         final ReentrantLock lock = this.lock;
531         lock.lockInterruptibly();
532         try {
533             E x;
534             while ( (x = unlinkLast()) == null) {
535                 if (nanos <= 0)
536                     return null;
537                 nanos = notEmpty.awaitNanos(nanos);
538             }
539             return x;
540         } finally {
541             lock.unlock();
542         }
543     }
544
545     /**
546      * @throws NoSuchElementException {@inheritDoc}
547      */
548     public E getFirst() {
549         E x = peekFirst();
550         if (x == null) throw new NoSuchElementException();
551         return x;
552     }
553
554     /**
555      * @throws NoSuchElementException {@inheritDoc}
556      */
557     public E getLast() {
558         E x = peekLast();
559         if (x == null) throw new NoSuchElementException();
560         return x;
561     }
562
563     public E peekFirst() {
564         final ReentrantLock lock = this.lock;
565         lock.lock();
566         try {
567             return (first == null) ? null : first.item;
568         } finally {
569             lock.unlock();
570         }
571     }
572
573     public E peekLast() {
574         final ReentrantLock lock = this.lock;
575         lock.lock();
576         try {
577             return (last == null) ? null : last.item;
578         } finally {
579             lock.unlock();
580         }
581     }
582
583     public boolean removeFirstOccurrence(Object o) {
584         if (o == null) return false;
585         final ReentrantLock lock = this.lock;
586         lock.lock();
587         try {
588             for (Node<E> p = first; p != null; p = p.next) {
589                 if (o.equals(p.item)) {
590                     unlink(p);
591                     return true;
592                 }
593             }
594             return false;
595         } finally {
596             lock.unlock();
597         }
598     }
599
600     public boolean removeLastOccurrence(Object o) {
601         if (o == null) return false;
602         final ReentrantLock lock = this.lock;
603         lock.lock();
604         try {
605             for (Node<E> p = last; p != null; p = p.prev) {
606                 if (o.equals(p.item)) {
607                     unlink(p);
608                     return true;
609                 }
610             }
611             return false;
612         } finally {
613             lock.unlock();
614         }
615     }
616
617     // BlockingQueue methods
618
619     /**
620      * Inserts the specified element at the end of this deque unless it would
621      * violate capacity restrictions.  When using a capacity-restricted deque,
622      * it is generally preferable to use method {@link #offer(Object) offer}.
623      *
624      * <p>This method is equivalent to {@link #addLast}.
625      *
626      * @throws IllegalStateException if the element cannot be added at this
627      *         time due to capacity restrictions
628      * @throws NullPointerException if the specified element is null
629      */
630     public boolean add(E e) {
631         addLast(e);
632         return true;
633     }
634
635     /**
636      * @throws NullPointerException if the specified element is null
637      */
638     public boolean offer(E e) {
639         return offerLast(e);
640     }
641
642     /**
643      * @throws NullPointerException {@inheritDoc}
644      * @throws InterruptedException {@inheritDoc}
645      */
646     public void put(E e) throws InterruptedException {
647         putLast(e);
648     }
649
650     /**
651      * @throws NullPointerException {@inheritDoc}
652      * @throws InterruptedException {@inheritDoc}
653      */
654     public boolean offer(E e, long timeout, TimeUnit unit)
655         throws InterruptedException {
656         return offerLast(e, timeout, unit);
657     }
658
659     /**
660      * Retrieves and removes the head of the queue represented by this deque.
661      * This method differs from {@link #poll poll} only in that it throws an
662      * exception if this deque is empty.
663      *
664      * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
665      *
666      * @return the head of the queue represented by this deque
667      * @throws NoSuchElementException if this deque is empty
668      */
669     public E remove() {
670         return removeFirst();
671     }
672
673     public E poll() {
674         return pollFirst();
675     }
676
677     public E take() throws InterruptedException {
678         return takeFirst();
679     }
680
681     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
682         return pollFirst(timeout, unit);
683     }
684
685     /**
686      * Retrieves, but does not remove, the head of the queue represented by
687      * this deque.  This method differs from {@link #peek peek} only in that
688      * it throws an exception if this deque is empty.
689      *
690      * <p>This method is equivalent to {@link #getFirst() getFirst}.
691      *
692      * @return the head of the queue represented by this deque
693      * @throws NoSuchElementException if this deque is empty
694      */
695     public E element() {
696         return getFirst();
697     }
698
699     public E peek() {
700         return peekFirst();
701     }
702
703     /**
704      * Returns the number of additional elements that this deque can ideally
705      * (in the absence of memory or resource constraints) accept without
706      * blocking. This is always equal to the initial capacity of this deque
707      * less the current {@code size} of this deque.
708      *
709      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
710      * an element will succeed by inspecting {@code remainingCapacity}
711      * because it may be the case that another thread is about to
712      * insert or remove an element.
713      */
714     public int remainingCapacity() {
715         final ReentrantLock lock = this.lock;
716         lock.lock();
717         try {
718             return capacity - count;
719         } finally {
720             lock.unlock();
721         }
722     }
723
724     /**
725      * @throws UnsupportedOperationException {@inheritDoc}
726      * @throws ClassCastException            {@inheritDoc}
727      * @throws NullPointerException          {@inheritDoc}
728      * @throws IllegalArgumentException      {@inheritDoc}
729      */
730     public int drainTo(Collection<? super E> c) {
731         return drainTo(c, Integer.MAX_VALUE);
732     }
733
734     /**
735      * @throws UnsupportedOperationException {@inheritDoc}
736      * @throws ClassCastException            {@inheritDoc}
737      * @throws NullPointerException          {@inheritDoc}
738      * @throws IllegalArgumentException      {@inheritDoc}
739      */
740     public int drainTo(Collection<? super E> c, int maxElements) {
741         if (c == null)
742             throw new NullPointerException();
743         if (c == this)
744             throw new IllegalArgumentException();
745         final ReentrantLock lock = this.lock;
746         lock.lock();
747         try {
748             int n = Math.min(maxElements, count);
749             for (int i = 0; i < n; i++) {
750                 c.add(first.item);   // In this order, in case add() throws.
751                 unlinkFirst();
752             }
753             return n;
754         } finally {
755             lock.unlock();
756         }
757     }
758
759     // Stack methods
760
761     /**
762      * @throws IllegalStateException {@inheritDoc}
763      * @throws NullPointerException  {@inheritDoc}
764      */
765     public void push(E e) {
766         addFirst(e);
767     }
768
769     /**
770      * @throws NoSuchElementException {@inheritDoc}
771      */
772     public E pop() {
773         return removeFirst();
774     }
775
776     // Collection methods
777
778     /**
779      * Removes the first occurrence of the specified element from this deque.
780      * If the deque does not contain the element, it is unchanged.
781      * More formally, removes the first element {@code e} such that
782      * {@code o.equals(e)} (if such an element exists).
783      * Returns {@code true} if this deque contained the specified element
784      * (or equivalently, if this deque changed as a result of the call).
785      *
786      * <p>This method is equivalent to
787      * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
788      *
789      * @param o element to be removed from this deque, if present
790      * @return {@code true} if this deque changed as a result of the call
791      */
792     public boolean remove(Object o) {
793         return removeFirstOccurrence(o);
794     }
795
796     /**
797      * Returns the number of elements in this deque.
798      *
799      * @return the number of elements in this deque
800      */
801     public int size() {
802         final ReentrantLock lock = this.lock;
803         lock.lock();
804         try {
805             return count;
806         } finally {
807             lock.unlock();
808         }
809     }
810
811     /**
812      * Returns {@code true} if this deque contains the specified element.
813      * More formally, returns {@code true} if and only if this deque contains
814      * at least one element {@code e} such that {@code o.equals(e)}.
815      *
816      * @param o object to be checked for containment in this deque
817      * @return {@code true} if this deque contains the specified element
818      */
819     public boolean contains(Object o) {
820         if (o == null) return false;
821         final ReentrantLock lock = this.lock;
822         lock.lock();
823         try {
824             for (Node<E> p = first; p != null; p = p.next)
825                 if (o.equals(p.item))
826                     return true;
827             return false;
828         } finally {
829             lock.unlock();
830         }
831     }
832
833     /*
834      * TODO: Add support for more efficient bulk operations.
835      *
836      * We don't want to acquire the lock for every iteration, but we
837      * also want other threads a chance to interact with the
838      * collection, especially when count is close to capacity.
839      */
840
841 //     /**
842 //      * Adds all of the elements in the specified collection to this
843 //      * queue.  Attempts to addAll of a queue to itself result in
844 //      * {@code IllegalArgumentException}. Further, the behavior of
845 //      * this operation is undefined if the specified collection is
846 //      * modified while the operation is in progress.
847 //      *
848 //      * @param c collection containing elements to be added to this queue
849 //      * @return {@code true} if this queue changed as a result of the call
850 //      * @throws ClassCastException            {@inheritDoc}
851 //      * @throws NullPointerException          {@inheritDoc}
852 //      * @throws IllegalArgumentException      {@inheritDoc}
853 //      * @throws IllegalStateException         {@inheritDoc}
854 //      * @see #add(Object)
855 //      */
856 //     public boolean addAll(Collection<? extends E> c) {
857 //         if (c == null)
858 //             throw new NullPointerException();
859 //         if (c == this)
860 //             throw new IllegalArgumentException();
861 //         final ReentrantLock lock = this.lock;
862 //         lock.lock();
863 //         try {
864 //             boolean modified = false;
865 //             for (E e : c)
866 //                 if (linkLast(e))
867 //                     modified = true;
868 //             return modified;
869 //         } finally {
870 //             lock.unlock();
871 //         }
872 //     }
873
874     /**
875      * Returns an array containing all of the elements in this deque, in
876      * proper sequence (from first to last element).
877      *
878      * <p>The returned array will be "safe" in that no references to it are
879      * maintained by this deque.  (In other words, this method must allocate
880      * a new array).  The caller is thus free to modify the returned array.
881      *
882      * <p>This method acts as bridge between array-based and collection-based
883      * APIs.
884      *
885      * @return an array containing all of the elements in this deque
886      */
887     @SuppressWarnings("unchecked")
888     public Object[] toArray() {
889         final ReentrantLock lock = this.lock;
890         lock.lock();
891         try {
892             Object[] a = new Object[count];
893             int k = 0;
894             for (Node<E> p = first; p != null; p = p.next)
895                 a[k++] = p.item;
896             return a;
897         } finally {
898             lock.unlock();
899         }
900     }
901
902     /**
903      * Returns an array containing all of the elements in this deque, in
904      * proper sequence; the runtime type of the returned array is that of
905      * the specified array.  If the deque fits in the specified array, it
906      * is returned therein.  Otherwise, a new array is allocated with the
907      * runtime type of the specified array and the size of this deque.
908      *
909      * <p>If this deque fits in the specified array with room to spare
910      * (i.e., the array has more elements than this deque), the element in
911      * the array immediately following the end of the deque is set to
912      * {@code null}.
913      *
914      * <p>Like the {@link #toArray()} method, this method acts as bridge between
915      * array-based and collection-based APIs.  Further, this method allows
916      * precise control over the runtime type of the output array, and may,
917      * under certain circumstances, be used to save allocation costs.
918      *
919      * <p>Suppose {@code x} is a deque known to contain only strings.
920      * The following code can be used to dump the deque into a newly
921      * allocated array of {@code String}:
922      *
923      * <pre>
924      *     String[] y = x.toArray(new String[0]);</pre>
925      *
926      * Note that {@code toArray(new Object[0])} is identical in function to
927      * {@code toArray()}.
928      *
929      * @param a the array into which the elements of the deque are to
930      *          be stored, if it is big enough; otherwise, a new array of the
931      *          same runtime type is allocated for this purpose
932      * @return an array containing all of the elements in this deque
933      * @throws ArrayStoreException if the runtime type of the specified array
934      *         is not a supertype of the runtime type of every element in
935      *         this deque
936      * @throws NullPointerException if the specified array is null
937      */
938     @SuppressWarnings("unchecked")
939     public <T> T[] toArray(T[] a) {
940         final ReentrantLock lock = this.lock;
941         lock.lock();
942         try {
943             if (a.length < count)
944                 a = (T[])java.lang.reflect.Array.newInstance
945                     (a.getClass().getComponentType(), count);
946
947             int k = 0;
948             for (Node<E> p = first; p != null; p = p.next)
949                 a[k++] = (T)p.item;
950             if (a.length > k)
951                 a[k] = null;
952             return a;
953         } finally {
954             lock.unlock();
955         }
956     }
957
958     public String toString() {
959         final ReentrantLock lock = this.lock;
960         lock.lock();
961         try {
962             Node<E> p = first;
963             if (p == null)
964                 return "[]";
965
966             StringBuilder sb = new StringBuilder();
967             sb.append('[');
968             for (;;) {
969                 E e = p.item;
970                 sb.append(e == this ? "(this Collection)" : e);
971                 p = p.next;
972                 if (p == null)
973                     return sb.append(']').toString();
974                 sb.append(',').append(' ');
975             }
976         } finally {
977             lock.unlock();
978         }
979     }
980
981     /**
982      * Atomically removes all of the elements from this deque.
983      * The deque will be empty after this call returns.
984      */
985     public void clear() {
986         final ReentrantLock lock = this.lock;
987         lock.lock();
988         try {
989             for (Node<E> f = first; f != null; ) {
990                 f.item = null;
991                 Node<E> n = f.next;
992                 f.prev = null;
993                 f.next = null;
994                 f = n;
995             }
996             first = last = null;
997             count = 0;
998             notFull.signalAll();
999         } finally {
1000             lock.unlock();
1001         }
1002     }
1003
1004     /**
1005      * Returns an iterator over the elements in this deque in proper sequence.
1006      * The elements will be returned in order from first (head) to last (tail).
1007      *
1008      * <p>The returned iterator is a "weakly consistent" iterator that
1009      * will never throw {@link java.util.ConcurrentModificationException
1010      * ConcurrentModificationException}, and guarantees to traverse
1011      * elements as they existed upon construction of the iterator, and
1012      * may (but is not guaranteed to) reflect any modifications
1013      * subsequent to construction.
1014      *
1015      * @return an iterator over the elements in this deque in proper sequence
1016      */
1017     public Iterator<E> iterator() {
1018         return new Itr();
1019     }
1020
1021     /**
1022      * Returns an iterator over the elements in this deque in reverse
1023      * sequential order.  The elements will be returned in order from
1024      * last (tail) to first (head).
1025      *
1026      * <p>The returned iterator is a "weakly consistent" iterator that
1027      * will never throw {@link java.util.ConcurrentModificationException
1028      * ConcurrentModificationException}, and guarantees to traverse
1029      * elements as they existed upon construction of the iterator, and
1030      * may (but is not guaranteed to) reflect any modifications
1031      * subsequent to construction.
1032      *
1033      * @return an iterator over the elements in this deque in reverse order
1034      */
1035     public Iterator<E> descendingIterator() {
1036         return new DescendingItr();
1037     }
1038
1039     /**
1040      * Base class for Iterators for LinkedBlockingDeque
1041      */
1042     private abstract class AbstractItr implements Iterator<E> {
1043         /**
1044          * The next node to return in next()
1045          */
1046          Node<E> next;
1047
1048         /**
1049          * nextItem holds on to item fields because once we claim that
1050          * an element exists in hasNext(), we must return item read
1051          * under lock (in advance()) even if it was in the process of
1052          * being removed when hasNext() was called.
1053          */
1054         E nextItem;
1055
1056         /**
1057          * Node returned by most recent call to next. Needed by remove.
1058          * Reset to null if this element is deleted by a call to remove.
1059          */
1060         private Node<E> lastRet;
1061
1062         abstract Node<E> firstNode();
1063         abstract Node<E> nextNode(Node<E> n);
1064
1065         AbstractItr() {
1066             // set to initial position
1067             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1068             lock.lock();
1069             try {
1070                 next = firstNode();
1071                 nextItem = (next == null) ? null : next.item;
1072             } finally {
1073                 lock.unlock();
1074             }
1075         }
1076
1077         /**
1078          * Returns the successor node of the given non-null, but
1079          * possibly previously deleted, node.
1080          */
1081         private Node<E> succ(Node<E> n) {
1082             // Chains of deleted nodes ending in null or self-links
1083             // are possible if multiple interior nodes are removed.
1084             for (;;) {
1085                 Node<E> s = nextNode(n);
1086                 if (s == null)
1087                     return null;
1088                 else if (s.item != null)
1089                     return s;
1090                 else if (s == n)
1091                     return firstNode();
1092                 else
1093                     n = s;
1094             }
1095         }
1096
1097         /**
1098          * Advances next.
1099          */
1100         void advance() {
1101             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1102             lock.lock();
1103             try {
1104                 // assert next != null;
1105                 next = succ(next);
1106                 nextItem = (next == null) ? null : next.item;
1107             } finally {
1108                 lock.unlock();
1109             }
1110         }
1111
1112         public boolean hasNext() {
1113             return next != null;
1114         }
1115
1116         public E next() {
1117             if (next == null)
1118                 throw new NoSuchElementException();
1119             lastRet = next;
1120             E x = nextItem;
1121             advance();
1122             return x;
1123         }
1124
1125         public void remove() {
1126             Node<E> n = lastRet;
1127             if (n == null)
1128                 throw new IllegalStateException();
1129             lastRet = null;
1130             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1131             lock.lock();
1132             try {
1133                 if (n.item != null)
1134                     unlink(n);
1135             } finally {
1136                 lock.unlock();
1137             }
1138         }
1139     }
1140
1141     /** Forward iterator */
1142     private class Itr extends AbstractItr {
1143         Node<E> firstNode() { return first; }
1144         Node<E> nextNode(Node<E> n) { return n.next; }
1145     }
1146
1147     /** Descending iterator */
1148     private class DescendingItr extends AbstractItr {
1149         Node<E> firstNode() { return last; }
1150         Node<E> nextNode(Node<E> n) { return n.prev; }
1151     }
1152
1153     /**
1154      * Save the state of this deque to a stream (that is, serialize it).
1155      *
1156      * @serialData The capacity (int), followed by elements (each an
1157      * {@code Object}) in the proper order, followed by a null
1158      * @param s the stream
1159      */
1160     private void writeObject(java.io.ObjectOutputStream s)
1161         throws java.io.IOException {
1162         final ReentrantLock lock = this.lock;
1163         lock.lock();
1164         try {
1165             // Write out capacity and any hidden stuff
1166             s.defaultWriteObject();
1167             // Write out all elements in the proper order.
1168             for (Node<E> p = first; p != null; p = p.next)
1169                 s.writeObject(p.item);
1170             // Use trailing null as sentinel
1171             s.writeObject(null);
1172         } finally {
1173             lock.unlock();
1174         }
1175     }
1176
1177     /**
1178      * Reconstitute this deque from a stream (that is,
1179      * deserialize it).
1180      * @param s the stream
1181      */
1182     private void readObject(java.io.ObjectInputStream s)
1183         throws java.io.IOException, ClassNotFoundException {
1184         s.defaultReadObject();
1185         count = 0;
1186         first = null;
1187         last = null;
1188         // Read in all elements and place in queue
1189         for (;;) {
1190             @SuppressWarnings("unchecked")
1191             E item = (E)s.readObject();
1192             if (item == null)
1193                 break;
1194             add(item);
1195         }
1196     }
1197
1198 }


下面从ArrayBlockingQueue的创建,添加,取出,遍历这几个方面对LinkedBlockingDeque进行分析

1. 创建

下面以LinkedBlockingDeque(int capacity)来进行说明。

public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}


说明:capacity是“链式阻塞队列”的容量。

LinkedBlockingDeque中相关的数据结果定义如下:

// “双向队列”的表头
transient Node<E> first;
// “双向队列”的表尾
transient Node<E> last;
// 节点数量
private transient int count;
// 容量
private final int capacity;
// 互斥锁 , 互斥锁对应的“非空条件notEmpty”, 互斥锁对应的“未满条件notFull”
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();


说明:lock是互斥锁,用于控制多线程对LinkedBlockingDeque中元素的互斥访问;而notEmpty和notFull是与lock绑定的条件,它们用于实现对多线程更精确的控制。

双向链表的节点Node的定义如下:

static final class Node<E> {
E item;       // 数据
Node<E> prev; // 前一节点
Node<E> next; // 后一节点

Node(E x) { item = x; }
}


2. 添加

下面以offer(E e)为例,对LinkedBlockingDeque的添加方法进行说明。

public boolean offer(E e) {
return offerLast(e);
}


offer()实际上是调用offerLast()将元素添加到队列的末尾。

offerLast()的源码如下:

public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
// 新建节点
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 将“新节点”添加到双向链表的末尾
return linkLast(node);
} finally {
// 释放锁
lock.unlock();
}
}


说明:offerLast()的作用,是新建节点并将该节点插入到双向链表的末尾。它在插入节点前,会获取锁;操作完毕,再释放锁。

linkLast()的源码如下:

private boolean linkLast(Node<E> node) {
// 如果“双向链表的节点数量” > “容量”,则返回false,表示插入失败。
if (count >= capacity)
return false;
// 将“node添加到链表末尾”,并设置node为新的尾节点
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
// 将“节点数量”+1
++count;
// 插入节点之后,唤醒notEmpty上的等待线程。
notEmpty.signal();
return true;
}


说明:linkLast()的作用,是将节点插入到双向队列的末尾;插入节点之后,唤醒notEmpty上的等待线程。

3. 删除

下面以take()为例,对LinkedBlockingDeque的取出方法进行说明。

public E take() throws InterruptedException {
return takeFirst();
}


take()实际上是调用takeFirst()队列的第一个元素。

takeFirst()的源码如下:

public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
E x;
// 若“队列为空”,则一直等待。否则,通过unlinkFirst()删除第一个节点。
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
// 释放锁
lock.unlock();
}
}


说明:takeFirst()的作用,是删除双向链表的第一个节点,并返回节点对应的值。它在插入节点前,会获取锁;操作完毕,再释放锁。

unlinkFirst()的源码如下:

private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
// 删除并更新“第一个节点”
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
// 将“节点数量”-1
--count;
// 删除节点之后,唤醒notFull上的等待线程。
notFull.signal();
return item;
}


说明:unlinkFirst()的作用,是将双向队列的第一个节点删除;删除节点之后,唤醒notFull上的等待线程。

4. 遍历

下面对LinkedBlockingDeque的遍历方法进行说明。

public Iterator<E> iterator() {
return new Itr();
}


iterator()实际上是返回一个Iter对象。

Itr类的定义如下:

private class Itr extends AbstractItr {
// “双向队列”的表头
Node<E> firstNode() { return first; }
// 获取“节点n的下一个节点”
Node<E> nextNode(Node<E> n) { return n.next; }
}


Itr继承于AbstractItr,而AbstractItr的定义如下:

private abstract class AbstractItr implements Iterator<E> {
// next是下一次调用next()会返回的节点。
Node<E> next;
// nextItem是next()返回节点对应的数据。
E nextItem;
// 上一次next()返回的节点。
private Node<E> lastRet;
// 返回第一个节点
abstract Node<E> firstNode();
// 返回下一个节点
abstract Node<E> nextNode(Node<E> n);

AbstractItr() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
// 获取“LinkedBlockingDeque的互斥锁”
lock.lock();
try {
// 获取“双向队列”的表头
next = firstNode();
// 获取表头对应的数据
nextItem = (next == null) ? null : next.item;
} finally {
// 释放“LinkedBlockingDeque的互斥锁”
lock.unlock();
}
}

// 获取n的后继节点
private Node<E> succ(Node<E> n) {
// Chains of deleted nodes ending in null or self-links
// are possible if multiple interior nodes are removed.
for (;;) {
Node<E> s = nextNode(n);
if (s == null)
return null;
else if (s.item != null)
return s;
else if (s == n)
return firstNode();
else
n = s;
}
}

// 更新next和nextItem。
void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
// assert next != null;
next = succ(next);
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}

// 返回“下一个节点是否为null”
public boolean hasNext() {
return next != null;
}

// 返回下一个节点
public E next() {
if (next == null)
throw new NoSuchElementException();
lastRet = next;
E x = nextItem;
advance();
return x;
}

// 删除下一个节点
public void remove() {
Node<E> n = lastRet;
if (n == null)
throw new IllegalStateException();
lastRet = null;
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
if (n.item != null)
unlink(n);
} finally {
lock.unlock();
}
}
}



LinkedBlockingDeque示例

1 import java.util.*;
2 import java.util.concurrent.*;
3
4 /*
5  *   LinkedBlockingDeque是“线程安全”的队列,而LinkedList是非线程安全的。
6  *
7  *   下面是“多个线程同时操作并且遍历queue”的示例
8  *   (01) 当queue是LinkedBlockingDeque对象时,程序能正常运行。
9  *   (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
10  *
11  * @author skywang
12  */
13 public class LinkedBlockingDequeDemo1 {
14
15     // TODO: queue是LinkedList对象时,程序会出错。
16     //private static Queue<String> queue = new LinkedList<String>();
17     private static Queue<String> queue = new LinkedBlockingDeque<String>();
18     public static void main(String[] args) {
19
20         // 同时启动两个线程对queue进行操作!
21         new MyThread("ta").start();
22         new MyThread("tb").start();
23     }
24
25     private static void printAll() {
26         String value;
27         Iterator iter = queue.iterator();
28         while(iter.hasNext()) {
29             value = (String)iter.next();
30             System.out.print(value+", ");
31         }
32         System.out.println();
33     }
34
35     private static class MyThread extends Thread {
36         MyThread(String name) {
37             super(name);
38         }
39         @Override
40         public void run() {
41                 int i = 0;
42             while (i++ < 6) {
43                 // “线程名” + "-" + "序号"
44                 String val = Thread.currentThread().getName()+i;
45                 queue.add(val);
46                 // 通过“Iterator”遍历queue。
47                 printAll();
48             }
49         }
50     }
51 }


(某一次)运行结果

ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, tb2, tb2, ta2,
ta2,
ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, tb3, tb3, ta3,
ta3, ta1,
tb1, ta1, tb2, tb1, ta2, tb2, tb3, ta2, ta3, tb3, tb4, ta3, ta4,
tb4, ta1, ta4, tb1, tb5,
tb2, ta1, ta2, tb1, tb3, tb2, ta3, ta2, tb4, tb3, ta4, ta3, tb5, tb4, ta5,
ta4, ta1, tb5, tb1, ta5, tb2, tb6,
ta2, ta1, tb3, tb1, ta3, tb2, tb4, ta2, ta4, tb3, tb5, ta3, ta5, tb4, tb6, ta4, ta6,
tb5, ta5, tb6, ta6,


结果说明:示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingDeque进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingDeque中;接着,遍历并输出LinkedBlockingDeque中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。

当queue是LinkedBlockingDeque对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: