--- a/jdk/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Tue Jul 28 13:24:52 2009 -0700
+++ b/jdk/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Tue Jul 28 17:17:55 2009 -0700
@@ -34,8 +34,13 @@
*/
package java.util.concurrent;
-import java.util.*;
-import java.util.concurrent.locks.*;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
/**
* An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
@@ -73,6 +78,20 @@
/*
* Implemented as a simple doubly-linked list protected by a
* single lock and using conditions to manage blocking.
+ *
+ * To implement weakly consistent iterators, it appears we need to
+ * keep all Nodes GC-reachable from a predecessor dequeued Node.
+ * That would cause two problems:
+ * - allow a rogue Iterator to cause unbounded memory retention
+ * - cause cross-generational linking of old Nodes to new Nodes if
+ * a Node was tenured while live, which generational GCs have a
+ * hard time dealing with, causing repeated major collections.
+ * However, only non-deleted Nodes need to be reachable from
+ * dequeued Nodes, and reachability does not necessarily have to
+ * be of the kind understood by the GC. We use the trick of
+ * linking a Node that has just been dequeued to itself. Such a
+ * self-link implicitly means to jump to "first" (for next links)
+ * or "last" (for prev links).
*/
/*
@@ -86,9 +105,27 @@
/** Doubly-linked list node class */
static final class Node<E> {
+ /**
+ * The item, or null if this node has been removed.
+ */
E item;
+
+ /**
+ * One of:
+ * - the real predecessor Node
+ * - this Node, meaning the predecessor is tail
+ * - null, meaning there is no predecessor
+ */
Node<E> prev;
+
+ /**
+ * One of:
+ * - the real successor Node
+ * - this Node, meaning the successor is head
+ * - null, meaning there is no successor
+ */
Node<E> next;
+
Node(E x, Node<E> p, Node<E> n) {
item = x;
prev = p;
@@ -96,23 +133,37 @@
}
}
- /** Pointer to first node */
- private transient Node<E> first;
- /** Pointer to last node */
- private transient Node<E> last;
+ /**
+ * Pointer to first node.
+ * Invariant: (first == null && last == null) ||
+ * (first.prev == null && first.item != null)
+ */
+ transient Node<E> first;
+
+ /**
+ * Pointer to last node.
+ * Invariant: (first == null && last == null) ||
+ * (last.next == null && last.item != null)
+ */
+ transient Node<E> last;
+
/** Number of items in the deque */
private transient int count;
+
/** Maximum number of items in the deque */
private final int capacity;
+
/** Main lock guarding all access */
- private final ReentrantLock lock = new ReentrantLock();
+ final ReentrantLock lock = new ReentrantLock();
+
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
+
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
/**
- * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+ * Creates a {@code LinkedBlockingDeque} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingDeque() {
@@ -120,10 +171,10 @@
}
/**
- * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed) capacity.
+ * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
*
* @param capacity the capacity of this deque
- * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
+ * @throws IllegalArgumentException if {@code capacity} is less than 1
*/
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
@@ -131,7 +182,7 @@
}
/**
- * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+ * Creates a {@code LinkedBlockingDeque} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of
* the given collection, added in traversal order of the
* collection's iterator.
@@ -142,8 +193,18 @@
*/
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
- for (E e : c)
- add(e);
+ final ReentrantLock lock = this.lock;
+ lock.lock(); // Never contended, but necessary for visibility
+ try {
+ for (E e : c) {
+ if (e == null)
+ throw new NullPointerException();
+ if (!linkLast(e))
+ throw new IllegalStateException("Deque full");
+ }
+ } finally {
+ lock.unlock();
+ }
}
@@ -153,9 +214,9 @@
* Links e as first element, or returns false if full.
*/
private boolean linkFirst(E e) {
+ // assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
- ++count;
Node<E> f = first;
Node<E> x = new Node<E>(e, null, f);
first = x;
@@ -163,6 +224,7 @@
last = x;
else
f.prev = x;
+ ++count;
notEmpty.signal();
return true;
}
@@ -171,9 +233,9 @@
* Links e as last element, or returns false if full.
*/
private boolean linkLast(E e) {
+ // assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
- ++count;
Node<E> l = last;
Node<E> x = new Node<E>(e, l, null);
last = x;
@@ -181,6 +243,7 @@
first = x;
else
l.next = x;
+ ++count;
notEmpty.signal();
return true;
}
@@ -189,10 +252,14 @@
* Removes and returns first element, or null if empty.
*/
private E unlinkFirst() {
+ // assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
+ E item = f.item;
+ f.item = null;
+ f.next = f; // help GC
first = n;
if (n == null)
last = null;
@@ -200,17 +267,21 @@
n.prev = null;
--count;
notFull.signal();
- return f.item;
+ return item;
}
/**
* Removes and returns last element, or null if empty.
*/
private E unlinkLast() {
+ // assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
+ E item = l.item;
+ l.item = null;
+ l.prev = l; // help GC
last = p;
if (p == null)
first = null;
@@ -218,31 +289,29 @@
p.next = null;
--count;
notFull.signal();
- return l.item;
+ return item;
}
/**
- * Unlink e
+ * Unlinks x.
*/
- private void unlink(Node<E> x) {
+ void unlink(Node<E> x) {
+ // assert lock.isHeldByCurrentThread();
Node<E> p = x.prev;
Node<E> n = x.next;
if (p == null) {
- if (n == null)
- first = last = null;
- else {
- n.prev = null;
- first = n;
- }
+ unlinkFirst();
} else if (n == null) {
- p.next = null;
- last = p;
+ unlinkLast();
} else {
p.next = n;
n.prev = p;
+ x.item = null;
+ // Don't mess with x's links. They may still be in use by
+ // an iterator.
+ --count;
+ notFull.signal();
}
- --count;
- notFull.signalAll();
}
// BlockingDeque methods
@@ -270,6 +339,7 @@
*/
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkFirst(e);
@@ -283,6 +353,7 @@
*/
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(e);
@@ -297,6 +368,7 @@
*/
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(e))
@@ -312,6 +384,7 @@
*/
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(e))
@@ -329,15 +402,15 @@
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- if (linkFirst(e))
- return true;
+ while (!linkFirst(e)) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
+ return true;
} finally {
lock.unlock();
}
@@ -351,15 +424,15 @@
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- if (linkLast(e))
- return true;
+ while (!linkLast(e)) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
+ return true;
} finally {
lock.unlock();
}
@@ -384,6 +457,7 @@
}
public E pollFirst() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
@@ -393,6 +467,7 @@
}
public E pollLast() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
@@ -402,6 +477,7 @@
}
public E takeFirst() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
@@ -414,6 +490,7 @@
}
public E takeLast() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
@@ -428,16 +505,16 @@
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- E x = unlinkFirst();
- if (x != null)
- return x;
+ E x;
+ while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
+ return x;
} finally {
lock.unlock();
}
@@ -446,16 +523,16 @@
public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- E x = unlinkLast();
- if (x != null)
- return x;
+ E x;
+ while ( (x = unlinkLast()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
+ return x;
} finally {
lock.unlock();
}
@@ -480,6 +557,7 @@
}
public E peekFirst() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return (first == null) ? null : first.item;
@@ -489,6 +567,7 @@
}
public E peekLast() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return (last == null) ? null : last.item;
@@ -499,6 +578,7 @@
public boolean removeFirstOccurrence(Object o) {
if (o == null) return false;
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = first; p != null; p = p.next) {
@@ -515,6 +595,7 @@
public boolean removeLastOccurrence(Object o) {
if (o == null) return false;
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = last; p != null; p = p.prev) {
@@ -619,14 +700,15 @@
* Returns the number of additional elements that this deque can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this deque
- * less the current <tt>size</tt> of this deque.
+ * less the current {@code size} of this deque.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
- * an element will succeed by inspecting <tt>remainingCapacity</tt>
+ * an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
public int remainingCapacity() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return capacity - count;
@@ -642,22 +724,7 @@
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- lock.lock();
- try {
- for (Node<E> p = first; p != null; p = p.next)
- c.add(p.item);
- int n = count;
- count = 0;
- first = last = null;
- notFull.signalAll();
- return n;
- } finally {
- lock.unlock();
- }
+ return drainTo(c, Integer.MAX_VALUE);
}
/**
@@ -671,19 +738,14 @@
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
- int n = 0;
- while (n < maxElements && first != null) {
- c.add(first.item);
- first.prev = null;
- first = first.next;
- --count;
- ++n;
+ int n = Math.min(maxElements, count);
+ for (int i = 0; i < n; i++) {
+ c.add(first.item); // In this order, in case add() throws.
+ unlinkFirst();
}
- if (first == null)
- last = null;
- notFull.signalAll();
return n;
} finally {
lock.unlock();
@@ -712,16 +774,16 @@
/**
* Removes the first occurrence of the specified element from this deque.
* If the deque does not contain the element, it is unchanged.
- * More formally, removes the first element <tt>e</tt> such that
- * <tt>o.equals(e)</tt> (if such an element exists).
- * Returns <tt>true</tt> if this deque contained the specified element
+ * More formally, removes the first element {@code e} such that
+ * {@code o.equals(e)} (if such an element exists).
+ * Returns {@code true} if this deque contained the specified element
* (or equivalently, if this deque changed as a result of the call).
*
* <p>This method is equivalent to
* {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
*
* @param o element to be removed from this deque, if present
- * @return <tt>true</tt> if this deque changed as a result of the call
+ * @return {@code true} if this deque changed as a result of the call
*/
public boolean remove(Object o) {
return removeFirstOccurrence(o);
@@ -733,6 +795,7 @@
* @return the number of elements in this deque
*/
public int size() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
@@ -742,15 +805,16 @@
}
/**
- * Returns <tt>true</tt> if this deque contains the specified element.
- * More formally, returns <tt>true</tt> if and only if this deque contains
- * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
+ * Returns {@code true} if this deque contains the specified element.
+ * More formally, returns {@code true} if and only if this deque contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this deque
- * @return <tt>true</tt> if this deque contains the specified element
+ * @return {@code true} if this deque contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = first; p != null; p = p.next)
@@ -762,24 +826,46 @@
}
}
- /**
- * Variant of removeFirstOccurrence needed by iterator.remove.
- * Searches for the node, not its contents.
+ /*
+ * TODO: Add support for more efficient bulk operations.
+ *
+ * We don't want to acquire the lock for every iteration, but we
+ * also want other threads a chance to interact with the
+ * collection, especially when count is close to capacity.
*/
- boolean removeNode(Node<E> e) {
- lock.lock();
- try {
- for (Node<E> p = first; p != null; p = p.next) {
- if (p == e) {
- unlink(p);
- return true;
- }
- }
- return false;
- } finally {
- lock.unlock();
- }
- }
+
+// /**
+// * Adds all of the elements in the specified collection to this
+// * queue. Attempts to addAll of a queue to itself result in
+// * {@code IllegalArgumentException}. Further, the behavior of
+// * this operation is undefined if the specified collection is
+// * modified while the operation is in progress.
+// *
+// * @param c collection containing elements to be added to this queue
+// * @return {@code true} if this queue changed as a result of the call
+// * @throws ClassCastException {@inheritDoc}
+// * @throws NullPointerException {@inheritDoc}
+// * @throws IllegalArgumentException {@inheritDoc}
+// * @throws IllegalStateException {@inheritDoc}
+// * @see #add(Object)
+// */
+// public boolean addAll(Collection<? extends E> c) {
+// if (c == null)
+// throw new NullPointerException();
+// if (c == this)
+// throw new IllegalArgumentException();
+// final ReentrantLock lock = this.lock;
+// lock.lock();
+// try {
+// boolean modified = false;
+// for (E e : c)
+// if (linkLast(e))
+// modified = true;
+// return modified;
+// } finally {
+// lock.unlock();
+// }
+// }
/**
* Returns an array containing all of the elements in this deque, in
@@ -794,7 +880,9 @@
*
* @return an array containing all of the elements in this deque
*/
+ @SuppressWarnings("unchecked")
public Object[] toArray() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] a = new Object[count];
@@ -817,22 +905,22 @@
* <p>If this deque fits in the specified array with room to spare
* (i.e., the array has more elements than this deque), the element in
* the array immediately following the end of the deque is set to
- * <tt>null</tt>.
+ * {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
- * <p>Suppose <tt>x</tt> is a deque known to contain only strings.
+ * <p>Suppose {@code x} is a deque known to contain only strings.
* The following code can be used to dump the deque into a newly
- * allocated array of <tt>String</tt>:
+ * allocated array of {@code String}:
*
* <pre>
* String[] y = x.toArray(new String[0]);</pre>
*
- * Note that <tt>toArray(new Object[0])</tt> is identical in function to
- * <tt>toArray()</tt>.
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
*
* @param a the array into which the elements of the deque are to
* be stored, if it is big enough; otherwise, a new array of the
@@ -843,14 +931,14 @@
* this deque
* @throws NullPointerException if the specified array is null
*/
+ @SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
if (a.length < count)
- a = (T[])java.lang.reflect.Array.newInstance(
- a.getClass().getComponentType(),
- count
- );
+ a = (T[])java.lang.reflect.Array.newInstance
+ (a.getClass().getComponentType(), count);
int k = 0;
for (Node<E> p = first; p != null; p = p.next)
@@ -864,6 +952,7 @@
}
public String toString() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
return super.toString();
@@ -877,8 +966,16 @@
* The deque will be empty after this call returns.
*/
public void clear() {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
+ for (Node<E> f = first; f != null; ) {
+ f.item = null;
+ Node<E> n = f.next;
+ f.prev = null;
+ f.next = null;
+ f = n;
+ }
first = last = null;
count = 0;
notFull.signalAll();
@@ -890,7 +987,7 @@
/**
* Returns an iterator over the elements in this deque in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
- * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
+ * The returned {@code Iterator} is a "weakly consistent" iterator that
* will never throw {@link ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
@@ -906,7 +1003,7 @@
* Returns an iterator over the elements in this deque in reverse
* sequential order. The elements will be returned in order from
* last (tail) to first (head).
- * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
+ * The returned {@code Iterator} is a "weakly consistent" iterator that
* will never throw {@link ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
@@ -921,7 +1018,7 @@
*/
private abstract class AbstractItr implements Iterator<E> {
/**
- * The next node to return in next
+ * The next node to return in next()
*/
Node<E> next;
@@ -939,15 +1036,44 @@
*/
private Node<E> lastRet;
+ abstract Node<E> firstNode();
+ abstract Node<E> nextNode(Node<E> n);
+
AbstractItr() {
- advance(); // set to initial position
+ // set to initial position
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ next = firstNode();
+ nextItem = (next == null) ? null : next.item;
+ } finally {
+ lock.unlock();
+ }
}
/**
- * Advances next, or if not yet initialized, sets to first node.
- * Implemented to move forward vs backward in the two subclasses.
+ * Advances next.
*/
- abstract void advance();
+ void advance() {
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ // assert next != null;
+ Node<E> s = nextNode(next);
+ if (s == next) {
+ next = firstNode();
+ } else {
+ // Skip over removed nodes.
+ // May be necessary if multiple interior Nodes are removed.
+ while (s != null && s.item == null)
+ s = nextNode(s);
+ next = s;
+ }
+ nextItem = (next == null) ? null : next.item;
+ } finally {
+ lock.unlock();
+ }
+ }
public boolean hasNext() {
return next != null;
@@ -967,52 +1093,39 @@
if (n == null)
throw new IllegalStateException();
lastRet = null;
- // Note: removeNode rescans looking for this node to make
- // sure it was not already removed. Otherwise, trying to
- // re-remove could corrupt list.
- removeNode(n);
- }
- }
-
- /** Forward iterator */
- private class Itr extends AbstractItr {
- void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
- next = (next == null)? first : next.next;
- nextItem = (next == null)? null : next.item;
+ if (n.item != null)
+ unlink(n);
} finally {
lock.unlock();
}
}
}
- /**
- * Descending iterator for LinkedBlockingDeque
- */
+ /** Forward iterator */
+ private class Itr extends AbstractItr {
+ Node<E> firstNode() { return first; }
+ Node<E> nextNode(Node<E> n) { return n.next; }
+ }
+
+ /** Descending iterator */
private class DescendingItr extends AbstractItr {
- void advance() {
- final ReentrantLock lock = LinkedBlockingDeque.this.lock;
- lock.lock();
- try {
- next = (next == null)? last : next.prev;
- nextItem = (next == null)? null : next.item;
- } finally {
- lock.unlock();
- }
- }
+ Node<E> firstNode() { return last; }
+ Node<E> nextNode(Node<E> n) { return n.prev; }
}
/**
* Save the state of this deque to a stream (that is, serialize it).
*
* @serialData The capacity (int), followed by elements (each an
- * <tt>Object</tt>) in the proper order, followed by a null
+ * {@code Object}) in the proper order, followed by a null
* @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
+ final ReentrantLock lock = this.lock;
lock.lock();
try {
// Write out capacity and any hidden stuff
@@ -1040,6 +1153,7 @@
last = null;
// Read in all elements and place in queue
for (;;) {
+ @SuppressWarnings("unchecked")
E item = (E)s.readObject();
if (item == null)
break;