diff -r cdf768813b4d -r 79309d6eab38 jdk/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java --- a/jdk/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Tue Jul 28 13:24:52 2009 -0700 +++ b/jdk/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Tue Jul 28 17:17:55 2009 -0700 @@ -34,8 +34,13 @@ */ package java.util.concurrent; -import java.util.*; -import java.util.concurrent.locks.*; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on @@ -73,6 +78,20 @@ /* * Implemented as a simple doubly-linked list protected by a * single lock and using conditions to manage blocking. + * + * To implement weakly consistent iterators, it appears we need to + * keep all Nodes GC-reachable from a predecessor dequeued Node. + * That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to jump to "first" (for next links) + * or "last" (for prev links). */ /* @@ -86,9 +105,27 @@ /** Doubly-linked list node class */ static final class Node { + /** + * The item, or null if this node has been removed. + */ E item; + + /** + * One of: + * - the real predecessor Node + * - this Node, meaning the predecessor is tail + * - null, meaning there is no predecessor + */ Node prev; + + /** + * One of: + * - the real successor Node + * - this Node, meaning the successor is head + * - null, meaning there is no successor + */ Node next; + Node(E x, Node p, Node n) { item = x; prev = p; @@ -96,23 +133,37 @@ } } - /** Pointer to first node */ - private transient Node first; - /** Pointer to last node */ - private transient Node last; + /** + * Pointer to first node. + * Invariant: (first == null && last == null) || + * (first.prev == null && first.item != null) + */ + transient Node first; + + /** + * Pointer to last node. + * Invariant: (first == null && last == null) || + * (last.next == null && last.item != null) + */ + transient Node last; + /** Number of items in the deque */ private transient int count; + /** Maximum number of items in the deque */ private final int capacity; + /** Main lock guarding all access */ - private final ReentrantLock lock = new ReentrantLock(); + final ReentrantLock lock = new ReentrantLock(); + /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); + /** Condition for waiting puts */ private final Condition notFull = lock.newCondition(); /** - * Creates a LinkedBlockingDeque with a capacity of + * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingDeque() { @@ -120,10 +171,10 @@ } /** - * Creates a LinkedBlockingDeque with the given (fixed) capacity. + * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity. * * @param capacity the capacity of this deque - * @throws IllegalArgumentException if capacity is less than 1 + * @throws IllegalArgumentException if {@code capacity} is less than 1 */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); @@ -131,7 +182,7 @@ } /** - * Creates a LinkedBlockingDeque with a capacity of + * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of * the given collection, added in traversal order of the * collection's iterator. @@ -142,8 +193,18 @@ */ public LinkedBlockingDeque(Collection c) { this(Integer.MAX_VALUE); - for (E e : c) - add(e); + final ReentrantLock lock = this.lock; + lock.lock(); // Never contended, but necessary for visibility + try { + for (E e : c) { + if (e == null) + throw new NullPointerException(); + if (!linkLast(e)) + throw new IllegalStateException("Deque full"); + } + } finally { + lock.unlock(); + } } @@ -153,9 +214,9 @@ * Links e as first element, or returns false if full. */ private boolean linkFirst(E e) { + // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; - ++count; Node f = first; Node x = new Node(e, null, f); first = x; @@ -163,6 +224,7 @@ last = x; else f.prev = x; + ++count; notEmpty.signal(); return true; } @@ -171,9 +233,9 @@ * Links e as last element, or returns false if full. */ private boolean linkLast(E e) { + // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; - ++count; Node l = last; Node x = new Node(e, l, null); last = x; @@ -181,6 +243,7 @@ first = x; else l.next = x; + ++count; notEmpty.signal(); return true; } @@ -189,10 +252,14 @@ * Removes and returns first element, or null if empty. */ private E unlinkFirst() { + // assert lock.isHeldByCurrentThread(); Node f = first; if (f == null) return null; Node n = f.next; + E item = f.item; + f.item = null; + f.next = f; // help GC first = n; if (n == null) last = null; @@ -200,17 +267,21 @@ n.prev = null; --count; notFull.signal(); - return f.item; + return item; } /** * Removes and returns last element, or null if empty. */ private E unlinkLast() { + // assert lock.isHeldByCurrentThread(); Node l = last; if (l == null) return null; Node p = l.prev; + E item = l.item; + l.item = null; + l.prev = l; // help GC last = p; if (p == null) first = null; @@ -218,31 +289,29 @@ p.next = null; --count; notFull.signal(); - return l.item; + return item; } /** - * Unlink e + * Unlinks x. */ - private void unlink(Node x) { + void unlink(Node x) { + // assert lock.isHeldByCurrentThread(); Node p = x.prev; Node n = x.next; if (p == null) { - if (n == null) - first = last = null; - else { - n.prev = null; - first = n; - } + unlinkFirst(); } else if (n == null) { - p.next = null; - last = p; + unlinkLast(); } else { p.next = n; n.prev = p; + x.item = null; + // Don't mess with x's links. They may still be in use by + // an iterator. + --count; + notFull.signal(); } - --count; - notFull.signalAll(); } // BlockingDeque methods @@ -270,6 +339,7 @@ */ public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(e); @@ -283,6 +353,7 @@ */ public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(e); @@ -297,6 +368,7 @@ */ public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(e)) @@ -312,6 +384,7 @@ */ public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(e)) @@ -329,15 +402,15 @@ throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - if (linkFirst(e)) - return true; + while (!linkFirst(e)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } + return true; } finally { lock.unlock(); } @@ -351,15 +424,15 @@ throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - if (linkLast(e)) - return true; + while (!linkLast(e)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } + return true; } finally { lock.unlock(); } @@ -384,6 +457,7 @@ } public E pollFirst() { + final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); @@ -393,6 +467,7 @@ } public E pollLast() { + final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); @@ -402,6 +477,7 @@ } public E takeFirst() throws InterruptedException { + final ReentrantLock lock = this.lock; lock.lock(); try { E x; @@ -414,6 +490,7 @@ } public E takeLast() throws InterruptedException { + final ReentrantLock lock = this.lock; lock.lock(); try { E x; @@ -428,16 +505,16 @@ public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - E x = unlinkFirst(); - if (x != null) - return x; + E x; + while ( (x = unlinkFirst()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } + return x; } finally { lock.unlock(); } @@ -446,16 +523,16 @@ public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - E x = unlinkLast(); - if (x != null) - return x; + E x; + while ( (x = unlinkLast()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } + return x; } finally { lock.unlock(); } @@ -480,6 +557,7 @@ } public E peekFirst() { + final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; @@ -489,6 +567,7 @@ } public E peekLast() { + final ReentrantLock lock = this.lock; lock.lock(); try { return (last == null) ? null : last.item; @@ -499,6 +578,7 @@ public boolean removeFirstOccurrence(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node p = first; p != null; p = p.next) { @@ -515,6 +595,7 @@ public boolean removeLastOccurrence(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node p = last; p != null; p = p.prev) { @@ -619,14 +700,15 @@ * Returns the number of additional elements that this deque can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this deque - * less the current size of this deque. + * less the current {@code size} of this deque. * *

Note that you cannot always tell if an attempt to insert - * an element will succeed by inspecting remainingCapacity + * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. */ public int remainingCapacity() { + final ReentrantLock lock = this.lock; lock.lock(); try { return capacity - count; @@ -642,22 +724,7 @@ * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - lock.lock(); - try { - for (Node p = first; p != null; p = p.next) - c.add(p.item); - int n = count; - count = 0; - first = last = null; - notFull.signalAll(); - return n; - } finally { - lock.unlock(); - } + return drainTo(c, Integer.MAX_VALUE); } /** @@ -671,19 +738,14 @@ throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; lock.lock(); try { - int n = 0; - while (n < maxElements && first != null) { - c.add(first.item); - first.prev = null; - first = first.next; - --count; - ++n; + int n = Math.min(maxElements, count); + for (int i = 0; i < n; i++) { + c.add(first.item); // In this order, in case add() throws. + unlinkFirst(); } - if (first == null) - last = null; - notFull.signalAll(); return n; } finally { lock.unlock(); @@ -712,16 +774,16 @@ /** * Removes the first occurrence of the specified element from this deque. * If the deque does not contain the element, it is unchanged. - * More formally, removes the first element e such that - * o.equals(e) (if such an element exists). - * Returns true if this deque contained the specified element + * More formally, removes the first element {@code e} such that + * {@code o.equals(e)} (if such an element exists). + * Returns {@code true} if this deque contained the specified element * (or equivalently, if this deque changed as a result of the call). * *

This method is equivalent to * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}. * * @param o element to be removed from this deque, if present - * @return true if this deque changed as a result of the call + * @return {@code true} if this deque changed as a result of the call */ public boolean remove(Object o) { return removeFirstOccurrence(o); @@ -733,6 +795,7 @@ * @return the number of elements in this deque */ public int size() { + final ReentrantLock lock = this.lock; lock.lock(); try { return count; @@ -742,15 +805,16 @@ } /** - * Returns true if this deque contains the specified element. - * More formally, returns true if and only if this deque contains - * at least one element e such that o.equals(e). + * Returns {@code true} if this deque contains the specified element. + * More formally, returns {@code true} if and only if this deque contains + * at least one element {@code e} such that {@code o.equals(e)}. * * @param o object to be checked for containment in this deque - * @return true if this deque contains the specified element + * @return {@code true} if this deque contains the specified element */ public boolean contains(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node p = first; p != null; p = p.next) @@ -762,24 +826,46 @@ } } - /** - * Variant of removeFirstOccurrence needed by iterator.remove. - * Searches for the node, not its contents. + /* + * TODO: Add support for more efficient bulk operations. + * + * We don't want to acquire the lock for every iteration, but we + * also want other threads a chance to interact with the + * collection, especially when count is close to capacity. */ - boolean removeNode(Node e) { - lock.lock(); - try { - for (Node p = first; p != null; p = p.next) { - if (p == e) { - unlink(p); - return true; - } - } - return false; - } finally { - lock.unlock(); - } - } + +// /** +// * Adds all of the elements in the specified collection to this +// * queue. Attempts to addAll of a queue to itself result in +// * {@code IllegalArgumentException}. Further, the behavior of +// * this operation is undefined if the specified collection is +// * modified while the operation is in progress. +// * +// * @param c collection containing elements to be added to this queue +// * @return {@code true} if this queue changed as a result of the call +// * @throws ClassCastException {@inheritDoc} +// * @throws NullPointerException {@inheritDoc} +// * @throws IllegalArgumentException {@inheritDoc} +// * @throws IllegalStateException {@inheritDoc} +// * @see #add(Object) +// */ +// public boolean addAll(Collection c) { +// if (c == null) +// throw new NullPointerException(); +// if (c == this) +// throw new IllegalArgumentException(); +// final ReentrantLock lock = this.lock; +// lock.lock(); +// try { +// boolean modified = false; +// for (E e : c) +// if (linkLast(e)) +// modified = true; +// return modified; +// } finally { +// lock.unlock(); +// } +// } /** * Returns an array containing all of the elements in this deque, in @@ -794,7 +880,9 @@ * * @return an array containing all of the elements in this deque */ + @SuppressWarnings("unchecked") public Object[] toArray() { + final ReentrantLock lock = this.lock; lock.lock(); try { Object[] a = new Object[count]; @@ -817,22 +905,22 @@ *

If this deque fits in the specified array with room to spare * (i.e., the array has more elements than this deque), the element in * the array immediately following the end of the deque is set to - * null. + * {@code null}. * *

Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * - *

Suppose x is a deque known to contain only strings. + *

Suppose {@code x} is a deque known to contain only strings. * The following code can be used to dump the deque into a newly - * allocated array of String: + * allocated array of {@code String}: * *

      *     String[] y = x.toArray(new String[0]);
* - * Note that toArray(new Object[0]) is identical in function to - * toArray(). + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. * * @param a the array into which the elements of the deque are to * be stored, if it is big enough; otherwise, a new array of the @@ -843,14 +931,14 @@ * this deque * @throws NullPointerException if the specified array is null */ + @SuppressWarnings("unchecked") public T[] toArray(T[] a) { + final ReentrantLock lock = this.lock; lock.lock(); try { if (a.length < count) - a = (T[])java.lang.reflect.Array.newInstance( - a.getClass().getComponentType(), - count - ); + a = (T[])java.lang.reflect.Array.newInstance + (a.getClass().getComponentType(), count); int k = 0; for (Node p = first; p != null; p = p.next) @@ -864,6 +952,7 @@ } public String toString() { + final ReentrantLock lock = this.lock; lock.lock(); try { return super.toString(); @@ -877,8 +966,16 @@ * The deque will be empty after this call returns. */ public void clear() { + final ReentrantLock lock = this.lock; lock.lock(); try { + for (Node f = first; f != null; ) { + f.item = null; + Node n = f.next; + f.prev = null; + f.next = null; + f = n; + } first = last = null; count = 0; notFull.signalAll(); @@ -890,7 +987,7 @@ /** * Returns an iterator over the elements in this deque in proper sequence. * The elements will be returned in order from first (head) to last (tail). - * The returned Iterator is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -906,7 +1003,7 @@ * Returns an iterator over the elements in this deque in reverse * sequential order. The elements will be returned in order from * last (tail) to first (head). - * The returned Iterator is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -921,7 +1018,7 @@ */ private abstract class AbstractItr implements Iterator { /** - * The next node to return in next + * The next node to return in next() */ Node next; @@ -939,15 +1036,44 @@ */ private Node lastRet; + abstract Node firstNode(); + abstract Node nextNode(Node n); + AbstractItr() { - advance(); // set to initial position + // set to initial position + final ReentrantLock lock = LinkedBlockingDeque.this.lock; + lock.lock(); + try { + next = firstNode(); + nextItem = (next == null) ? null : next.item; + } finally { + lock.unlock(); + } } /** - * Advances next, or if not yet initialized, sets to first node. - * Implemented to move forward vs backward in the two subclasses. + * Advances next. */ - abstract void advance(); + void advance() { + final ReentrantLock lock = LinkedBlockingDeque.this.lock; + lock.lock(); + try { + // assert next != null; + Node s = nextNode(next); + if (s == next) { + next = firstNode(); + } else { + // Skip over removed nodes. + // May be necessary if multiple interior Nodes are removed. + while (s != null && s.item == null) + s = nextNode(s); + next = s; + } + nextItem = (next == null) ? null : next.item; + } finally { + lock.unlock(); + } + } public boolean hasNext() { return next != null; @@ -967,52 +1093,39 @@ if (n == null) throw new IllegalStateException(); lastRet = null; - // Note: removeNode rescans looking for this node to make - // sure it was not already removed. Otherwise, trying to - // re-remove could corrupt list. - removeNode(n); - } - } - - /** Forward iterator */ - private class Itr extends AbstractItr { - void advance() { final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { - next = (next == null)? first : next.next; - nextItem = (next == null)? null : next.item; + if (n.item != null) + unlink(n); } finally { lock.unlock(); } } } - /** - * Descending iterator for LinkedBlockingDeque - */ + /** Forward iterator */ + private class Itr extends AbstractItr { + Node firstNode() { return first; } + Node nextNode(Node n) { return n.next; } + } + + /** Descending iterator */ private class DescendingItr extends AbstractItr { - void advance() { - final ReentrantLock lock = LinkedBlockingDeque.this.lock; - lock.lock(); - try { - next = (next == null)? last : next.prev; - nextItem = (next == null)? null : next.item; - } finally { - lock.unlock(); - } - } + Node firstNode() { return last; } + Node nextNode(Node n) { return n.prev; } } /** * Save the state of this deque to a stream (that is, serialize it). * * @serialData The capacity (int), followed by elements (each an - * Object) in the proper order, followed by a null + * {@code Object}) in the proper order, followed by a null * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { + final ReentrantLock lock = this.lock; lock.lock(); try { // Write out capacity and any hidden stuff @@ -1040,6 +1153,7 @@ last = null; // Read in all elements and place in queue for (;;) { + @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) break;