7005424: Resync java.util.concurrent classes with Dougs CVS - Jan 2011
Reviewed-by: dholmes, chegar, mduigou
--- a/jdk/src/share/classes/java/util/Collections.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/Collections.java Wed Jan 12 14:40:36 2011 +0000
@@ -1452,10 +1452,10 @@
* when o is a Map.Entry, and calls o.setValue.
*/
public boolean containsAll(Collection<?> coll) {
- Iterator<?> it = coll.iterator();
- while (it.hasNext())
- if (!contains(it.next())) // Invokes safe contains() above
+ for (Object e : coll) {
+ if (!contains(e)) // Invokes safe contains() above
return false;
+ }
return true;
}
public boolean equals(Object o) {
--- a/jdk/src/share/classes/java/util/LinkedList.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/LinkedList.java Wed Jan 12 14:40:36 2011 +0000
@@ -26,9 +26,9 @@
package java.util;
/**
- * Linked list implementation of the {@link List} and {@link Deque} interfaces.
- * Implements all optional operations, and permits all elements (including
- * {@code null}).
+ * Doubly-linked list implementation of the {@code List} and {@code Deque}
+ * interfaces. Implements all optional list operations, and permits all
+ * elements (including {@code null}).
*
* <p>All of the operations perform as could be expected for a doubly-linked
* list. Operations that index into the list will traverse the list from
@@ -249,7 +249,7 @@
* @return the last element in this list
* @throws NoSuchElementException if this list is empty
*/
- public E getLast() {
+ public E getLast() {
final Node<E> l = last;
if (l == null)
throw new NoSuchElementException();
--- a/jdk/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java Wed Jan 12 14:40:36 2011 +0000
@@ -49,14 +49,14 @@
* <p>This is a classic "bounded buffer", in which a
* fixed-sized array holds elements inserted by producers and
* extracted by consumers. Once created, the capacity cannot be
- * increased. Attempts to <tt>put</tt> an element into a full queue
- * will result in the operation blocking; attempts to <tt>take</tt> an
+ * changed. Attempts to {@code put} an element into a full queue
+ * will result in the operation blocking; attempts to {@code take} an
* element from an empty queue will similarly block.
*
- * <p> This class supports an optional fairness policy for ordering
+ * <p>This class supports an optional fairness policy for ordering
* waiting producer and consumer threads. By default, this ordering
* is not guaranteed. However, a queue constructed with fairness set
- * to <tt>true</tt> grants threads access in FIFO order. Fairness
+ * to {@code true} grants threads access in FIFO order. Fairness
* generally decreases throughput but reduces variability and avoids
* starvation.
*
@@ -83,14 +83,17 @@
*/
private static final long serialVersionUID = -817911632652898426L;
- /** The queued items */
- private final E[] items;
- /** items index for next take, poll or remove */
- private int takeIndex;
- /** items index for next put, offer, or add. */
- private int putIndex;
- /** Number of items in the queue */
- private int count;
+ /** The queued items */
+ final Object[] items;
+
+ /** items index for next take, poll, peek or remove */
+ int takeIndex;
+
+ /** items index for next put, offer, or add */
+ int putIndex;
+
+ /** Number of elements in the queue */
+ int count;
/*
* Concurrency control uses the classic two-condition algorithm
@@ -98,7 +101,7 @@
*/
/** Main lock guarding all access */
- private final ReentrantLock lock;
+ final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
@@ -110,7 +113,36 @@
* Circularly increment i.
*/
final int inc(int i) {
- return (++i == items.length)? 0 : i;
+ return (++i == items.length) ? 0 : i;
+ }
+
+ /**
+ * Circularly decrement i.
+ */
+ final int dec(int i) {
+ return ((i == 0) ? items.length : i) - 1;
+ }
+
+ @SuppressWarnings("unchecked")
+ static <E> E cast(Object item) {
+ return (E) item;
+ }
+
+ /**
+ * Returns item at index i.
+ */
+ final E itemAt(int i) {
+ return this.<E>cast(items[i]);
+ }
+
+ /**
+ * Throws NullPointerException if argument is null.
+ *
+ * @param v the element
+ */
+ private static void checkNotNull(Object v) {
+ if (v == null)
+ throw new NullPointerException();
}
/**
@@ -129,8 +161,8 @@
* Call only when holding lock.
*/
private E extract() {
- final E[] items = this.items;
- E x = items[takeIndex];
+ final Object[] items = this.items;
+ E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
@@ -139,11 +171,12 @@
}
/**
- * Utility for remove and iterator.remove: Delete item at position i.
+ * Deletes item at position i.
+ * Utility for remove and iterator.remove.
* Call only when holding lock.
*/
void removeAt(int i) {
- final E[] items = this.items;
+ final Object[] items = this.items;
// if removing front item, just advance
if (i == takeIndex) {
items[takeIndex] = null;
@@ -167,69 +200,82 @@
}
/**
- * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
+ * Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*
* @param capacity the capacity of this queue
- * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
+ * @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
- * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
+ * Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
- * @param fair if <tt>true</tt> then queue accesses for threads blocked
+ * @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
- * if <tt>false</tt> the access order is unspecified.
- * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
+ * if {@code false} the access order is unspecified.
+ * @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
- this.items = (E[]) new Object[capacity];
+ this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
- * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
+ * Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity, the specified access policy and initially containing the
* elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param capacity the capacity of this queue
- * @param fair if <tt>true</tt> then queue accesses for threads blocked
+ * @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
- * if <tt>false</tt> the access order is unspecified.
+ * if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
- * @throws IllegalArgumentException if <tt>capacity</tt> is less than
- * <tt>c.size()</tt>, or less than 1.
+ * @throws IllegalArgumentException if {@code capacity} is less than
+ * {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
- if (capacity < c.size())
- throw new IllegalArgumentException();
- for (E e : c)
- add(e);
+ final ReentrantLock lock = this.lock;
+ lock.lock(); // Lock only for visibility, not mutual exclusion
+ try {
+ int i = 0;
+ try {
+ for (E e : c) {
+ checkNotNull(e);
+ items[i++] = e;
+ }
+ } catch (ArrayIndexOutOfBoundsException ex) {
+ throw new IllegalArgumentException();
+ }
+ count = i;
+ putIndex = (i == capacity) ? 0 : i;
+ } finally {
+ lock.unlock();
+ }
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
- * returning <tt>true</tt> upon success and throwing an
- * <tt>IllegalStateException</tt> if this queue is full.
+ * returning {@code true} upon success and throwing an
+ * {@code IllegalStateException} if this queue is full.
*
* @param e the element to add
- * @return <tt>true</tt> (as specified by {@link Collection#add})
+ * @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
*/
@@ -240,14 +286,14 @@
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
- * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
+ * returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
+ checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
@@ -270,18 +316,12 @@
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- final E[] items = this.items;
+ checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- try {
- while (count == items.length)
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
+ while (count == items.length)
+ notFull.await();
insert(e);
} finally {
lock.unlock();
@@ -299,25 +339,18 @@
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
- if (e == null) throw new NullPointerException();
+ checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- if (count != items.length) {
- insert(e);
- return true;
- }
+ while (count == items.length) {
if (nanos <= 0)
return false;
- try {
- nanos = notFull.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
+ nanos = notFull.awaitNanos(nanos);
}
+ insert(e);
+ return true;
} finally {
lock.unlock();
}
@@ -327,10 +360,7 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- if (count == 0)
- return null;
- E x = extract();
- return x;
+ return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
@@ -340,15 +370,9 @@
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- try {
- while (count == 0)
- notEmpty.await();
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- E x = extract();
- return x;
+ while (count == 0)
+ notEmpty.await();
+ return extract();
} finally {
lock.unlock();
}
@@ -359,21 +383,12 @@
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- if (count != 0) {
- E x = extract();
- return x;
- }
+ while (count == 0) {
if (nanos <= 0)
return null;
- try {
- nanos = notEmpty.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
-
+ nanos = notEmpty.awaitNanos(nanos);
}
+ return extract();
} finally {
lock.unlock();
}
@@ -383,7 +398,7 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return (count == 0) ? null : items[takeIndex];
+ return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
@@ -412,10 +427,10 @@
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
- * less the current <tt>size</tt> of this queue.
+ * less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
- * an element will succeed by inspecting <tt>remainingCapacity</tt>
+ * an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
@@ -431,59 +446,56 @@
/**
* Removes a single instance of the specified element from this queue,
- * if it is present. More formally, removes an element <tt>e</tt> such
- * that <tt>o.equals(e)</tt>, if this queue contains one or more such
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
* elements.
- * Returns <tt>true</tt> if this queue contained the specified element
+ * Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
+ * <p>Removal of interior elements in circular array based queues
+ * is an intrinsically slow and disruptive operation, so should
+ * be undertaken only in exceptional circumstances, ideally
+ * only when the queue is known not to be accessible by other
+ * threads.
+ *
* @param o element to be removed from this queue, if present
- * @return <tt>true</tt> if this queue changed as a result of the call
+ * @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- int i = takeIndex;
- int k = 0;
- for (;;) {
- if (k++ >= count)
- return false;
+ for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
- i = inc(i);
}
-
+ return false;
} finally {
lock.unlock();
}
}
/**
- * Returns <tt>true</tt> if this queue contains the specified element.
- * More formally, returns <tt>true</tt> if and only if this queue contains
- * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
- * @return <tt>true</tt> if this queue contains the specified element
+ * @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- int i = takeIndex;
- int k = 0;
- while (k++ < count) {
+ for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
if (o.equals(items[i]))
return true;
- i = inc(i);
- }
return false;
} finally {
lock.unlock();
@@ -504,17 +516,14 @@
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
+ final int count = this.count;
Object[] a = new Object[count];
- int k = 0;
- int i = takeIndex;
- while (k < count) {
- a[k++] = items[i];
- i = inc(i);
- }
+ for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
+ a[k] = items[i];
return a;
} finally {
lock.unlock();
@@ -531,22 +540,22 @@
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
- * <tt>null</tt>.
+ * {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
- * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
+ * <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
- * allocated array of <tt>String</tt>:
+ * allocated array of {@code String}:
*
* <pre>
* String[] y = x.toArray(new String[0]);</pre>
*
- * Note that <tt>toArray(new Object[0])</tt> is identical in function to
- * <tt>toArray()</tt>.
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
@@ -557,24 +566,20 @@
* this queue
* @throws NullPointerException if the specified array is null
*/
+ @SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- if (a.length < count)
+ final int count = this.count;
+ final int len = a.length;
+ if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
- a.getClass().getComponentType(),
- count
- );
-
- int k = 0;
- int i = takeIndex;
- while (k < count) {
- a[k++] = (T)items[i];
- i = inc(i);
- }
- if (a.length > count)
+ a.getClass().getComponentType(), count);
+ for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
+ a[k] = (T) items[i];
+ if (len > count)
a[count] = null;
return a;
} finally {
@@ -586,7 +591,19 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return super.toString();
+ int k = count;
+ if (k == 0)
+ return "[]";
+
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (int i = takeIndex; ; i = inc(i)) {
+ Object e = items[i];
+ sb.append(e == this ? "(this Collection)" : e);
+ if (--k == 0)
+ return sb.append(']').toString();
+ sb.append(',').append(' ');
+ }
} finally {
lock.unlock();
}
@@ -597,16 +614,12 @@
* The queue will be empty after this call returns.
*/
public void clear() {
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- int i = takeIndex;
- int k = count;
- while (k-- > 0) {
+ for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
items[i] = null;
- i = inc(i);
- }
count = 0;
putIndex = 0;
takeIndex = 0;
@@ -623,11 +636,10 @@
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
- if (c == null)
- throw new NullPointerException();
+ checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
@@ -635,7 +647,7 @@
int n = 0;
int max = count;
while (n < max) {
- c.add(items[i]);
+ c.add(this.<E>cast(items[i]));
items[i] = null;
i = inc(i);
++n;
@@ -659,22 +671,20 @@
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
- if (c == null)
- throw new NullPointerException();
+ checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int n = 0;
- int sz = count;
- int max = (maxElements < count)? maxElements : count;
+ int max = (maxElements < count) ? maxElements : count;
while (n < max) {
- c.add(items[i]);
+ c.add(this.<E>cast(items[i]));
items[i] = null;
i = inc(i);
++n;
@@ -690,11 +700,13 @@
}
}
-
/**
* Returns an iterator over the elements in this queue in proper sequence.
- * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
- * will never throw {@link ConcurrentModificationException},
+ * The elements will be returned in order from first (head) to last (tail).
+ *
+ * <p>The returned {@code Iterator} is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
@@ -702,83 +714,65 @@
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator<E> iterator() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return new Itr();
- } finally {
- lock.unlock();
- }
+ return new Itr();
}
/**
- * Iterator for ArrayBlockingQueue
+ * Iterator for ArrayBlockingQueue. To maintain weak consistency
+ * with respect to puts and takes, we (1) read ahead one slot, so
+ * as to not report hasNext true but then not have an element to
+ * return -- however we later recheck this slot to use the most
+ * current value; (2) ensure that each array slot is traversed at
+ * most once (by tracking "remaining" elements); (3) skip over
+ * null slots, which can occur if takes race ahead of iterators.
+ * However, for circular array-based queues, we cannot rely on any
+ * well established definition of what it means to be weakly
+ * consistent with respect to interior removes since these may
+ * require slot overwrites in the process of sliding elements to
+ * cover gaps. So we settle for resiliency, operating on
+ * established apparent nexts, which may miss some elements that
+ * have moved between calls to next.
*/
private class Itr implements Iterator<E> {
- /**
- * Index of element to be returned by next,
- * or a negative number if no such.
- */
- private int nextIndex;
-
- /**
- * nextItem holds on to item fields because once we claim
- * that an element exists in hasNext(), we must return it in
- * the following next() call even if it was in the process of
- * being removed when hasNext() was called.
- */
- private E nextItem;
-
- /**
- * Index of element returned by most recent call to next.
- * Reset to -1 if this element is deleted by a call to remove.
- */
- private int lastRet;
+ private int remaining; // Number of elements yet to be returned
+ private int nextIndex; // Index of element to be returned by next
+ private E nextItem; // Element to be returned by next call to next
+ private E lastItem; // Element returned by last call to next
+ private int lastRet; // Index of last element returned, or -1 if none
Itr() {
- lastRet = -1;
- if (count == 0)
- nextIndex = -1;
- else {
- nextIndex = takeIndex;
- nextItem = items[takeIndex];
+ final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+ lock.lock();
+ try {
+ lastRet = -1;
+ if ((remaining = count) > 0)
+ nextItem = itemAt(nextIndex = takeIndex);
+ } finally {
+ lock.unlock();
}
}
public boolean hasNext() {
- /*
- * No sync. We can return true by mistake here
- * only if this iterator passed across threads,
- * which we don't support anyway.
- */
- return nextIndex >= 0;
- }
-
- /**
- * Checks whether nextIndex is valid; if so setting nextItem.
- * Stops iterator when either hits putIndex or sees null item.
- */
- private void checkNext() {
- if (nextIndex == putIndex) {
- nextIndex = -1;
- nextItem = null;
- } else {
- nextItem = items[nextIndex];
- if (nextItem == null)
- nextIndex = -1;
- }
+ return remaining > 0;
}
public E next() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
- if (nextIndex < 0)
+ if (remaining <= 0)
throw new NoSuchElementException();
lastRet = nextIndex;
- E x = nextItem;
- nextIndex = inc(nextIndex);
- checkNext();
+ E x = itemAt(nextIndex); // check for fresher value
+ if (x == null) {
+ x = nextItem; // we are forced to report old value
+ lastItem = null; // but ensure remove fails
+ }
+ else
+ lastItem = x;
+ while (--remaining > 0 && // skip over nulls
+ (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
+ ;
return x;
} finally {
lock.unlock();
@@ -793,15 +787,19 @@
if (i == -1)
throw new IllegalStateException();
lastRet = -1;
-
- int ti = takeIndex;
- removeAt(i);
- // back up cursor (reset to front if was first element)
- nextIndex = (i == ti) ? takeIndex : i;
- checkNext();
+ E x = lastItem;
+ lastItem = null;
+ // only remove if item still at index
+ if (x != null && x == items[i]) {
+ boolean removingHead = (i == takeIndex);
+ removeAt(i);
+ if (!removingHead)
+ nextIndex = dec(nextIndex);
+ }
} finally {
lock.unlock();
}
}
}
+
}
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java Wed Jan 12 14:40:36 2011 +0000
@@ -869,6 +869,8 @@
/**
* Inserts the specified element at the front of this deque.
+ * As the deque is unbounded, this method will never throw
+ * {@link IllegalStateException}.
*
* @throws NullPointerException if the specified element is null
*/
@@ -878,6 +880,8 @@
/**
* Inserts the specified element at the end of this deque.
+ * As the deque is unbounded, this method will never throw
+ * {@link IllegalStateException}.
*
* <p>This method is equivalent to {@link #add}.
*
@@ -889,8 +893,9 @@
/**
* Inserts the specified element at the front of this deque.
+ * As the deque is unbounded, this method will never return {@code false}.
*
- * @return {@code true} always
+ * @return {@code true} (as specified by {@link Deque#offerFirst})
* @throws NullPointerException if the specified element is null
*/
public boolean offerFirst(E e) {
@@ -900,10 +905,11 @@
/**
* Inserts the specified element at the end of this deque.
+ * As the deque is unbounded, this method will never return {@code false}.
*
* <p>This method is equivalent to {@link #add}.
*
- * @return {@code true} always
+ * @return {@code true} (as specified by {@link Deque#offerLast})
* @throws NullPointerException if the specified element is null
*/
public boolean offerLast(E e) {
@@ -983,6 +989,7 @@
/**
* Inserts the specified element at the tail of this deque.
+ * As the deque is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
@@ -993,6 +1000,8 @@
/**
* Inserts the specified element at the tail of this deque.
+ * As the deque is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Wed Jan 12 14:40:36 2011 +0000
@@ -269,6 +269,8 @@
/**
* Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
@@ -298,6 +300,7 @@
/**
* Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java Wed Jan 12 14:40:36 2011 +0000
@@ -374,17 +374,11 @@
null, null, 1);
}
- /** Updater for casHead */
- private static final
- AtomicReferenceFieldUpdater<ConcurrentSkipListMap, HeadIndex>
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (ConcurrentSkipListMap.class, HeadIndex.class, "head");
-
/**
* compareAndSet head node
*/
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) {
- return headUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
/* ---------------- Nodes -------------- */
@@ -423,28 +417,18 @@
this.next = next;
}
- /** Updater for casNext */
- static final AtomicReferenceFieldUpdater<Node, Node>
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (Node.class, Node.class, "next");
-
- /** Updater for casValue */
- static final AtomicReferenceFieldUpdater<Node, Object>
- valueUpdater = AtomicReferenceFieldUpdater.newUpdater
- (Node.class, Object.class, "value");
-
/**
* compareAndSet value field
*/
boolean casValue(Object cmp, Object val) {
- return valueUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
/**
* compareAndSet next field
*/
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
- return nextUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
@@ -522,6 +506,14 @@
return null;
return new AbstractMap.SimpleImmutableEntry<K,V>(key, v);
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long valueOffset =
+ objectFieldOffset(UNSAFE, "value", Node.class);
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", Node.class);
+
}
/* ---------------- Indexing -------------- */
@@ -547,16 +539,11 @@
this.right = right;
}
- /** Updater for casRight */
- static final AtomicReferenceFieldUpdater<Index, Index>
- rightUpdater = AtomicReferenceFieldUpdater.newUpdater
- (Index.class, Index.class, "right");
-
/**
* compareAndSet right field
*/
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
- return rightUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
/**
@@ -591,6 +578,12 @@
final boolean unlink(Index<K,V> succ) {
return !indexesDeletedNode() && casRight(succ, succ.right);
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long rightOffset =
+ objectFieldOffset(UNSAFE, "right", Index.class);
+
}
/* ---------------- Head nodes -------------- */
@@ -640,7 +633,8 @@
* cast key as Comparable, which may cause ClassCastException,
* which is propagated back to caller.
*/
- private Comparable<? super K> comparable(Object key) throws ClassCastException {
+ private Comparable<? super K> comparable(Object key)
+ throws ClassCastException {
if (key == null)
throw new NullPointerException();
if (comparator != null)
@@ -799,68 +793,12 @@
}
/**
- * Specialized variant of findNode to perform Map.get. Does a weak
- * traversal, not bothering to fix any deleted index nodes,
- * returning early if it happens to see key in index, and passing
- * over any deleted base nodes, falling back to getUsingFindNode
- * only if it would otherwise return value from an ongoing
- * deletion. Also uses "bound" to eliminate need for some
- * comparisons (see Pugh Cookbook). Also folds uses of null checks
- * and node-skipping because markers have null keys.
+ * Gets value for key using findNode.
* @param okey the key
* @return the value, or null if absent
*/
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
- Node<K,V> bound = null;
- Index<K,V> q = head;
- Index<K,V> r = q.right;
- Node<K,V> n;
- K k;
- int c;
- for (;;) {
- Index<K,V> d;
- // Traverse rights
- if (r != null && (n = r.node) != bound && (k = n.key) != null) {
- if ((c = key.compareTo(k)) > 0) {
- q = r;
- r = r.right;
- continue;
- } else if (c == 0) {
- Object v = n.value;
- return (v != null)? (V)v : getUsingFindNode(key);
- } else
- bound = n;
- }
-
- // Traverse down
- if ((d = q.down) != null) {
- q = d;
- r = d.right;
- } else
- break;
- }
-
- // Traverse nexts
- for (n = q.node.next; n != null; n = n.next) {
- if ((k = n.key) != null) {
- if ((c = key.compareTo(k)) == 0) {
- Object v = n.value;
- return (v != null)? (V)v : getUsingFindNode(key);
- } else if (c < 0)
- break;
- }
- }
- return null;
- }
-
- /**
- * Performs map.get via findNode. Used as a backup if doGet
- * encounters an in-progress deletion.
- * @param key the key
- * @return the value, or null if absent
- */
- private V getUsingFindNode(Comparable<? super K> key) {
/*
* Loop needed here and elsewhere in case value field goes
* null just as it is about to be returned, in which case we
@@ -943,7 +881,7 @@
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
- if ((x & 0x8001) != 0) // test highest and lowest bits
+ if ((x & 0x80000001) != 0) // test highest and lowest bits
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
@@ -1256,7 +1194,7 @@
Node<K,V> n = b.next;
for (;;) {
if (n == null)
- return (b.isBaseHeader())? null : b;
+ return b.isBaseHeader() ? null : b;
Node<K,V> f = n.next; // inconsistent read
if (n != b.next)
break;
@@ -1374,7 +1312,7 @@
Node<K,V> n = b.next;
for (;;) {
if (n == null)
- return ((rel & LT) == 0 || b.isBaseHeader())? null : b;
+ return ((rel & LT) == 0 || b.isBaseHeader()) ? null : b;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
@@ -1390,7 +1328,7 @@
(c < 0 && (rel & LT) == 0))
return n;
if ( c <= 0 && (rel & LT) != 0)
- return (b.isBaseHeader())? null : b;
+ return b.isBaseHeader() ? null : b;
b = n;
n = f;
}
@@ -1744,7 +1682,7 @@
if (n.getValidValue() != null)
++count;
}
- return (count >= Integer.MAX_VALUE)? Integer.MAX_VALUE : (int)count;
+ return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
/**
@@ -2099,7 +2037,7 @@
*/
public K lowerKey(K key) {
Node<K,V> n = findNear(key, LT);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2123,7 +2061,7 @@
*/
public K floorKey(K key) {
Node<K,V> n = findNear(key, LT|EQ);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2145,7 +2083,7 @@
*/
public K ceilingKey(K key) {
Node<K,V> n = findNear(key, GT|EQ);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2169,7 +2107,7 @@
*/
public K higherKey(K key) {
Node<K,V> n = findNear(key, GT);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2342,7 +2280,8 @@
return list;
}
- static final class KeySet<E> extends AbstractSet<E> implements NavigableSet<E> {
+ static final class KeySet<E>
+ extends AbstractSet<E> implements NavigableSet<E> {
private final ConcurrentNavigableMap<E,Object> m;
KeySet(ConcurrentNavigableMap<E,Object> map) { m = map; }
public int size() { return m.size(); }
@@ -2359,11 +2298,11 @@
public E last() { return m.lastKey(); }
public E pollFirst() {
Map.Entry<E,Object> e = m.pollFirstEntry();
- return e == null? null : e.getKey();
+ return (e == null) ? null : e.getKey();
}
public E pollLast() {
Map.Entry<E,Object> e = m.pollLastEntry();
- return e == null? null : e.getKey();
+ return (e == null) ? null : e.getKey();
}
public Iterator<E> iterator() {
if (m instanceof ConcurrentSkipListMap)
@@ -2710,9 +2649,9 @@
rel &= ~m.LT;
}
if (tooLow(key))
- return ((rel & m.LT) != 0)? null : lowestEntry();
+ return ((rel & m.LT) != 0) ? null : lowestEntry();
if (tooHigh(key))
- return ((rel & m.LT) != 0)? highestEntry() : null;
+ return ((rel & m.LT) != 0) ? highestEntry() : null;
for (;;) {
Node<K,V> n = m.findNear(key, rel);
if (n == null || !inBounds(n.key))
@@ -2783,7 +2722,7 @@
public V remove(Object key) {
K k = (K)key;
- return (!inBounds(k))? null : m.remove(k);
+ return (!inBounds(k)) ? null : m.remove(k);
}
public int size() {
@@ -2794,7 +2733,7 @@
if (n.getValidValue() != null)
++count;
}
- return count >= Integer.MAX_VALUE? Integer.MAX_VALUE : (int)count;
+ return count >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)count;
}
public boolean isEmpty() {
@@ -2972,27 +2911,27 @@
}
public K firstKey() {
- return isDescending? highestKey() : lowestKey();
+ return isDescending ? highestKey() : lowestKey();
}
public K lastKey() {
- return isDescending? lowestKey() : highestKey();
+ return isDescending ? lowestKey() : highestKey();
}
public Map.Entry<K,V> firstEntry() {
- return isDescending? highestEntry() : lowestEntry();
+ return isDescending ? highestEntry() : lowestEntry();
}
public Map.Entry<K,V> lastEntry() {
- return isDescending? lowestEntry() : highestEntry();
+ return isDescending ? lowestEntry() : highestEntry();
}
public Map.Entry<K,V> pollFirstEntry() {
- return isDescending? removeHighest() : removeLowest();
+ return isDescending ? removeHighest() : removeLowest();
}
public Map.Entry<K,V> pollLastEntry() {
- return isDescending? removeLowest() : removeHighest();
+ return isDescending ? removeLowest() : removeHighest();
}
/* ---------------- Submap Views -------------- */
@@ -3141,4 +3080,22 @@
}
}
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", ConcurrentSkipListMap.class);
+
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
}
--- a/jdk/src/share/classes/java/util/concurrent/CopyOnWriteArrayList.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/CopyOnWriteArrayList.java Wed Jan 12 14:40:36 2011 +0000
@@ -832,7 +832,7 @@
}
/**
- * Save the state of the list to a stream (i.e., serialize it).
+ * Saves the state of the list to a stream (that is, serializes it).
*
* @serialData The length of the array backing the list is emitted
* (int), followed by all of its elements (each an Object)
@@ -842,27 +842,25 @@
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException{
- // Write out element count, and any hidden stuff
s.defaultWriteObject();
Object[] elements = getArray();
- int len = elements.length;
// Write out array length
- s.writeInt(len);
+ s.writeInt(elements.length);
// Write out all elements in the proper order.
- for (int i = 0; i < len; i++)
- s.writeObject(elements[i]);
+ for (Object element : elements)
+ s.writeObject(element);
}
/**
- * Reconstitute the list from a stream (i.e., deserialize it).
+ * Reconstitutes the list from a stream (that is, deserializes it).
+ *
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
- // Read in size, and any hidden stuff
s.defaultReadObject();
// bind to new lock
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Wed Jan 12 14:40:36 2011 +0000
@@ -525,8 +525,8 @@
*/
private volatile long eventWaiters;
- private static final int EVENT_COUNT_SHIFT = 32;
- private static final long WAITER_ID_MASK = (1L << 16) - 1L;
+ private static final int EVENT_COUNT_SHIFT = 32;
+ private static final int WAITER_ID_MASK = (1 << 16) - 1;
/**
* A counter for events that may wake up worker threads:
@@ -615,7 +615,7 @@
// are usually manually inlined by callers
/**
- * Increments running count part of workerCounts
+ * Increments running count part of workerCounts.
*/
final void incrementRunningCount() {
int c;
@@ -625,7 +625,17 @@
}
/**
- * Tries to decrement running count unless already zero
+ * Tries to increment running count part of workerCounts.
+ */
+ final boolean tryIncrementRunningCount() {
+ int c;
+ return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ c = workerCounts,
+ c + ONE_RUNNING);
+ }
+
+ /**
+ * Tries to decrement running count unless already zero.
*/
final boolean tryDecrementRunningCount() {
int wc = workerCounts;
@@ -698,10 +708,11 @@
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
- ws = Arrays.copyOf(ws, n << 1);
+ ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
- workers = ws; // volatile array write ensures slot visibility
+ int c = eventCount; // advance event count to ensure visibility
+ UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
} finally {
lock.unlock();
}
@@ -734,7 +745,7 @@
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
- decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
+ decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w);
tryTerminate(false);
@@ -746,24 +757,23 @@
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or
- * upon releasing two workers, letting others take over.
+ * upon releasing four workers, letting others take over.
*/
private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
long h = eventWaiters;
int ec = eventCount;
- boolean releasedOne = false;
+ int releases = 4;
ForkJoinWorkerThread w; int id;
- while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+ while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
(int)(h >>> EVENT_COUNT_SHIFT) != ec &&
id < n && (w = ws[id]) != null) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
h, w.nextWaiter)) {
LockSupport.unpark(w);
- if (releasedOne) // exit on second release
+ if (--releases == 0)
break;
- releasedOne = true;
}
if (eventCount != ec)
break;
@@ -793,7 +803,7 @@
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
- (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
+ (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
(int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
@@ -820,9 +830,9 @@
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
- long startTime = untimed? 0 : System.nanoTime();
+ long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != ec || w.isTerminating())
+ if (w.isTerminating() || eventCount != ec)
break; // recheck after clear
if (untimed)
LockSupport.park(w);
@@ -860,7 +870,8 @@
if ((sw = spareWaiters) != 0 &&
(id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null &&
- (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
+ (runState >= TERMINATING ||
+ (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
spareWaiters == sw &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) {
@@ -914,12 +925,8 @@
break;
}
w.start(recordWorker(w), ueh);
- if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
- int c; // advance event count
- UNSAFE.compareAndSwapInt(this, eventCountOffset,
- c = eventCount, c+1);
+ if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
break; // add at most one unless total below target
- }
}
}
if (eventWaiters != 0L)
@@ -955,7 +962,7 @@
}
else if ((h = eventWaiters) != 0L) {
long nh;
- int id = ((int)(h & WAITER_ID_MASK)) - 1;
+ int id = (((int)h) & WAITER_ID_MASK) - 1;
if (id >= 0 && id < n && (w = ws[id]) != null &&
(nh = w.nextWaiter) != 0L && // keep at least one worker
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
@@ -1003,24 +1010,31 @@
int pc = parallelism;
while (w.runState == 0) {
int rs = runState;
- if (rs >= TERMINATING) { // propagate shutdown
+ if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
- UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
+ UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
inactivate = active = w.active = false;
- int wc = workerCounts;
+ if (rs == SHUTDOWN) { // all inactive and shut down
+ tryTerminate(false);
+ continue;
+ }
+ }
+ int wc = workerCounts; // try to suspend as spare
if ((wc & RUNNING_COUNT_MASK) > pc) {
if (!(inactivate |= active) && // must inactivate to suspend
- workerCounts == wc && // try to suspend as spare
+ workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
w.suspendAsSpare();
}
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
helpMaintainParallelism(); // not enough workers
- else if (!ran) {
+ else if (ran)
+ break;
+ else {
long h = eventWaiters;
int ec = eventCount;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
@@ -1032,8 +1046,6 @@
else if (!(inactivate |= active))
eventSync(w, wec); // must inactivate before sync
}
- else
- break;
}
}
@@ -1043,35 +1055,67 @@
*
* @param joinMe the task to join
* @param worker the current worker thread
+ * @param timed true if wait should time out
+ * @param nanos timeout value if timed
*/
- final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
+ final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
+ boolean timed, long nanos) {
+ long startTime = timed ? System.nanoTime() : 0L;
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
+ boolean running = true; // false when count decremented
while (joinMe.status >= 0) {
- int wc;
- worker.helpJoinTask(joinMe);
+ if (runState >= TERMINATING) {
+ joinMe.cancelIgnoringExceptions();
+ break;
+ }
+ running = worker.helpJoinTask(joinMe, running);
if (joinMe.status < 0)
break;
- else if (retries > 0)
+ if (retries > 0) {
--retries;
- else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING)) {
- int stat, c; long h;
- while ((stat = joinMe.status) >= 0 &&
- (h = eventWaiters) != 0L && // help release others
- (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+ continue;
+ }
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) != 0) {
+ if (running) {
+ if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING))
+ continue;
+ running = false;
+ }
+ long h = eventWaiters;
+ if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters();
- if (stat >= 0 &&
- ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
- (stat =
- joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
- helpMaintainParallelism(); // timeout or no running workers
- do {} while (!UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- c = workerCounts, c + ONE_RUNNING));
- if (stat < 0)
- break; // else restart
+ if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
+ long ms; int ns;
+ if (!timed) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else { // at most JOIN_TIMEOUT_MILLIS per wait
+ long nt = nanos - (System.nanoTime() - startTime);
+ if (nt <= 0L)
+ break;
+ ms = nt / 1000000;
+ if (ms > JOIN_TIMEOUT_MILLIS) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else
+ ns = (int) (nt % 1000000);
+ }
+ joinMe.internalAwaitDone(ms, ns);
+ }
+ if (joinMe.status < 0)
+ break;
}
+ helpMaintainParallelism();
+ }
+ if (!running) {
+ int c;
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
}
}
@@ -1082,9 +1126,10 @@
throws InterruptedException {
while (!blocker.isReleasable()) {
int wc = workerCounts;
- if ((wc & RUNNING_COUNT_MASK) != 0 &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING)) {
+ if ((wc & RUNNING_COUNT_MASK) == 0)
+ helpMaintainParallelism();
+ else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING)) {
try {
while (!blocker.isReleasable()) {
long h = eventWaiters;
@@ -1129,12 +1174,11 @@
// Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED);
- termination.arrive();
+ termination.forceTermination();
}
return true;
}
-
/**
* Actions on transition to TERMINATING
*
@@ -1325,17 +1369,13 @@
// Execution methods
/**
- * Common code for execute, invoke and submit
+ * Submits task and creates, starts, or resumes some workers if necessary
*/
private <T> void doSubmit(ForkJoinTask<T> task) {
- if (task == null)
- throw new NullPointerException();
- if (runState >= SHUTDOWN)
- throw new RejectedExecutionException();
submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
- helpMaintainParallelism(); // create, start, or resume some workers
+ helpMaintainParallelism();
}
/**
@@ -1348,8 +1388,33 @@
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
- doSubmit(task);
- return task.join();
+ if (task == null)
+ throw new NullPointerException();
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ return task.invoke(); // bypass submit if in same pool
+ else {
+ doSubmit(task);
+ return task.join();
+ }
+ }
+
+ /**
+ * Unless terminating, forks task if within an ongoing FJ
+ * computation in the current pool, else submits as external task.
+ */
+ private <T> void forkOrSubmit(ForkJoinTask<T> task) {
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ task.fork();
+ else
+ doSubmit(task);
}
/**
@@ -1361,7 +1426,9 @@
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
}
// AbstractExecutorService methods
@@ -1372,12 +1439,14 @@
* scheduled for execution
*/
public void execute(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
}
/**
@@ -1390,7 +1459,9 @@
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
return task;
}
@@ -1400,8 +1471,10 @@
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1411,8 +1484,10 @@
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1422,12 +1497,14 @@
* scheduled for execution
*/
public ForkJoinTask<?> submit(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1725,8 +1802,11 @@
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
- * ignored or suppressed interruption, causing this executor not
- * to properly terminate.
+ * ignored or suppressed interruption, or are waiting for IO,
+ * causing this executor not to properly terminate. (See the
+ * advisory notes for class {@link ForkJoinTask} stating that
+ * tasks should not normally entail blocking operations. But if
+ * they do, they must abort them on interrupt.)
*
* @return {@code true} if terminating but not yet terminated
*/
@@ -1764,10 +1844,11 @@
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
try {
- return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
+ termination.awaitAdvanceInterruptibly(0, timeout, unit);
} catch (TimeoutException ex) {
return false;
}
+ return true;
}
/**
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java Wed Jan 12 14:40:36 2011 +0000
@@ -42,6 +42,16 @@
import java.util.RandomAccess;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
@@ -129,6 +139,16 @@
* result in exceptions or errors, possibly including
* {@code ClassCastException}.
*
+ * <p>Method {@link #join} and its variants are appropriate for use
+ * only when completion dependencies are acyclic; that is, the
+ * parallel computation can be described as a directed acyclic graph
+ * (DAG). Otherwise, executions may encounter a form of deadlock as
+ * tasks cyclically wait for each other. However, this framework
+ * supports other methods and techniques (for example the use of
+ * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
+ * may be of use in constructing custom subclasses for problems that
+ * are not statically structured as DAGs.
+ *
* <p>Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
* underlying lightweight task scheduling framework. Developers
@@ -143,9 +163,10 @@
* computation. Large tasks should be split into smaller subtasks,
* usually via recursive decomposition. As a very rough rule of thumb,
* a task should perform more than 100 and less than 10000 basic
- * computational steps. If tasks are too big, then parallelism cannot
- * improve throughput. If too small, then memory and internal task
- * maintenance overhead may overwhelm processing.
+ * computational steps, and should avoid indefinite looping. If tasks
+ * are too big, then parallelism cannot improve throughput. If too
+ * small, then memory and internal task maintenance overhead may
+ * overwhelm processing.
*
* <p>This class provides {@code adapt} methods for {@link Runnable}
* and {@link Callable}, that may be of use when mixing execution of
@@ -242,17 +263,20 @@
}
/**
- * Blocks a worker thread until completion. Called only by
- * pool. Currently unused -- pool-based waits use timeout
- * version below.
+ * Blocks a worker thread until completed or timed out. Called
+ * only by pool.
*/
- final void internalAwaitDone() {
- int s; // the odd construction reduces lock bias effects
- while ((s = status) >= 0) {
- try {
+ final void internalAwaitDone(long millis, int nanos) {
+ int s = status;
+ if ((s == 0 &&
+ UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
+ s > 0) {
+ try { // the odd construction reduces lock bias effects
synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait();
+ if (status > 0)
+ wait(millis, nanos);
+ else
+ notifyAll();
}
} catch (InterruptedException ie) {
cancelIfTerminating();
@@ -261,46 +285,61 @@
}
/**
- * Blocks a worker thread until completed or timed out. Called
- * only by pool.
- *
- * @return status on exit
- */
- final int internalAwaitDone(long millis) {
- int s;
- if ((s = status) >= 0) {
- try {
- synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait(millis, 0);
- }
- } catch (InterruptedException ie) {
- cancelIfTerminating();
- }
- s = status;
- }
- return s;
- }
-
- /**
* Blocks a non-worker-thread until completion.
*/
private void externalAwaitDone() {
- int s;
- while ((s = status) >= 0) {
+ if (status >= 0) {
+ boolean interrupted = false;
synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
- boolean interrupted = false;
- while (status >= 0) {
+ for (;;) {
+ int s = status;
+ if (s == 0)
+ UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL);
+ else if (s < 0) {
+ notifyAll();
+ break;
+ }
+ else {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
- if (interrupted)
- Thread.currentThread().interrupt();
- break;
+ }
+ }
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Blocks a non-worker-thread until completion or interruption or timeout.
+ */
+ private void externalInterruptibleAwaitDone(boolean timed, long nanos)
+ throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ if (status >= 0) {
+ long startTime = timed ? System.nanoTime() : 0L;
+ synchronized (this) {
+ for (;;) {
+ long nt;
+ int s = status;
+ if (s == 0)
+ UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL);
+ else if (s < 0) {
+ notifyAll();
+ break;
+ }
+ else if (!timed)
+ wait();
+ else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
+ wait(nt / 1000000, (int)(nt % 1000000));
+ else
+ break;
}
}
}
@@ -335,7 +374,7 @@
* #isDone} returning {@code true}.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -349,10 +388,13 @@
}
/**
- * Returns the result of the computation when it {@link #isDone is done}.
- * This method differs from {@link #get()} in that
+ * Returns the result of the computation when it {@link #isDone is
+ * done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
- * {@code Error}, not {@code ExecutionException}.
+ * {@code Error}, not {@code ExecutionException}, and that
+ * interrupts of the calling thread do <em>not</em> cause the
+ * method to abruptly return by throwing {@code
+ * InterruptedException}.
*
* @return the computed result
*/
@@ -394,7 +436,7 @@
* unprocessed.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -422,7 +464,7 @@
* normally or exceptionally, or left unprocessed.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -477,7 +519,7 @@
* unprocessed.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -529,25 +571,28 @@
/**
* Attempts to cancel execution of this task. This attempt will
- * fail if the task has already completed, has already been
- * cancelled, or could not be cancelled for some other reason. If
- * successful, and this task has not started when cancel is
- * called, execution of this task is suppressed, {@link
- * #isCancelled} will report true, and {@link #join} will result
- * in a {@code CancellationException} being thrown.
+ * fail if the task has already completed or could not be
+ * cancelled for some other reason. If successful, and this task
+ * has not started when {@code cancel} is called, execution of
+ * this task is suppressed. After this method returns
+ * successfully, unless there is an intervening call to {@link
+ * #reinitialize}, subsequent calls to {@link #isCancelled},
+ * {@link #isDone}, and {@code cancel} will return {@code true}
+ * and calls to {@link #join} and related methods will result in
+ * {@code CancellationException}.
*
* <p>This method may be overridden in subclasses, but if so, must
- * still ensure that these minimal properties hold. In particular,
- * the {@code cancel} method itself must not throw exceptions.
+ * still ensure that these properties hold. In particular, the
+ * {@code cancel} method itself must not throw exceptions.
*
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
- * @param mayInterruptIfRunning this value is ignored in the
- * default implementation because tasks are not
- * cancelled via interruption
+ * @param mayInterruptIfRunning this value has no effect in the
+ * default implementation because interrupts are not used to
+ * control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
@@ -681,23 +726,13 @@
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
- int s;
- if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread)
quietlyJoin();
- s = status;
- }
- else {
- while ((s = status) >= 0) {
- synchronized (this) { // interruptible form of awaitDone
- if (UNSAFE.compareAndSwapInt(this, statusOffset,
- s, SIGNAL)) {
- while (status >= 0)
- wait();
- }
- }
- }
- }
- if (s < NORMAL) {
+ else
+ externalInterruptibleAwaitDone(false, 0L);
+ int s = status;
+ if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
@@ -723,72 +758,18 @@
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ long nanos = unit.toNanos(timeout);
Thread t = Thread.currentThread();
- ForkJoinPool pool;
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
- if (status >= 0 && w.unpushTask(this))
- quietlyExec();
- pool = w.pool;
- }
+ if (t instanceof ForkJoinWorkerThread)
+ ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
else
- pool = null;
- /*
- * Timed wait loop intermixes cases for FJ (pool != null) and
- * non FJ threads. For FJ, decrement pool count but don't try
- * for replacement; increment count on completion. For non-FJ,
- * deal with interrupts. This is messy, but a little less so
- * than is splitting the FJ and nonFJ cases.
- */
- boolean interrupted = false;
- boolean dec = false; // true if pool count decremented
- long nanos = unit.toNanos(timeout);
- for (;;) {
- if (pool == null && Thread.interrupted()) {
- interrupted = true;
- break;
- }
- int s = status;
- if (s < 0)
- break;
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
- long startTime = System.nanoTime();
- long nt; // wait time
- while (status >= 0 &&
- (nt = nanos - (System.nanoTime() - startTime)) > 0) {
- if (pool != null && !dec)
- dec = pool.tryDecrementRunningCount();
- else {
- long ms = nt / 1000000;
- int ns = (int) (nt % 1000000);
- try {
- synchronized (this) {
- if (status >= 0)
- wait(ms, ns);
- }
- } catch (InterruptedException ie) {
- if (pool != null)
- cancelIfTerminating();
- else {
- interrupted = true;
- break;
- }
- }
- }
- }
- break;
- }
- }
- if (pool != null && dec)
- pool.incrementRunningCount();
- if (interrupted)
- throw new InterruptedException();
- int es = status;
- if (es != NORMAL) {
+ externalInterruptibleAwaitDone(true, nanos);
+ int s = status;
+ if (s != NORMAL) {
Throwable ex;
- if (es == CANCELLED)
+ if (s == CANCELLED)
throw new CancellationException();
- if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
throw new TimeoutException();
}
@@ -819,7 +800,7 @@
return;
}
}
- w.joinTask(this);
+ w.joinTask(this, false, 0L);
}
}
else
@@ -855,7 +836,7 @@
* processed.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -874,6 +855,12 @@
* under any other usage conditions are not guaranteed.
* This method may be useful when executing
* pre-constructed trees of subtasks in loops.
+ *
+ * <p>Upon completion of this method, {@code isDone()} reports
+ * {@code false}, and {@code getException()} reports {@code
+ * null}. However, the value returned by {@code getRawResult} is
+ * unaffected. To clear this value, you can invoke {@code
+ * setRawResult(null)}.
*/
public void reinitialize() {
if (status == EXCEPTIONAL)
@@ -895,11 +882,12 @@
}
/**
- * Returns {@code true} if the current thread is executing as a
- * ForkJoinPool computation.
+ * Returns {@code true} if the current thread is a {@link
+ * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
*
- * @return {@code true} if the current thread is executing as a
- * ForkJoinPool computation, or false otherwise
+ * @return {@code true} if the current thread is a {@link
+ * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
+ * or {@code false} otherwise
*/
public static boolean inForkJoinPool() {
return Thread.currentThread() instanceof ForkJoinWorkerThread;
@@ -914,7 +902,7 @@
* were not, stolen.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -933,7 +921,7 @@
* fork other tasks.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -956,7 +944,7 @@
* exceeded.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -1014,7 +1002,7 @@
* otherwise.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -1033,7 +1021,7 @@
* be useful otherwise.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -1056,7 +1044,7 @@
* otherwise.
*
* <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Wed Jan 12 14:40:36 2011 +0000
@@ -38,16 +38,18 @@
import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.RejectedExecutionException;
/**
- * A thread managed by a {@link ForkJoinPool}. This class is
- * subclassable solely for the sake of adding functionality -- there
- * are no overridable methods dealing with scheduling or execution.
- * However, you can override initialization and termination methods
- * surrounding the main task processing loop. If you do create such a
- * subclass, you will also need to supply a custom {@link
- * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
- * ForkJoinPool}.
+ * A thread managed by a {@link ForkJoinPool}, which executes
+ * {@link ForkJoinTask}s.
+ * This class is subclassable solely for the sake of adding
+ * functionality -- there are no overridable methods dealing with
+ * scheduling or execution. However, you can override initialization
+ * and termination methods surrounding the main task processing loop.
+ * If you do create such a subclass, you will also need to supply a
+ * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
+ * in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
@@ -376,7 +378,7 @@
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
- * invoke @code{super.onStart()} at the beginning of the method.
+ * invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
@@ -384,7 +386,7 @@
*/
protected void onStart() {
int rs = seedGenerator.nextInt();
- seed = rs == 0? 1 : rs; // seed must be nonzero
+ seed = (rs == 0) ? 1 : rs; // seed must be nonzero
// Allocate name string and arrays in this thread
String pid = Integer.toString(pool.getPoolNumber());
@@ -426,7 +428,7 @@
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
- * ForkJoinTasks.
+ * {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
@@ -628,6 +630,19 @@
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ /*
+ * Note: here and in related methods, as a
+ * performance (not correctness) issue, we'd like
+ * to encourage compiler not to arbitrarily
+ * postpone setting sp after successful CAS.
+ * Currently there is no intrinsic for arranging
+ * this, but using Unsafe putOrderedInt may be a
+ * preferable strategy on some compilers even
+ * though its main effect is a pre-, not post-
+ * fence. To simplify possible changes, the option
+ * is left in comments next to the associated
+ * assignments.
+ */
sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
return t;
@@ -777,10 +792,10 @@
// Run State management
// status check methods used mainly by ForkJoinPool
- final boolean isRunning() { return runState == 0; }
- final boolean isTerminated() { return (runState & TERMINATED) != 0; }
- final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
- final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
+ final boolean isRunning() { return runState == 0; }
+ final boolean isTerminated() { return (runState & TERMINATED) != 0; }
+ final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
+ final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
final boolean isTerminating() {
if ((runState & TERMINATING) != 0)
@@ -884,8 +899,7 @@
*/
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
- if (cj != null) {
- currentJoin = null;
+ if (cj != null && cj.status >= 0) {
cj.cancelIgnoringExceptions();
try {
this.interrupt(); // awaken wait
@@ -893,10 +907,8 @@
}
}
ForkJoinTask<?> cs = currentSteal;
- if (cs != null) {
- currentSteal = null;
+ if (cs != null && cs.status >= 0)
cs.cancelIgnoringExceptions();
- }
while (base != sp) {
ForkJoinTask<?> t = deqTask();
if (t != null)
@@ -959,57 +971,23 @@
* Possibly runs some tasks and/or blocks, until task is done.
*
* @param joinMe the task to join
+ * @param timed true if use timed wait
+ * @param nanos wait time if timed
*/
- final void joinTask(ForkJoinTask<?> joinMe) {
+ final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
// currentJoin only written by this thread; only need ordered store
ForkJoinTask<?> prevJoin = currentJoin;
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
- if (sp != base)
- localHelpJoinTask(joinMe);
- if (joinMe.status >= 0)
- pool.awaitJoin(joinMe, this);
+ pool.awaitJoin(joinMe, this, timed, nanos);
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
}
/**
- * Run tasks in local queue until given task is done.
- *
- * @param joinMe the task to join
- */
- private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
- int s;
- ForkJoinTask<?>[] q;
- while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
- int i = (q.length - 1) & --s;
- long u = (i << qShift) + qBase; // raw offset
- ForkJoinTask<?> t = q[i];
- if (t == null) // lost to a stealer
- break;
- if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
- /*
- * This recheck (and similarly in helpJoinTask)
- * handles cases where joinMe is independently
- * cancelled or forced even though there is other work
- * available. Back out of the pop by putting t back
- * into slot before we commit by writing sp.
- */
- if (joinMe.status < 0) {
- UNSAFE.putObjectVolatile(q, u, t);
- break;
- }
- sp = s;
- // UNSAFE.putOrderedInt(this, spOffset, s);
- t.quietlyExec();
- }
- }
- }
-
- /**
- * Unless terminating, tries to locate and help perform tasks for
- * a stealer of the given task, or in turn one of its stealers.
- * Traces currentSteal->currentJoin links looking for a thread
- * working on a descendant of the given task and with a non-empty
- * queue to steal back and execute tasks from.
+ * Tries to locate and help perform tasks for a stealer of the
+ * given task, or in turn one of its stealers. Traces
+ * currentSteal->currentJoin links looking for a thread working on
+ * a descendant of the given task and with a non-empty queue to
+ * steal back and execute tasks from.
*
* The implementation is very branchy to cope with potential
* inconsistencies or loops encountering chains that are stale,
@@ -1019,77 +997,127 @@
* don't work out.
*
* @param joinMe the task to join
+ * @param running if false, then must update pool count upon
+ * running a task
+ * @return value of running on exit
*/
- final void helpJoinTask(ForkJoinTask<?> joinMe) {
- ForkJoinWorkerThread[] ws;
- int n;
- if (joinMe.status < 0) // already done
- return;
- if ((runState & TERMINATING) != 0) { // cancel if shutting down
- joinMe.cancelIgnoringExceptions();
- return;
- }
- if ((ws = pool.workers) == null || (n = ws.length) <= 1)
- return; // need at least 2 workers
-
- ForkJoinTask<?> task = joinMe; // base of chain
- ForkJoinWorkerThread thread = this; // thread with stolen task
- for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
- // Try to find v, the stealer of task, by first using hint
- ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
- if (v == null || v.currentSteal != task) {
- for (int j = 0; ; ++j) { // search array
- if (j < n) {
- ForkJoinTask<?> vs;
- if ((v = ws[j]) != null &&
- (vs = v.currentSteal) != null) {
- if (joinMe.status < 0 || task.status < 0)
- return; // stale or done
- if (vs == task) {
- thread.stealHint = j;
- break; // save hint for next time
- }
- }
- }
- else
- return; // no stealer
+ final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
+ /*
+ * Initial checks to (1) abort if terminating; (2) clean out
+ * old cancelled tasks from local queue; (3) if joinMe is next
+ * task, run it; (4) omit scan if local queue nonempty (since
+ * it may contain non-descendents of joinMe).
+ */
+ ForkJoinPool p = pool;
+ for (;;) {
+ ForkJoinTask<?>[] q;
+ int s;
+ if (joinMe.status < 0)
+ return running;
+ else if ((runState & TERMINATING) != 0) {
+ joinMe.cancelIgnoringExceptions();
+ return running;
+ }
+ else if ((s = sp) == base || (q = queue) == null)
+ break; // queue empty
+ else {
+ int i = (q.length - 1) & --s;
+ long u = (i << qShift) + qBase; // raw offset
+ ForkJoinTask<?> t = q[i];
+ if (t == null)
+ break; // lost to a stealer
+ else if (t != joinMe && t.status >= 0)
+ return running; // cannot safely help
+ else if ((running ||
+ (running = p.tryIncrementRunningCount())) &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ sp = s; // putOrderedInt may encourage more timely write
+ // UNSAFE.putOrderedInt(this, spOffset, s);
+ t.quietlyExec();
}
}
- for (;;) { // Try to help v, using specialized form of deqTask
- if (joinMe.status < 0)
- return;
- int b = v.base;
- ForkJoinTask<?>[] q = v.queue;
- if (b == v.sp || q == null)
- break;
- int i = (q.length - 1) & b;
- long u = (i << qShift) + qBase;
- ForkJoinTask<?> t = q[i];
- int pid = poolIndex;
- ForkJoinTask<?> ps = currentSteal;
- if (task.status < 0)
- return; // stale or done
- if (t != null && v.base == b++ &&
- UNSAFE.compareAndSwapObject(q, u, t, null)) {
- if (joinMe.status < 0) {
- UNSAFE.putObjectVolatile(q, u, t);
- return; // back out on cancel
+ }
+
+ int n; // worker array size
+ ForkJoinWorkerThread[] ws = p.workers;
+ if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
+ ForkJoinTask<?> task = joinMe; // base of chain
+ ForkJoinWorkerThread thread = this; // thread with stolen task
+
+ outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
+ // Try to find v, the stealer of task, by first using hint
+ ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
+ if (v == null || v.currentSteal != task) {
+ for (int j = 0; ; ++j) { // search array
+ if (j < n) {
+ ForkJoinTask<?> vs;
+ if ((v = ws[j]) != null &&
+ (vs = v.currentSteal) != null) {
+ if (joinMe.status < 0)
+ break outer;
+ if (vs == task) {
+ if (task.status < 0)
+ break outer; // stale
+ thread.stealHint = j;
+ break; // save hint for next time
+ }
+ }
+ }
+ else
+ break outer; // no stealer
}
- v.base = b;
- v.stealHint = pid;
- UNSAFE.putOrderedObject(this, currentStealOffset, t);
- t.quietlyExec();
- UNSAFE.putOrderedObject(this, currentStealOffset, ps);
}
+
+ // Try to help v, using specialized form of deqTask
+ for (;;) {
+ if (joinMe.status < 0)
+ break outer;
+ int b = v.base;
+ ForkJoinTask<?>[] q = v.queue;
+ if (b == v.sp || q == null)
+ break; // empty
+ int i = (q.length - 1) & b;
+ long u = (i << qShift) + qBase;
+ ForkJoinTask<?> t = q[i];
+ if (task.status < 0)
+ break outer; // stale
+ if (t != null &&
+ (running ||
+ (running = p.tryIncrementRunningCount())) &&
+ v.base == b++ &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ if (t != joinMe && joinMe.status < 0) {
+ UNSAFE.putObjectVolatile(q, u, t);
+ break outer; // joinMe cancelled; back out
+ }
+ v.base = b;
+ if (t.status >= 0) {
+ ForkJoinTask<?> ps = currentSteal;
+ int pid = poolIndex;
+ v.stealHint = pid;
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, t);
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, ps);
+ }
+ }
+ else if ((runState & TERMINATING) != 0) {
+ joinMe.cancelIgnoringExceptions();
+ break outer;
+ }
+ }
+
+ // Try to descend to find v's stealer
+ ForkJoinTask<?> next = v.currentJoin;
+ if (task.status < 0 || next == null || next == task ||
+ joinMe.status < 0)
+ break; // done, stale, dead-end, or cyclic
+ task = next;
+ thread = v;
}
- // Try to descend to find v's stealer
- ForkJoinTask<?> next = v.currentJoin;
- if (task.status < 0 || next == null || next == task ||
- joinMe.status < 0)
- return;
- task = next;
- thread = v;
}
+ return running;
}
/**
--- a/jdk/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Wed Jan 12 14:40:36 2011 +0000
@@ -1029,6 +1029,8 @@
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
* subsequent to construction.
+ *
+ * @return an iterator over the elements in this deque in reverse order
*/
public Iterator<E> descendingIterator() {
return new DescendingItr();
--- a/jdk/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java Wed Jan 12 14:40:36 2011 +0000
@@ -189,14 +189,14 @@
}
/**
- * Creates a node and links it at end of queue.
+ * Links node at end of queue.
*
- * @param x the item
+ * @param node the node
*/
- private void enqueue(E x) {
+ private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
- last = last.next = new Node<E>(x);
+ last = last.next = node;
}
/**
@@ -282,7 +282,7 @@
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
- enqueue(e);
+ enqueue(new Node<E>(e));
++n;
}
count.set(n);
@@ -332,6 +332,7 @@
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
+ Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
@@ -347,7 +348,7 @@
while (count.get() == capacity) {
notFull.await();
}
- enqueue(e);
+ enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
@@ -382,7 +383,7 @@
return false;
nanos = notFull.awaitNanos(nanos);
}
- enqueue(e);
+ enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
@@ -411,11 +412,12 @@
if (count.get() == capacity)
return false;
int c = -1;
+ Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
- enqueue(e);
+ enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
@@ -560,6 +562,27 @@
}
/**
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
+ *
+ * @param o object to be checked for containment in this queue
+ * @return {@code true} if this queue contains the specified element
+ */
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ fullyLock();
+ try {
+ for (Node<E> p = head.next; p != null; p = p.next)
+ if (o.equals(p.item))
+ return true;
+ return false;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
* Returns an array containing all of the elements in this queue, in
* proper sequence.
*
@@ -645,7 +668,20 @@
public String toString() {
fullyLock();
try {
- return super.toString();
+ Node<E> p = head.next;
+ if (p == null)
+ return "[]";
+
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (;;) {
+ E e = p.item;
+ sb.append(e == this ? "(this Collection)" : e);
+ p = p.next;
+ if (p == null)
+ return sb.append(']').toString();
+ sb.append(',').append(' ');
+ }
} finally {
fullyUnlock();
}
@@ -727,12 +763,14 @@
/**
* Returns an iterator over the elements in this queue in proper sequence.
- * The returned {@code Iterator} is a "weakly consistent" iterator that
+ * The elements will be returned in order from first (head) to last (tail).
+ *
+ * <p>The returned iterator is a "weakly consistent" iterator that
* will never throw {@link java.util.ConcurrentModificationException
- * ConcurrentModificationException},
- * and guarantees to traverse elements as they existed upon
- * construction of the iterator, and may (but is not guaranteed to)
- * reflect any modifications subsequent to construction.
+ * ConcurrentModificationException}, and guarantees to traverse
+ * elements as they existed upon construction of the iterator, and
+ * may (but is not guaranteed to) reflect any modifications
+ * subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
--- a/jdk/src/share/classes/java/util/concurrent/LinkedTransferQueue.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/LinkedTransferQueue.java Wed Jan 12 14:40:36 2011 +0000
@@ -37,10 +37,10 @@
import java.util.AbstractQueue;
import java.util.Collection;
-import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
@@ -450,7 +450,7 @@
}
final boolean casItem(Object cmp, Object val) {
- // assert cmp == null || cmp.getClass() != Node.class;
+ // assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
@@ -516,7 +516,7 @@
* Tries to artificially match a data node -- used by remove.
*/
final boolean tryMatchData() {
- // assert isData;
+ // assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
@@ -569,7 +569,7 @@
@SuppressWarnings("unchecked")
static <E> E cast(Object item) {
- // assert item == null || item.getClass() != Node.class;
+ // assert item == null || item.getClass() != Node.class;
return (E) item;
}
@@ -588,7 +588,8 @@
throw new NullPointerException();
Node s = null; // the node to append, if needed
- retry: for (;;) { // restart on append race
+ retry:
+ for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
@@ -599,7 +600,7 @@
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
- if (head == h && casHead(h, n == null? q : n)) {
+ if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
@@ -684,7 +685,7 @@
for (;;) {
Object item = s.item;
if (item != e) { // matched
- // assert item != s;
+ // assert item != s;
s.forgetContents(); // avoid garbage
return this.<E>cast(item);
}
@@ -809,22 +810,61 @@
* Moves to next node after prev, or first node if prev null.
*/
private void advance(Node prev) {
- lastPred = lastRet;
- lastRet = prev;
- for (Node p = (prev == null) ? head : succ(prev);
- p != null; p = succ(p)) {
- Object item = p.item;
- if (p.isData) {
- if (item != null && item != p) {
- nextItem = LinkedTransferQueue.this.<E>cast(item);
- nextNode = p;
+ /*
+ * To track and avoid buildup of deleted nodes in the face
+ * of calls to both Queue.remove and Itr.remove, we must
+ * include variants of unsplice and sweep upon each
+ * advance: Upon Itr.remove, we may need to catch up links
+ * from lastPred, and upon other removes, we might need to
+ * skip ahead from stale nodes and unsplice deleted ones
+ * found while advancing.
+ */
+
+ Node r, b; // reset lastPred upon possible deletion of lastRet
+ if ((r = lastRet) != null && !r.isMatched())
+ lastPred = r; // next lastPred is old lastRet
+ else if ((b = lastPred) == null || b.isMatched())
+ lastPred = null; // at start of list
+ else {
+ Node s, n; // help with removal of lastPred.next
+ while ((s = b.next) != null &&
+ s != b && s.isMatched() &&
+ (n = s.next) != null && n != s)
+ b.casNext(s, n);
+ }
+
+ this.lastRet = prev;
+
+ for (Node p = prev, s, n;;) {
+ s = (p == null) ? head : p.next;
+ if (s == null)
+ break;
+ else if (s == p) {
+ p = null;
+ continue;
+ }
+ Object item = s.item;
+ if (s.isData) {
+ if (item != null && item != s) {
+ nextItem = LinkedTransferQueue.<E>cast(item);
+ nextNode = s;
return;
}
}
else if (item == null)
break;
+ // assert s.isMatched();
+ if (p == null)
+ p = s;
+ else if ((n = s.next) == null)
+ break;
+ else if (s == n)
+ p = null;
+ else
+ p.casNext(s, n);
}
nextNode = null;
+ nextItem = null;
}
Itr() {
@@ -844,10 +884,12 @@
}
public final void remove() {
- Node p = lastRet;
- if (p == null) throw new IllegalStateException();
- if (p.tryMatchData())
- unsplice(lastPred, p);
+ final Node lastRet = this.lastRet;
+ if (lastRet == null)
+ throw new IllegalStateException();
+ this.lastRet = null;
+ if (lastRet.tryMatchData())
+ unsplice(lastPred, lastRet);
}
}
@@ -997,8 +1039,7 @@
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
- * @return {@code true} (as specified by
- * {@link BlockingQueue#offer(Object) BlockingQueue.offer})
+ * @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
@@ -1130,15 +1171,15 @@
}
/**
- * Returns an iterator over the elements in this queue in proper
- * sequence, from head to tail.
+ * Returns an iterator over the elements in this queue in proper sequence.
+ * The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is a "weakly consistent" iterator that
- * will never throw
- * {@link ConcurrentModificationException ConcurrentModificationException},
- * and guarantees to traverse elements as they existed upon
- * construction of the iterator, and may (but is not guaranteed
- * to) reflect any modifications subsequent to construction.
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException}, and guarantees to traverse
+ * elements as they existed upon construction of the iterator, and
+ * may (but is not guaranteed to) reflect any modifications
+ * subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
@@ -1203,6 +1244,28 @@
}
/**
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
+ *
+ * @param o object to be checked for containment in this queue
+ * @return {@code true} if this queue contains the specified element
+ */
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ for (Node p = head; p != null; p = succ(p)) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p && o.equals(item))
+ return true;
+ }
+ else if (item == null)
+ break;
+ }
+ return false;
+ }
+
+ /**
* Always returns {@code Integer.MAX_VALUE} because a
* {@code LinkedTransferQueue} is not capacity constrained.
*
--- a/jdk/src/share/classes/java/util/concurrent/Phaser.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/Phaser.java Wed Jan 12 14:40:36 2011 +0000
@@ -35,6 +35,8 @@
package java.util.concurrent;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
@@ -61,38 +63,38 @@
* Phaser} may be repeatedly awaited. Method {@link
* #arriveAndAwaitAdvance} has effect analogous to {@link
* java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
- * generation of a {@code Phaser} has an associated phase number. The
- * phase number starts at zero, and advances when all parties arrive
- * at the barrier, wrapping around to zero after reaching {@code
+ * generation of a phaser has an associated phase number. The phase
+ * number starts at zero, and advances when all parties arrive at the
+ * phaser, wrapping around to zero after reaching {@code
* Integer.MAX_VALUE}. The use of phase numbers enables independent
- * control of actions upon arrival at a barrier and upon awaiting
+ * control of actions upon arrival at a phaser and upon awaiting
* others, via two kinds of methods that may be invoked by any
* registered party:
*
* <ul>
*
* <li> <b>Arrival.</b> Methods {@link #arrive} and
- * {@link #arriveAndDeregister} record arrival at a
- * barrier. These methods do not block, but return an associated
- * <em>arrival phase number</em>; that is, the phase number of
- * the barrier to which the arrival applied. When the final
- * party for a given phase arrives, an optional barrier action
- * is performed and the phase advances. Barrier actions,
- * performed by the party triggering a phase advance, are
- * arranged by overriding method {@link #onAdvance(int, int)},
- * which also controls termination. Overriding this method is
- * similar to, but more flexible than, providing a barrier
- * action to a {@code CyclicBarrier}.
+ * {@link #arriveAndDeregister} record arrival. These methods
+ * do not block, but return an associated <em>arrival phase
+ * number</em>; that is, the phase number of the phaser to which
+ * the arrival applied. When the final party for a given phase
+ * arrives, an optional action is performed and the phase
+ * advances. These actions are performed by the party
+ * triggering a phase advance, and are arranged by overriding
+ * method {@link #onAdvance(int, int)}, which also controls
+ * termination. Overriding this method is similar to, but more
+ * flexible than, providing a barrier action to a {@code
+ * CyclicBarrier}.
*
* <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
* argument indicating an arrival phase number, and returns when
- * the barrier advances to (or is already at) a different phase.
+ * the phaser advances to (or is already at) a different phase.
* Unlike similar constructions using {@code CyclicBarrier},
* method {@code awaitAdvance} continues to wait even if the
* waiting thread is interrupted. Interruptible and timeout
* versions are also available, but exceptions encountered while
* tasks wait interruptibly or with timeout do not change the
- * state of the barrier. If necessary, you can perform any
+ * state of the phaser. If necessary, you can perform any
* associated recovery within handlers of those exceptions,
* often after invoking {@code forceTermination}. Phasers may
* also be used by tasks executing in a {@link ForkJoinPool},
@@ -101,26 +103,39 @@
*
* </ul>
*
- * <p> <b>Termination.</b> A {@code Phaser} may enter a
- * <em>termination</em> state in which all synchronization methods
- * immediately return without updating phaser state or waiting for
- * advance, and indicating (via a negative phase value) that execution
- * is complete. Termination is triggered when an invocation of {@code
- * onAdvance} returns {@code true}. As illustrated below, when
- * phasers control actions with a fixed number of iterations, it is
- * often convenient to override this method to cause termination when
- * the current phase number reaches a threshold. Method {@link
- * #forceTermination} is also available to abruptly release waiting
- * threads and allow them to terminate.
+ * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
+ * state, that may be checked using method {@link #isTerminated}. Upon
+ * termination, all synchronization methods immediately return without
+ * waiting for advance, as indicated by a negative return value.
+ * Similarly, attempts to register upon termination have no effect.
+ * Termination is triggered when an invocation of {@code onAdvance}
+ * returns {@code true}. The default implementation returns {@code
+ * true} if a deregistration has caused the number of registered
+ * parties to become zero. As illustrated below, when phasers control
+ * actions with a fixed number of iterations, it is often convenient
+ * to override this method to cause termination when the current phase
+ * number reaches a threshold. Method {@link #forceTermination} is
+ * also available to abruptly release waiting threads and allow them
+ * to terminate.
*
- * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
- * in tree structures) to reduce contention. Phasers with large
- * numbers of parties that would otherwise experience heavy
+ * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
+ * constructed in tree structures) to reduce contention. Phasers with
+ * large numbers of parties that would otherwise experience heavy
* synchronization contention costs may instead be set up so that
* groups of sub-phasers share a common parent. This may greatly
* increase throughput even though it incurs greater per-operation
* overhead.
*
+ * <p>In a tree of tiered phasers, registration and deregistration of
+ * child phasers with their parent are managed automatically.
+ * Whenever the number of registered parties of a child phaser becomes
+ * non-zero (as established in the {@link #Phaser(Phaser,int)}
+ * constructor, {@link #register}, or {@link #bulkRegister}), the
+ * child phaser is registered with its parent. Whenever the number of
+ * registered parties becomes zero as the result of an invocation of
+ * {@link #arriveAndDeregister}, the child phaser is deregistered
+ * from its parent.
+ *
* <p><b>Monitoring.</b> While synchronization methods may be invoked
* only by registered parties, the current state of a phaser may be
* monitored by any caller. At any given moment there are {@link
@@ -136,9 +151,9 @@
* <p><b>Sample usages:</b>
*
* <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
- * to control a one-shot action serving a variable number of
- * parties. The typical idiom is for the method setting this up to
- * first register, then start the actions, then deregister, as in:
+ * to control a one-shot action serving a variable number of parties.
+ * The typical idiom is for the method setting this up to first
+ * register, then start the actions, then deregister, as in:
*
* <pre> {@code
* void runTasks(List<Runnable> tasks) {
@@ -208,34 +223,32 @@
* }}</pre>
*
*
- * <p>To create a set of tasks using a tree of phasers,
- * you could use code of the following form, assuming a
- * Task class with a constructor accepting a phaser that
- * it registers for upon construction:
+ * <p>To create a set of {@code n} tasks using a tree of phasers, you
+ * could use code of the following form, assuming a Task class with a
+ * constructor accepting a {@code Phaser} that it registers with upon
+ * construction. After invocation of {@code build(new Task[n], 0, n,
+ * new Phaser())}, these tasks could then be started, for example by
+ * submitting to a pool:
*
* <pre> {@code
- * void build(Task[] actions, int lo, int hi, Phaser ph) {
+ * void build(Task[] tasks, int lo, int hi, Phaser ph) {
* if (hi - lo > TASKS_PER_PHASER) {
* for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
* int j = Math.min(i + TASKS_PER_PHASER, hi);
- * build(actions, i, j, new Phaser(ph));
+ * build(tasks, i, j, new Phaser(ph));
* }
* } else {
* for (int i = lo; i < hi; ++i)
- * actions[i] = new Task(ph);
+ * tasks[i] = new Task(ph);
* // assumes new Task(ph) performs ph.register()
* }
- * }
- * // .. initially called, for n tasks via
- * build(new Task[n], 0, n, new Phaser());}</pre>
+ * }}</pre>
*
* The best value of {@code TASKS_PER_PHASER} depends mainly on
- * expected barrier synchronization rates. A value as low as four may
- * be appropriate for extremely small per-barrier task bodies (thus
+ * expected synchronization rates. A value as low as four may
+ * be appropriate for extremely small per-phase task bodies (thus
* high rates), or up to hundreds for extremely large ones.
*
- * </pre>
- *
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of parties to 65535. Attempts to register additional
* parties result in {@code IllegalStateException}. However, you can and
@@ -253,60 +266,66 @@
*/
/**
- * Barrier state representation. Conceptually, a barrier contains
- * four values:
+ * Primary state representation, holding four bit-fields:
*
- * * parties -- the number of parties to wait (16 bits)
- * * unarrived -- the number of parties yet to hit barrier (16 bits)
- * * phase -- the generation of the barrier (31 bits)
- * * terminated -- set if barrier is terminated (1 bit)
+ * unarrived -- the number of parties yet to hit barrier (bits 0-15)
+ * parties -- the number of parties to wait (bits 16-31)
+ * phase -- the generation of the barrier (bits 32-62)
+ * terminated -- set if barrier is terminated (bit 63 / sign)
+ *
+ * Except that a phaser with no registered parties is
+ * distinguished by the otherwise illegal state of having zero
+ * parties and one unarrived parties (encoded as EMPTY below).
*
- * However, to efficiently maintain atomicity, these values are
- * packed into a single (atomic) long. Termination uses the sign
- * bit of 32 bit representation of phase, so phase is set to -1 on
- * termination. Good performance relies on keeping state decoding
- * and encoding simple, and keeping race windows short.
+ * To efficiently maintain atomicity, these values are packed into
+ * a single (atomic) long. Good performance relies on keeping
+ * state decoding and encoding simple, and keeping race windows
+ * short.
*
- * Note: there are some cheats in arrive() that rely on unarrived
- * count being lowest 16 bits.
+ * All state updates are performed via CAS except initial
+ * registration of a sub-phaser (i.e., one with a non-null
+ * parent). In this (relatively rare) case, we use built-in
+ * synchronization to lock while first registering with its
+ * parent.
+ *
+ * The phase of a subphaser is allowed to lag that of its
+ * ancestors until it is actually accessed -- see method
+ * reconcileState.
*/
private volatile long state;
- private static final int ushortMask = 0xffff;
- private static final int phaseMask = 0x7fffffff;
+ private static final int MAX_PARTIES = 0xffff;
+ private static final int MAX_PHASE = Integer.MAX_VALUE;
+ private static final int PARTIES_SHIFT = 16;
+ private static final int PHASE_SHIFT = 32;
+ private static final int UNARRIVED_MASK = 0xffff; // to mask ints
+ private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
+ private static final long TERMINATION_BIT = 1L << 63;
+
+ // some special values
+ private static final int ONE_ARRIVAL = 1;
+ private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
+ private static final int EMPTY = 1;
+
+ // The following unpacking methods are usually manually inlined
private static int unarrivedOf(long s) {
- return (int) (s & ushortMask);
+ int counts = (int)s;
+ return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
}
private static int partiesOf(long s) {
- return ((int) s) >>> 16;
+ return (int)s >>> PARTIES_SHIFT;
}
private static int phaseOf(long s) {
- return (int) (s >>> 32);
+ return (int)(s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
- return partiesOf(s) - unarrivedOf(s);
- }
-
- private static long stateFor(int phase, int parties, int unarrived) {
- return ((((long) phase) << 32) | (((long) parties) << 16) |
- (long) unarrived);
- }
-
- private static long trippedStateFor(int phase, int parties) {
- long lp = (long) parties;
- return (((long) phase) << 32) | (lp << 16) | lp;
- }
-
- /**
- * Returns message string for bad bounds exceptions.
- */
- private static String badBounds(int parties, int unarrived) {
- return ("Attempt to set " + unarrived +
- " unarrived of " + parties + " parties");
+ int counts = (int)s;
+ return (counts == EMPTY) ? 0 :
+ (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
/**
@@ -315,70 +334,180 @@
private final Phaser parent;
/**
- * The root of phaser tree. Equals this if not in a tree. Used to
- * support faster state push-down.
+ * The root of phaser tree. Equals this if not in a tree.
*/
private final Phaser root;
- // Wait queues
-
/**
* Heads of Treiber stacks for waiting threads. To eliminate
- * contention while releasing some threads while adding others, we
+ * contention when releasing some threads while adding others, we
* use two of them, alternating across even and odd phases.
+ * Subphasers share queues with root to speed up releases.
*/
- private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
- private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
+ private final AtomicReference<QNode> evenQ;
+ private final AtomicReference<QNode> oddQ;
private AtomicReference<QNode> queueFor(int phase) {
return ((phase & 1) == 0) ? evenQ : oddQ;
}
/**
- * Returns current state, first resolving lagged propagation from
- * root if necessary.
+ * Returns message string for bounds exceptions on arrival.
+ */
+ private String badArrive(long s) {
+ return "Attempted arrival of unregistered party for " +
+ stateToString(s);
+ }
+
+ /**
+ * Returns message string for bounds exceptions on registration.
+ */
+ private String badRegister(long s) {
+ return "Attempt to register more than " +
+ MAX_PARTIES + " parties for " + stateToString(s);
+ }
+
+ /**
+ * Main implementation for methods arrive and arriveAndDeregister.
+ * Manually tuned to speed up and minimize race windows for the
+ * common case of just decrementing unarrived field.
+ *
+ * @param deregister false for arrive, true for arriveAndDeregister
*/
- private long getReconciledState() {
- return (parent == null) ? state : reconcileState();
+ private int doArrive(boolean deregister) {
+ int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
+ final Phaser root = this.root;
+ for (;;) {
+ long s = (root == this) ? state : reconcileState();
+ int phase = (int)(s >>> PHASE_SHIFT);
+ int counts = (int)s;
+ int unarrived = (counts & UNARRIVED_MASK) - 1;
+ if (phase < 0)
+ return phase;
+ else if (counts == EMPTY || unarrived < 0) {
+ if (root == this || reconcileState() == s)
+ throw new IllegalStateException(badArrive(s));
+ }
+ else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
+ if (unarrived == 0) {
+ long n = s & PARTIES_MASK; // base of next state
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
+ if (root != this)
+ return parent.doArrive(nextUnarrived == 0);
+ if (onAdvance(phase, nextUnarrived))
+ n |= TERMINATION_BIT;
+ else if (nextUnarrived == 0)
+ n |= EMPTY;
+ else
+ n |= nextUnarrived;
+ n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
+ UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
+ releaseWaiters(phase);
+ }
+ return phase;
+ }
+ }
}
/**
- * Recursively resolves state.
+ * Implementation of register, bulkRegister
+ *
+ * @param registrations number to add to both parties and
+ * unarrived fields. Must be greater than zero.
*/
- private long reconcileState() {
- Phaser p = parent;
- long s = state;
- if (p != null) {
- while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
- long parentState = p.getReconciledState();
- int parentPhase = phaseOf(parentState);
- int phase = phaseOf(s = state);
- if (phase != parentPhase) {
- long next = trippedStateFor(parentPhase, partiesOf(s));
- if (casState(s, next)) {
- releaseWaiters(phase);
- s = next;
+ private int doRegister(int registrations) {
+ // adjustment to state
+ long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
+ final Phaser parent = this.parent;
+ int phase;
+ for (;;) {
+ long s = state;
+ int counts = (int)s;
+ int parties = counts >>> PARTIES_SHIFT;
+ int unarrived = counts & UNARRIVED_MASK;
+ if (registrations > MAX_PARTIES - parties)
+ throw new IllegalStateException(badRegister(s));
+ else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
+ break;
+ else if (counts != EMPTY) { // not 1st registration
+ if (parent == null || reconcileState() == s) {
+ if (unarrived == 0) // wait out advance
+ root.internalAwaitAdvance(phase, null);
+ else if (UNSAFE.compareAndSwapLong(this, stateOffset,
+ s, s + adj))
+ break;
+ }
+ }
+ else if (parent == null) { // 1st root registration
+ long next = ((long)phase << PHASE_SHIFT) | adj;
+ if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
+ break;
+ }
+ else {
+ synchronized (this) { // 1st sub registration
+ if (state == s) { // recheck under lock
+ parent.doRegister(1);
+ do { // force current phase
+ phase = (int)(root.state >>> PHASE_SHIFT);
+ // assert phase < 0 || (int)state == EMPTY;
+ } while (!UNSAFE.compareAndSwapLong
+ (this, stateOffset, state,
+ ((long)phase << PHASE_SHIFT) | adj));
+ break;
}
}
}
}
+ return phase;
+ }
+
+ /**
+ * Resolves lagged phase propagation from root if necessary.
+ * Reconciliation normally occurs when root has advanced but
+ * subphasers have not yet done so, in which case they must finish
+ * their own advance by setting unarrived to parties (or if
+ * parties is zero, resetting to unregistered EMPTY state).
+ * However, this method may also be called when "floating"
+ * subphasers with possibly some unarrived parties are merely
+ * catching up to current phase, in which case counts are
+ * unaffected.
+ *
+ * @return reconciled state
+ */
+ private long reconcileState() {
+ final Phaser root = this.root;
+ long s = state;
+ if (root != this) {
+ int phase, u, p;
+ // CAS root phase with current parties; possibly trip unarrived
+ while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
+ (int)(s >>> PHASE_SHIFT) &&
+ !UNSAFE.compareAndSwapLong
+ (this, stateOffset, s,
+ s = (((long)phase << PHASE_SHIFT) |
+ (s & PARTIES_MASK) |
+ ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
+ (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
+ s = state;
+ }
return s;
}
/**
- * Creates a new phaser without any initially registered parties,
- * initial phase number 0, and no parent. Any thread using this
+ * Creates a new phaser with no initially registered parties, no
+ * parent, and initial phase number 0. Any thread using this
* phaser will need to first register for it.
*/
public Phaser() {
- this(null);
+ this(null, 0);
}
/**
- * Creates a new phaser with the given numbers of registered
- * unarrived parties, initial phase number 0, and no parent.
+ * Creates a new phaser with the given number of registered
+ * unarrived parties, no parent, and initial phase number 0.
*
- * @param parties the number of parties required to trip barrier
+ * @param parties the number of parties required to advance to the
+ * next phase
* @throws IllegalArgumentException if parties less than zero
* or greater than the maximum number of parties supported
*/
@@ -387,54 +516,62 @@
}
/**
- * Creates a new phaser with the given parent, without any
- * initially registered parties. If parent is non-null this phaser
- * is registered with the parent and its initial phase number is
- * the same as that of parent phaser.
+ * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
*
* @param parent the parent phaser
*/
public Phaser(Phaser parent) {
- int phase = 0;
- this.parent = parent;
- if (parent != null) {
- this.root = parent.root;
- phase = parent.register();
- }
- else
- this.root = this;
- this.state = trippedStateFor(phase, 0);
+ this(parent, 0);
}
/**
- * Creates a new phaser with the given parent and numbers of
- * registered unarrived parties. If parent is non-null, this phaser
- * is registered with the parent and its initial phase number is
- * the same as that of parent phaser.
+ * Creates a new phaser with the given parent and number of
+ * registered unarrived parties. When the given parent is non-null
+ * and the given number of parties is greater than zero, this
+ * child phaser is registered with its parent.
*
* @param parent the parent phaser
- * @param parties the number of parties required to trip barrier
+ * @param parties the number of parties required to advance to the
+ * next phase
* @throws IllegalArgumentException if parties less than zero
* or greater than the maximum number of parties supported
*/
public Phaser(Phaser parent, int parties) {
- if (parties < 0 || parties > ushortMask)
+ if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
- this.root = parent.root;
- phase = parent.register();
+ final Phaser root = parent.root;
+ this.root = root;
+ this.evenQ = root.evenQ;
+ this.oddQ = root.oddQ;
+ if (parties != 0)
+ phase = parent.doRegister(1);
}
- else
+ else {
this.root = this;
- this.state = trippedStateFor(phase, parties);
+ this.evenQ = new AtomicReference<QNode>();
+ this.oddQ = new AtomicReference<QNode>();
+ }
+ this.state = (parties == 0) ? (long)EMPTY :
+ ((long)phase << PHASE_SHIFT) |
+ ((long)parties << PARTIES_SHIFT) |
+ ((long)parties);
}
/**
- * Adds a new unarrived party to this phaser.
+ * Adds a new unarrived party to this phaser. If an ongoing
+ * invocation of {@link #onAdvance} is in progress, this method
+ * may await its completion before returning. If this phaser has
+ * a parent, and this phaser previously had no registered parties,
+ * this child phaser is also registered with its parent. If
+ * this phaser is terminated, the attempt to register has
+ * no effect, and a negative value is returned.
*
- * @return the arrival phase number to which this registration applied
+ * @return the arrival phase number to which this registration
+ * applied. If this value is negative, then this phaser has
+ * terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
*/
@@ -444,11 +581,22 @@
/**
* Adds the given number of new unarrived parties to this phaser.
+ * If an ongoing invocation of {@link #onAdvance} is in progress,
+ * this method may await its completion before returning. If this
+ * phaser has a parent, and the given number of parties is greater
+ * than zero, and this phaser previously had no registered
+ * parties, this child phaser is also registered with its parent.
+ * If this phaser is terminated, the attempt to register has no
+ * effect, and a negative value is returned.
*
- * @param parties the number of parties required to trip barrier
- * @return the arrival phase number to which this registration applied
+ * @param parties the number of additional parties required to
+ * advance to the next phase
+ * @return the arrival phase number to which this registration
+ * applied. If this value is negative, then this phaser has
+ * terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
+ * @throws IllegalArgumentException if {@code parties < 0}
*/
public int bulkRegister(int parties) {
if (parties < 0)
@@ -459,258 +607,210 @@
}
/**
- * Shared code for register, bulkRegister
- */
- private int doRegister(int registrations) {
- int phase;
- for (;;) {
- long s = getReconciledState();
- phase = phaseOf(s);
- int unarrived = unarrivedOf(s) + registrations;
- int parties = partiesOf(s) + registrations;
- if (phase < 0)
- break;
- if (parties > ushortMask || unarrived > ushortMask)
- throw new IllegalStateException(badBounds(parties, unarrived));
- if (phase == phaseOf(root.state) &&
- casState(s, stateFor(phase, parties, unarrived)))
- break;
- }
- return phase;
- }
-
- /**
- * Arrives at the barrier, but does not wait for others. (You can
- * in turn wait for others via {@link #awaitAdvance}). It is an
- * unenforced usage error for an unregistered party to invoke this
- * method.
+ * Arrives at this phaser, without waiting for others to arrive.
+ *
+ * <p>It is a usage error for an unregistered party to invoke this
+ * method. However, this error may result in an {@code
+ * IllegalStateException} only upon some subsequent operation on
+ * this phaser, if ever.
*
* @return the arrival phase number, or a negative value if terminated
* @throws IllegalStateException if not terminated and the number
* of unarrived parties would become negative
*/
public int arrive() {
- int phase;
- for (;;) {
- long s = state;
- phase = phaseOf(s);
- if (phase < 0)
- break;
- int parties = partiesOf(s);
- int unarrived = unarrivedOf(s) - 1;
- if (unarrived > 0) { // Not the last arrival
- if (casState(s, s - 1)) // s-1 adds one arrival
- break;
- }
- else if (unarrived == 0) { // the last arrival
- Phaser par = parent;
- if (par == null) { // directly trip
- if (casState
- (s,
- trippedStateFor(onAdvance(phase, parties) ? -1 :
- ((phase + 1) & phaseMask), parties))) {
- releaseWaiters(phase);
- break;
- }
- }
- else { // cascade to parent
- if (casState(s, s - 1)) { // zeroes unarrived
- par.arrive();
- reconcileState();
- break;
- }
- }
- }
- else if (phase != phaseOf(root.state)) // or if unreconciled
- reconcileState();
- else
- throw new IllegalStateException(badBounds(parties, unarrived));
- }
- return phase;
+ return doArrive(false);
}
/**
- * Arrives at the barrier and deregisters from it without waiting
- * for others. Deregistration reduces the number of parties
- * required to trip the barrier in future phases. If this phaser
+ * Arrives at this phaser and deregisters from it without waiting
+ * for others to arrive. Deregistration reduces the number of
+ * parties required to advance in future phases. If this phaser
* has a parent, and deregistration causes this phaser to have
- * zero parties, this phaser also arrives at and is deregistered
- * from its parent. It is an unenforced usage error for an
- * unregistered party to invoke this method.
+ * zero parties, this phaser is also deregistered from its parent.
+ *
+ * <p>It is a usage error for an unregistered party to invoke this
+ * method. However, this error may result in an {@code
+ * IllegalStateException} only upon some subsequent operation on
+ * this phaser, if ever.
*
* @return the arrival phase number, or a negative value if terminated
* @throws IllegalStateException if not terminated and the number
* of registered or unarrived parties would become negative
*/
public int arriveAndDeregister() {
- // similar code to arrive, but too different to merge
- Phaser par = parent;
- int phase;
- for (;;) {
- long s = state;
- phase = phaseOf(s);
- if (phase < 0)
- break;
- int parties = partiesOf(s) - 1;
- int unarrived = unarrivedOf(s) - 1;
- if (parties >= 0) {
- if (unarrived > 0 || (unarrived == 0 && par != null)) {
- if (casState
- (s,
- stateFor(phase, parties, unarrived))) {
- if (unarrived == 0) {
- par.arriveAndDeregister();
- reconcileState();
- }
- break;
- }
- continue;
- }
- if (unarrived == 0) {
- if (casState
- (s,
- trippedStateFor(onAdvance(phase, parties) ? -1 :
- ((phase + 1) & phaseMask), parties))) {
- releaseWaiters(phase);
- break;
- }
- continue;
- }
- if (par != null && phase != phaseOf(root.state)) {
- reconcileState();
- continue;
- }
- }
- throw new IllegalStateException(badBounds(parties, unarrived));
- }
- return phase;
+ return doArrive(true);
}
/**
- * Arrives at the barrier and awaits others. Equivalent in effect
+ * Arrives at this phaser and awaits others. Equivalent in effect
* to {@code awaitAdvance(arrive())}. If you need to await with
* interruption or timeout, you can arrange this with an analogous
- * construction using one of the other forms of the awaitAdvance
- * method. If instead you need to deregister upon arrival use
- * {@code arriveAndDeregister}. It is an unenforced usage error
- * for an unregistered party to invoke this method.
+ * construction using one of the other forms of the {@code
+ * awaitAdvance} method. If instead you need to deregister upon
+ * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
*
- * @return the arrival phase number, or a negative number if terminated
+ * <p>It is a usage error for an unregistered party to invoke this
+ * method. However, this error may result in an {@code
+ * IllegalStateException} only upon some subsequent operation on
+ * this phaser, if ever.
+ *
+ * @return the arrival phase number, or the (negative)
+ * {@linkplain #getPhase() current phase} if terminated
* @throws IllegalStateException if not terminated and the number
* of unarrived parties would become negative
*/
public int arriveAndAwaitAdvance() {
- return awaitAdvance(arrive());
+ // Specialization of doArrive+awaitAdvance eliminating some reads/paths
+ final Phaser root = this.root;
+ for (;;) {
+ long s = (root == this) ? state : reconcileState();
+ int phase = (int)(s >>> PHASE_SHIFT);
+ int counts = (int)s;
+ int unarrived = (counts & UNARRIVED_MASK) - 1;
+ if (phase < 0)
+ return phase;
+ else if (counts == EMPTY || unarrived < 0) {
+ if (reconcileState() == s)
+ throw new IllegalStateException(badArrive(s));
+ }
+ else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
+ s -= ONE_ARRIVAL)) {
+ if (unarrived != 0)
+ return root.internalAwaitAdvance(phase, null);
+ if (root != this)
+ return parent.arriveAndAwaitAdvance();
+ long n = s & PARTIES_MASK; // base of next state
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
+ if (onAdvance(phase, nextUnarrived))
+ n |= TERMINATION_BIT;
+ else if (nextUnarrived == 0)
+ n |= EMPTY;
+ else
+ n |= nextUnarrived;
+ int nextPhase = (phase + 1) & MAX_PHASE;
+ n |= (long)nextPhase << PHASE_SHIFT;
+ if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
+ return (int)(state >>> PHASE_SHIFT); // terminated
+ releaseWaiters(phase);
+ return nextPhase;
+ }
+ }
}
/**
- * Awaits the phase of the barrier to advance from the given phase
- * value, returning immediately if the current phase of the
- * barrier is not equal to the given phase value or this barrier
- * is terminated. It is an unenforced usage error for an
- * unregistered party to invoke this method.
+ * Awaits the phase of this phaser to advance from the given phase
+ * value, returning immediately if the current phase is not equal
+ * to the given phase value or this phaser is terminated.
*
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
- * previous call to {@code arrive} or its variants
- * @return the next arrival phase number, or a negative value
- * if terminated or argument is negative
+ * previous call to {@code arrive} or {@code arriveAndDeregister}.
+ * @return the next arrival phase number, or the argument if it is
+ * negative, or the (negative) {@linkplain #getPhase() current phase}
+ * if terminated
*/
public int awaitAdvance(int phase) {
+ final Phaser root = this.root;
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
- long s = getReconciledState();
- int p = phaseOf(s);
- if (p != phase)
- return p;
- if (unarrivedOf(s) == 0 && parent != null)
- parent.awaitAdvance(phase);
- // Fall here even if parent waited, to reconcile and help release
- return untimedWait(phase);
+ if (p == phase)
+ return root.internalAwaitAdvance(phase, null);
+ return p;
}
/**
- * Awaits the phase of the barrier to advance from the given phase
+ * Awaits the phase of this phaser to advance from the given phase
* value, throwing {@code InterruptedException} if interrupted
- * while waiting, or returning immediately if the current phase of
- * the barrier is not equal to the given phase value or this
- * barrier is terminated. It is an unenforced usage error for an
- * unregistered party to invoke this method.
+ * while waiting, or returning immediately if the current phase is
+ * not equal to the given phase value or this phaser is
+ * terminated.
*
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
- * previous call to {@code arrive} or its variants
- * @return the next arrival phase number, or a negative value
- * if terminated or argument is negative
+ * previous call to {@code arrive} or {@code arriveAndDeregister}.
+ * @return the next arrival phase number, or the argument if it is
+ * negative, or the (negative) {@linkplain #getPhase() current phase}
+ * if terminated
* @throws InterruptedException if thread interrupted while waiting
*/
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException {
+ final Phaser root = this.root;
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
- long s = getReconciledState();
- int p = phaseOf(s);
- if (p != phase)
- return p;
- if (unarrivedOf(s) == 0 && parent != null)
- parent.awaitAdvanceInterruptibly(phase);
- return interruptibleWait(phase);
+ if (p == phase) {
+ QNode node = new QNode(this, phase, true, false, 0L);
+ p = root.internalAwaitAdvance(phase, node);
+ if (node.wasInterrupted)
+ throw new InterruptedException();
+ }
+ return p;
}
/**
- * Awaits the phase of the barrier to advance from the given phase
+ * Awaits the phase of this phaser to advance from the given phase
* value or the given timeout to elapse, throwing {@code
* InterruptedException} if interrupted while waiting, or
- * returning immediately if the current phase of the barrier is
- * not equal to the given phase value or this barrier is
- * terminated. It is an unenforced usage error for an
- * unregistered party to invoke this method.
+ * returning immediately if the current phase is not equal to the
+ * given phase value or this phaser is terminated.
*
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
- * previous call to {@code arrive} or its variants
+ * previous call to {@code arrive} or {@code arriveAndDeregister}.
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
- * @return the next arrival phase number, or a negative value
- * if terminated or argument is negative
+ * @return the next arrival phase number, or the argument if it is
+ * negative, or the (negative) {@linkplain #getPhase() current phase}
+ * if terminated
* @throws InterruptedException if thread interrupted while waiting
* @throws TimeoutException if timed out while waiting
*/
public int awaitAdvanceInterruptibly(int phase,
long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
+ long nanos = unit.toNanos(timeout);
+ final Phaser root = this.root;
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
- long s = getReconciledState();
- int p = phaseOf(s);
- if (p != phase)
- return p;
- if (unarrivedOf(s) == 0 && parent != null)
- parent.awaitAdvanceInterruptibly(phase, timeout, unit);
- return timedWait(phase, unit.toNanos(timeout));
+ if (p == phase) {
+ QNode node = new QNode(this, phase, true, true, nanos);
+ p = root.internalAwaitAdvance(phase, node);
+ if (node.wasInterrupted)
+ throw new InterruptedException();
+ else if (p == phase)
+ throw new TimeoutException();
+ }
+ return p;
}
/**
- * Forces this barrier to enter termination state. Counts of
- * arrived and registered parties are unaffected. If this phaser
- * has a parent, it too is terminated. This method may be useful
- * for coordinating recovery after one or more tasks encounter
+ * Forces this phaser to enter termination state. Counts of
+ * registered parties are unaffected. If this phaser is a member
+ * of a tiered set of phasers, then all of the phasers in the set
+ * are terminated. If this phaser is already terminated, this
+ * method has no effect. This method may be useful for
+ * coordinating recovery after one or more tasks encounter
* unexpected exceptions.
*/
public void forceTermination() {
- for (;;) {
- long s = getReconciledState();
- int phase = phaseOf(s);
- int parties = partiesOf(s);
- int unarrived = unarrivedOf(s);
- if (phase < 0 ||
- casState(s, stateFor(-1, parties, unarrived))) {
+ // Only need to change root state
+ final Phaser root = this.root;
+ long s;
+ while ((s = root.state) >= 0) {
+ if (UNSAFE.compareAndSwapLong(root, stateOffset,
+ s, s | TERMINATION_BIT)) {
+ // signal all threads
releaseWaiters(0);
releaseWaiters(1);
- if (parent != null)
- parent.forceTermination();
return;
}
}
@@ -719,16 +819,18 @@
/**
* Returns the current phase number. The maximum phase number is
* {@code Integer.MAX_VALUE}, after which it restarts at
- * zero. Upon termination, the phase number is negative.
+ * zero. Upon termination, the phase number is negative,
+ * in which case the prevailing phase prior to termination
+ * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
*
* @return the phase number, or a negative value if terminated
*/
public final int getPhase() {
- return phaseOf(getReconciledState());
+ return (int)(root.state >>> PHASE_SHIFT);
}
/**
- * Returns the number of parties registered at this barrier.
+ * Returns the number of parties registered at this phaser.
*
* @return the number of parties
*/
@@ -738,22 +840,24 @@
/**
* Returns the number of registered parties that have arrived at
- * the current phase of this barrier.
+ * the current phase of this phaser. If this phaser has terminated,
+ * the returned value is meaningless and arbitrary.
*
* @return the number of arrived parties
*/
public int getArrivedParties() {
- return arrivedOf(state);
+ return arrivedOf(reconcileState());
}
/**
* Returns the number of registered parties that have not yet
- * arrived at the current phase of this barrier.
+ * arrived at the current phase of this phaser. If this phaser has
+ * terminated, the returned value is meaningless and arbitrary.
*
* @return the number of unarrived parties
*/
public int getUnarrivedParties() {
- return unarrivedOf(state);
+ return unarrivedOf(reconcileState());
}
/**
@@ -776,52 +880,56 @@
}
/**
- * Returns {@code true} if this barrier has been terminated.
+ * Returns {@code true} if this phaser has been terminated.
*
- * @return {@code true} if this barrier has been terminated
+ * @return {@code true} if this phaser has been terminated
*/
public boolean isTerminated() {
- return getPhase() < 0;
+ return root.state < 0L;
}
/**
* Overridable method to perform an action upon impending phase
* advance, and to control termination. This method is invoked
- * upon arrival of the party tripping the barrier (when all other
+ * upon arrival of the party advancing this phaser (when all other
* waiting parties are dormant). If this method returns {@code
- * true}, then, rather than advance the phase number, this barrier
- * will be set to a final termination state, and subsequent calls
- * to {@link #isTerminated} will return true. Any (unchecked)
- * Exception or Error thrown by an invocation of this method is
- * propagated to the party attempting to trip the barrier, in
- * which case no advance occurs.
+ * true}, this phaser will be set to a final termination state
+ * upon advance, and subsequent calls to {@link #isTerminated}
+ * will return true. Any (unchecked) Exception or Error thrown by
+ * an invocation of this method is propagated to the party
+ * attempting to advance this phaser, in which case no advance
+ * occurs.
*
* <p>The arguments to this method provide the state of the phaser
- * prevailing for the current transition. (When called from within
- * an implementation of {@code onAdvance} the values returned by
- * methods such as {@code getPhase} may or may not reliably
- * indicate the state to which this transition applies.)
+ * prevailing for the current transition. The effects of invoking
+ * arrival, registration, and waiting methods on this phaser from
+ * within {@code onAdvance} are unspecified and should not be
+ * relied on.
*
- * <p>The default version returns {@code true} when the number of
- * registered parties is zero. Normally, overrides that arrange
- * termination for other reasons should also preserve this
- * property.
+ * <p>If this phaser is a member of a tiered set of phasers, then
+ * {@code onAdvance} is invoked only for its root phaser on each
+ * advance.
*
- * <p>You may override this method to perform an action with side
- * effects visible to participating tasks, but it is only sensible
- * to do so in designs where all parties register before any
- * arrive, and all {@link #awaitAdvance} at each phase.
- * Otherwise, you cannot ensure lack of interference from other
- * parties during the invocation of this method. Additionally,
- * method {@code onAdvance} may be invoked more than once per
- * transition if registrations are intermixed with arrivals.
+ * <p>To support the most common use cases, the default
+ * implementation of this method returns {@code true} when the
+ * number of registered parties has become zero as the result of a
+ * party invoking {@code arriveAndDeregister}. You can disable
+ * this behavior, thus enabling continuation upon future
+ * registrations, by overriding this method to always return
+ * {@code false}:
*
- * @param phase the phase number on entering the barrier
+ * <pre> {@code
+ * Phaser phaser = new Phaser() {
+ * protected boolean onAdvance(int phase, int parties) { return false; }
+ * }}</pre>
+ *
+ * @param phase the current phase number on entry to this method,
+ * before this phaser is advanced
* @param registeredParties the current number of registered parties
- * @return {@code true} if this barrier should terminate
+ * @return {@code true} if this phaser should terminate
*/
protected boolean onAdvance(int phase, int registeredParties) {
- return registeredParties <= 0;
+ return registeredParties == 0;
}
/**
@@ -831,17 +939,138 @@
* followed by the number of registered parties, and {@code
* "arrived = "} followed by the number of arrived parties.
*
- * @return a string identifying this barrier, as well as its state
+ * @return a string identifying this phaser, as well as its state
*/
public String toString() {
- long s = getReconciledState();
+ return stateToString(reconcileState());
+ }
+
+ /**
+ * Implementation of toString and string-based error messages
+ */
+ private String stateToString(long s) {
return super.toString() +
"[phase = " + phaseOf(s) +
" parties = " + partiesOf(s) +
" arrived = " + arrivedOf(s) + "]";
}
- // methods for waiting
+ // Waiting mechanics
+
+ /**
+ * Removes and signals threads from queue for phase.
+ */
+ private void releaseWaiters(int phase) {
+ QNode q; // first element of queue
+ Thread t; // its thread
+ AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
+ while ((q = head.get()) != null &&
+ q.phase != (int)(root.state >>> PHASE_SHIFT)) {
+ if (head.compareAndSet(q, q.next) &&
+ (t = q.thread) != null) {
+ q.thread = null;
+ LockSupport.unpark(t);
+ }
+ }
+ }
+
+ /**
+ * Variant of releaseWaiters that additionally tries to remove any
+ * nodes no longer waiting for advance due to timeout or
+ * interrupt. Currently, nodes are removed only if they are at
+ * head of queue, which suffices to reduce memory footprint in
+ * most usages.
+ *
+ * @return current phase on exit
+ */
+ private int abortWait(int phase) {
+ AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
+ for (;;) {
+ Thread t;
+ QNode q = head.get();
+ int p = (int)(root.state >>> PHASE_SHIFT);
+ if (q == null || ((t = q.thread) != null && q.phase == p))
+ return p;
+ if (head.compareAndSet(q, q.next) && t != null) {
+ q.thread = null;
+ LockSupport.unpark(t);
+ }
+ }
+ }
+
+ /** The number of CPUs, for spin control */
+ private static final int NCPU = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * The number of times to spin before blocking while waiting for
+ * advance, per arrival while waiting. On multiprocessors, fully
+ * blocking and waking up a large number of threads all at once is
+ * usually a very slow process, so we use rechargeable spins to
+ * avoid it when threads regularly arrive: When a thread in
+ * internalAwaitAdvance notices another arrival before blocking,
+ * and there appear to be enough CPUs available, it spins
+ * SPINS_PER_ARRIVAL more times before blocking. The value trades
+ * off good-citizenship vs big unnecessary slowdowns.
+ */
+ static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
+
+ /**
+ * Possibly blocks and waits for phase to advance unless aborted.
+ * Call only from root node.
+ *
+ * @param phase current phase
+ * @param node if non-null, the wait node to track interrupt and timeout;
+ * if null, denotes noninterruptible wait
+ * @return current phase
+ */
+ private int internalAwaitAdvance(int phase, QNode node) {
+ releaseWaiters(phase-1); // ensure old queue clean
+ boolean queued = false; // true when node is enqueued
+ int lastUnarrived = 0; // to increase spins upon change
+ int spins = SPINS_PER_ARRIVAL;
+ long s;
+ int p;
+ while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
+ if (node == null) { // spinning in noninterruptible mode
+ int unarrived = (int)s & UNARRIVED_MASK;
+ if (unarrived != lastUnarrived &&
+ (lastUnarrived = unarrived) < NCPU)
+ spins += SPINS_PER_ARRIVAL;
+ boolean interrupted = Thread.interrupted();
+ if (interrupted || --spins < 0) { // need node to record intr
+ node = new QNode(this, phase, false, false, 0L);
+ node.wasInterrupted = interrupted;
+ }
+ }
+ else if (node.isReleasable()) // done or aborted
+ break;
+ else if (!queued) { // push onto queue
+ AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
+ QNode q = node.next = head.get();
+ if ((q == null || q.phase == phase) &&
+ (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
+ queued = head.compareAndSet(q, node);
+ }
+ else {
+ try {
+ ForkJoinPool.managedBlock(node);
+ } catch (InterruptedException ie) {
+ node.wasInterrupted = true;
+ }
+ }
+ }
+
+ if (node != null) {
+ if (node.thread != null)
+ node.thread = null; // avoid need for unpark()
+ if (node.wasInterrupted && !node.interruptible)
+ Thread.currentThread().interrupt();
+ if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
+ return abortWait(phase); // possibly clean up on abort
+ }
+ releaseWaiters(phase);
+ return p;
+ }
/**
* Wait nodes for Treiber stack representing wait queue
@@ -849,174 +1078,61 @@
static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
- final long startTime;
- final long nanos;
+ final boolean interruptible;
final boolean timed;
- final boolean interruptible;
- volatile boolean wasInterrupted = false;
+ boolean wasInterrupted;
+ long nanos;
+ long lastTime;
volatile Thread thread; // nulled to cancel wait
QNode next;
+
QNode(Phaser phaser, int phase, boolean interruptible,
- boolean timed, long startTime, long nanos) {
+ boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
+ this.interruptible = interruptible;
+ this.nanos = nanos;
this.timed = timed;
- this.interruptible = interruptible;
- this.startTime = startTime;
- this.nanos = nanos;
+ this.lastTime = timed ? System.nanoTime() : 0L;
thread = Thread.currentThread();
}
- public boolean isReleasable() {
- return (thread == null ||
- phaser.getPhase() != phase ||
- (interruptible && wasInterrupted) ||
- (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
- }
- public boolean block() {
- if (Thread.interrupted()) {
- wasInterrupted = true;
- if (interruptible)
- return true;
- }
- if (!timed)
- LockSupport.park(this);
- else {
- long waitTime = nanos - (System.nanoTime() - startTime);
- if (waitTime <= 0)
- return true;
- LockSupport.parkNanos(this, waitTime);
- }
- return isReleasable();
- }
- void signal() {
- Thread t = thread;
- if (t != null) {
- thread = null;
- LockSupport.unpark(t);
- }
- }
- boolean doWait() {
- if (thread != null) {
- try {
- ForkJoinPool.managedBlock(this);
- } catch (InterruptedException ie) {
- }
- }
- return wasInterrupted;
- }
- }
-
- /**
- * Removes and signals waiting threads from wait queue.
- */
- private void releaseWaiters(int phase) {
- AtomicReference<QNode> head = queueFor(phase);
- QNode q;
- while ((q = head.get()) != null) {
- if (head.compareAndSet(q, q.next))
- q.signal();
+ public boolean isReleasable() {
+ if (thread == null)
+ return true;
+ if (phaser.getPhase() != phase) {
+ thread = null;
+ return true;
+ }
+ if (Thread.interrupted())
+ wasInterrupted = true;
+ if (wasInterrupted && interruptible) {
+ thread = null;
+ return true;
+ }
+ if (timed) {
+ if (nanos > 0L) {
+ long now = System.nanoTime();
+ nanos -= now - lastTime;
+ lastTime = now;
+ }
+ if (nanos <= 0L) {
+ thread = null;
+ return true;
+ }
+ }
+ return false;
}
- }
-
- /**
- * Tries to enqueue given node in the appropriate wait queue.
- *
- * @return true if successful
- */
- private boolean tryEnqueue(QNode node) {
- AtomicReference<QNode> head = queueFor(node.phase);
- return head.compareAndSet(node.next = head.get(), node);
- }
-
- /**
- * Enqueues node and waits unless aborted or signalled.
- *
- * @return current phase
- */
- private int untimedWait(int phase) {
- QNode node = null;
- boolean queued = false;
- boolean interrupted = false;
- int p;
- while ((p = getPhase()) == phase) {
- if (Thread.interrupted())
- interrupted = true;
- else if (node == null)
- node = new QNode(this, phase, false, false, 0, 0);
- else if (!queued)
- queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
- }
- if (node != null)
- node.thread = null;
- releaseWaiters(phase);
- if (interrupted)
- Thread.currentThread().interrupt();
- return p;
- }
- /**
- * Interruptible version
- * @return current phase
- */
- private int interruptibleWait(int phase) throws InterruptedException {
- QNode node = null;
- boolean queued = false;
- boolean interrupted = false;
- int p;
- while ((p = getPhase()) == phase && !interrupted) {
- if (Thread.interrupted())
- interrupted = true;
- else if (node == null)
- node = new QNode(this, phase, true, false, 0, 0);
- else if (!queued)
- queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
+ public boolean block() {
+ if (isReleasable())
+ return true;
+ else if (!timed)
+ LockSupport.park(this);
+ else if (nanos > 0)
+ LockSupport.parkNanos(this, nanos);
+ return isReleasable();
}
- if (node != null)
- node.thread = null;
- if (p != phase || (p = getPhase()) != phase)
- releaseWaiters(phase);
- if (interrupted)
- throw new InterruptedException();
- return p;
- }
-
- /**
- * Timeout version.
- * @return current phase
- */
- private int timedWait(int phase, long nanos)
- throws InterruptedException, TimeoutException {
- long startTime = System.nanoTime();
- QNode node = null;
- boolean queued = false;
- boolean interrupted = false;
- int p;
- while ((p = getPhase()) == phase && !interrupted) {
- if (Thread.interrupted())
- interrupted = true;
- else if (nanos - (System.nanoTime() - startTime) <= 0)
- break;
- else if (node == null)
- node = new QNode(this, phase, true, true, startTime, nanos);
- else if (!queued)
- queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
- }
- if (node != null)
- node.thread = null;
- if (p != phase || (p = getPhase()) != phase)
- releaseWaiters(phase);
- if (interrupted)
- throw new InterruptedException();
- if (p == phase)
- throw new TimeoutException();
- return p;
}
// Unsafe mechanics
@@ -1025,10 +1141,6 @@
private static final long stateOffset =
objectFieldOffset("state", Phaser.class);
- private final boolean casState(long cmp, long val) {
- return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
- }
-
private static long objectFieldOffset(String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
--- a/jdk/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java Wed Jan 12 14:40:36 2011 +0000
@@ -43,11 +43,11 @@
* the same ordering rules as class {@link PriorityQueue} and supplies
* blocking retrieval operations. While this queue is logically
* unbounded, attempted additions may fail due to resource exhaustion
- * (causing <tt>OutOfMemoryError</tt>). This class does not permit
- * <tt>null</tt> elements. A priority queue relying on {@linkplain
+ * (causing {@code OutOfMemoryError}). This class does not permit
+ * {@code null} elements. A priority queue relying on {@linkplain
* Comparable natural ordering} also does not permit insertion of
* non-comparable objects (doing so results in
- * <tt>ClassCastException</tt>).
+ * {@code ClassCastException}).
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
@@ -55,7 +55,7 @@
* #iterator()} is <em>not</em> guaranteed to traverse the elements of
* the PriorityBlockingQueue in any particular order. If you need
* ordered traversal, consider using
- * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
+ * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
* can be used to <em>remove</em> some or all elements in priority
* order and place them in another collection.
*
@@ -65,12 +65,12 @@
* secondary key to break ties in primary priority values. For
* example, here is a class that applies first-in-first-out
* tie-breaking to comparable elements. To use it, you would insert a
- * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
+ * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
*
- * <pre>
- * class FIFOEntry<E extends Comparable<? super E>>
- * implements Comparable<FIFOEntry<E>> {
- * final static AtomicLong seq = new AtomicLong();
+ * <pre> {@code
+ * class FIFOEntry<E extends Comparable<? super E>>
+ * implements Comparable<FIFOEntry<E>> {
+ * static final AtomicLong seq = new AtomicLong(0);
* final long seqNum;
* final E entry;
* public FIFOEntry(E entry) {
@@ -78,13 +78,13 @@
* this.entry = entry;
* }
* public E getEntry() { return entry; }
- * public int compareTo(FIFOEntry<E> other) {
+ * public int compareTo(FIFOEntry<E> other) {
* int res = entry.compareTo(other.entry);
- * if (res == 0 && other.entry != this.entry)
- * res = (seqNum < other.seqNum ? -1 : 1);
+ * if (res == 0 && other.entry != this.entry)
+ * res = (seqNum < other.seqNum ? -1 : 1);
* return res;
* }
- * }</pre>
+ * }}</pre>
*
* <p>This class is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
@@ -98,34 +98,102 @@
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = 5595510919245408276L;
- private final PriorityQueue<E> q;
- private final ReentrantLock lock = new ReentrantLock(true);
- private final Condition notEmpty = lock.newCondition();
+ /*
+ * The implementation uses an array-based binary heap, with public
+ * operations protected with a single lock. However, allocation
+ * during resizing uses a simple spinlock (used only while not
+ * holding main lock) in order to allow takes to operate
+ * concurrently with allocation. This avoids repeated
+ * postponement of waiting consumers and consequent element
+ * build-up. The need to back away from lock during allocation
+ * makes it impossible to simply wrap delegated
+ * java.util.PriorityQueue operations within a lock, as was done
+ * in a previous version of this class. To maintain
+ * interoperability, a plain PriorityQueue is still used during
+ * serialization, which maintains compatibility at the espense of
+ * transiently doubling overhead.
+ */
+
+ /**
+ * Default array capacity.
+ */
+ private static final int DEFAULT_INITIAL_CAPACITY = 11;
+
+ /**
+ * The maximum size of array to allocate.
+ * Some VMs reserve some header words in an array.
+ * Attempts to allocate larger arrays may result in
+ * OutOfMemoryError: Requested array size exceeds VM limit
+ */
+ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
- * Creates a <tt>PriorityBlockingQueue</tt> with the default
+ * Priority queue represented as a balanced binary heap: the two
+ * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
+ * priority queue is ordered by comparator, or by the elements'
+ * natural ordering, if comparator is null: For each node n in the
+ * heap and each descendant d of n, n <= d. The element with the
+ * lowest value is in queue[0], assuming the queue is nonempty.
+ */
+ private transient Object[] queue;
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private transient int size;
+
+ /**
+ * The comparator, or null if priority queue uses elements'
+ * natural ordering.
+ */
+ private transient Comparator<? super E> comparator;
+
+ /**
+ * Lock used for all public operations
+ */
+ private final ReentrantLock lock;
+
+ /**
+ * Condition for blocking when empty
+ */
+ private final Condition notEmpty;
+
+ /**
+ * Spinlock for allocation, acquired via CAS.
+ */
+ private transient volatile int allocationSpinLock;
+
+ /**
+ * A plain PriorityQueue used only for serialization,
+ * to maintain compatibility with previous versions
+ * of this class. Non-null only during serialization/deserialization.
+ */
+ private PriorityQueue q;
+
+ /**
+ * Creates a {@code PriorityBlockingQueue} with the default
* initial capacity (11) that orders its elements according to
* their {@linkplain Comparable natural ordering}.
*/
public PriorityBlockingQueue() {
- q = new PriorityQueue<E>();
+ this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
- * Creates a <tt>PriorityBlockingQueue</tt> with the specified
+ * Creates a {@code PriorityBlockingQueue} with the specified
* initial capacity that orders its elements according to their
* {@linkplain Comparable natural ordering}.
*
* @param initialCapacity the initial capacity for this priority queue
- * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
+ * @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity) {
- q = new PriorityQueue<E>(initialCapacity, null);
+ this(initialCapacity, null);
}
/**
- * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
+ * Creates a {@code PriorityBlockingQueue} with the specified initial
* capacity that orders its elements according to the specified
* comparator.
*
@@ -133,16 +201,21 @@
* @param comparator the comparator that will be used to order this
* priority queue. If {@code null}, the {@linkplain Comparable
* natural ordering} of the elements will be used.
- * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
+ * @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
- q = new PriorityQueue<E>(initialCapacity, comparator);
+ if (initialCapacity < 1)
+ throw new IllegalArgumentException();
+ this.lock = new ReentrantLock();
+ this.notEmpty = lock.newCondition();
+ this.comparator = comparator;
+ this.queue = new Object[initialCapacity];
}
/**
- * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
+ * Creates a {@code PriorityBlockingQueue} containing the elements
* in the specified collection. If the specified collection is a
* {@link SortedSet} or a {@link PriorityQueue}, this
* priority queue will be ordered according to the same ordering.
@@ -158,14 +231,215 @@
* of its elements are null
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
- q = new PriorityQueue<E>(c);
+ this.lock = new ReentrantLock();
+ this.notEmpty = lock.newCondition();
+ boolean heapify = true; // true if not known to be in heap order
+ boolean screen = true; // true if must screen for nulls
+ if (c instanceof SortedSet<?>) {
+ SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
+ this.comparator = (Comparator<? super E>) ss.comparator();
+ heapify = false;
+ }
+ else if (c instanceof PriorityBlockingQueue<?>) {
+ PriorityBlockingQueue<? extends E> pq =
+ (PriorityBlockingQueue<? extends E>) c;
+ this.comparator = (Comparator<? super E>) pq.comparator();
+ screen = false;
+ if (pq.getClass() == PriorityBlockingQueue.class) // exact match
+ heapify = false;
+ }
+ Object[] a = c.toArray();
+ int n = a.length;
+ // If c.toArray incorrectly doesn't return Object[], copy it.
+ if (a.getClass() != Object[].class)
+ a = Arrays.copyOf(a, n, Object[].class);
+ if (screen && (n == 1 || this.comparator != null)) {
+ for (int i = 0; i < n; ++i)
+ if (a[i] == null)
+ throw new NullPointerException();
+ }
+ this.queue = a;
+ this.size = n;
+ if (heapify)
+ heapify();
+ }
+
+ /**
+ * Tries to grow array to accommodate at least one more element
+ * (but normally expand by about 50%), giving up (allowing retry)
+ * on contention (which we expect to be rare). Call only while
+ * holding lock.
+ *
+ * @param array the heap array
+ * @param oldCap the length of the array
+ */
+ private void tryGrow(Object[] array, int oldCap) {
+ lock.unlock(); // must release and then re-acquire main lock
+ Object[] newArray = null;
+ if (allocationSpinLock == 0 &&
+ UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
+ 0, 1)) {
+ try {
+ int newCap = oldCap + ((oldCap < 64) ?
+ (oldCap + 2) : // grow faster if small
+ (oldCap >> 1));
+ if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
+ int minCap = oldCap + 1;
+ if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
+ throw new OutOfMemoryError();
+ newCap = MAX_ARRAY_SIZE;
+ }
+ if (newCap > oldCap && queue == array)
+ newArray = new Object[newCap];
+ } finally {
+ allocationSpinLock = 0;
+ }
+ }
+ if (newArray == null) // back off if another thread is allocating
+ Thread.yield();
+ lock.lock();
+ if (newArray != null && queue == array) {
+ queue = newArray;
+ System.arraycopy(array, 0, newArray, 0, oldCap);
+ }
+ }
+
+ /**
+ * Mechanics for poll(). Call only while holding lock.
+ */
+ private E extract() {
+ E result;
+ int n = size - 1;
+ if (n < 0)
+ result = null;
+ else {
+ Object[] array = queue;
+ result = (E) array[0];
+ E x = (E) array[n];
+ array[n] = null;
+ Comparator<? super E> cmp = comparator;
+ if (cmp == null)
+ siftDownComparable(0, x, array, n);
+ else
+ siftDownUsingComparator(0, x, array, n, cmp);
+ size = n;
+ }
+ return result;
+ }
+
+ /**
+ * Inserts item x at position k, maintaining heap invariant by
+ * promoting x up the tree until it is greater than or equal to
+ * its parent, or is the root.
+ *
+ * To simplify and speed up coercions and comparisons. the
+ * Comparable and Comparator versions are separated into different
+ * methods that are otherwise identical. (Similarly for siftDown.)
+ * These methods are static, with heap state as arguments, to
+ * simplify use in light of possible comparator exceptions.
+ *
+ * @param k the position to fill
+ * @param x the item to insert
+ * @param array the heap array
+ * @param n heap size
+ */
+ private static <T> void siftUpComparable(int k, T x, Object[] array) {
+ Comparable<? super T> key = (Comparable<? super T>) x;
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ Object e = array[parent];
+ if (key.compareTo((T) e) >= 0)
+ break;
+ array[k] = e;
+ k = parent;
+ }
+ array[k] = key;
+ }
+
+ private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
+ Comparator<? super T> cmp) {
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ Object e = array[parent];
+ if (cmp.compare(x, (T) e) >= 0)
+ break;
+ array[k] = e;
+ k = parent;
+ }
+ array[k] = x;
+ }
+
+ /**
+ * Inserts item x at position k, maintaining heap invariant by
+ * demoting x down the tree repeatedly until it is less than or
+ * equal to its children or is a leaf.
+ *
+ * @param k the position to fill
+ * @param x the item to insert
+ * @param array the heap array
+ * @param n heap size
+ */
+ private static <T> void siftDownComparable(int k, T x, Object[] array,
+ int n) {
+ Comparable<? super T> key = (Comparable<? super T>)x;
+ int half = n >>> 1; // loop while a non-leaf
+ while (k < half) {
+ int child = (k << 1) + 1; // assume left child is least
+ Object c = array[child];
+ int right = child + 1;
+ if (right < n &&
+ ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
+ c = array[child = right];
+ if (key.compareTo((T) c) <= 0)
+ break;
+ array[k] = c;
+ k = child;
+ }
+ array[k] = key;
+ }
+
+ private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
+ int n,
+ Comparator<? super T> cmp) {
+ int half = n >>> 1;
+ while (k < half) {
+ int child = (k << 1) + 1;
+ Object c = array[child];
+ int right = child + 1;
+ if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
+ c = array[child = right];
+ if (cmp.compare(x, (T) c) <= 0)
+ break;
+ array[k] = c;
+ k = child;
+ }
+ array[k] = x;
+ }
+
+ /**
+ * Establishes the heap invariant (described above) in the entire tree,
+ * assuming nothing about the order of the elements prior to the call.
+ */
+ private void heapify() {
+ Object[] array = queue;
+ int n = size;
+ int half = (n >>> 1) - 1;
+ Comparator<? super E> cmp = comparator;
+ if (cmp == null) {
+ for (int i = half; i >= 0; i--)
+ siftDownComparable(i, (E) array[i], array, n);
+ }
+ else {
+ for (int i = half; i >= 0; i--)
+ siftDownUsingComparator(i, (E) array[i], array, n, cmp);
+ }
}
/**
* Inserts the specified element into this priority queue.
*
* @param e the element to add
- * @return <tt>true</tt> (as specified by {@link Collection#add})
+ * @return {@code true} (as specified by {@link Collection#add})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
@@ -177,30 +451,41 @@
/**
* Inserts the specified element into this priority queue.
+ * As the queue is unbounded, this method will never return {@code false}.
*
* @param e the element to add
- * @return <tt>true</tt> (as specified by {@link Queue#offer})
+ * @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
+ if (e == null)
+ throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
+ int n, cap;
+ Object[] array;
+ while ((n = size) >= (cap = (array = queue).length))
+ tryGrow(array, cap);
try {
- boolean ok = q.offer(e);
- assert ok;
+ Comparator<? super E> cmp = comparator;
+ if (cmp == null)
+ siftUpComparable(n, e, array);
+ else
+ siftUpUsingComparator(n, e, array, cmp);
+ size = n + 1;
notEmpty.signal();
- return true;
} finally {
lock.unlock();
}
+ return true;
}
/**
- * Inserts the specified element into this priority queue. As the queue is
- * unbounded this method will never block.
+ * Inserts the specified element into this priority queue.
+ * As the queue is unbounded, this method will never block.
*
* @param e the element to add
* @throws ClassCastException if the specified element cannot be compared
@@ -213,13 +498,15 @@
}
/**
- * Inserts the specified element into this priority queue. As the queue is
- * unbounded this method will never block.
+ * Inserts the specified element into this priority queue.
+ * As the queue is unbounded, this method will never block or
+ * return {@code false}.
*
* @param e the element to add
* @param timeout This parameter is ignored as the method never blocks
* @param unit This parameter is ignored as the method never blocks
- * @return <tt>true</tt>
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
@@ -232,95 +519,121 @@
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
+ E result;
try {
- return q.poll();
+ result = extract();
} finally {
lock.unlock();
}
+ return result;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
+ E result;
try {
- try {
- while (q.size() == 0)
- notEmpty.await();
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- E x = q.poll();
- assert x != null;
- return x;
+ while ( (result = extract()) == null)
+ notEmpty.await();
} finally {
lock.unlock();
}
+ return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
+ E result;
try {
- for (;;) {
- E x = q.poll();
- if (x != null)
- return x;
- if (nanos <= 0)
- return null;
- try {
- nanos = notEmpty.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- }
+ while ( (result = extract()) == null && nanos > 0)
+ nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
+ return result;
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
+ E result;
try {
- return q.peek();
+ result = size > 0 ? (E) queue[0] : null;
+ } finally {
+ lock.unlock();
+ }
+ return result;
+ }
+
+ /**
+ * Returns the comparator used to order the elements in this queue,
+ * or {@code null} if this queue uses the {@linkplain Comparable
+ * natural ordering} of its elements.
+ *
+ * @return the comparator used to order the elements in this queue,
+ * or {@code null} if this queue uses the natural
+ * ordering of its elements
+ */
+ public Comparator<? super E> comparator() {
+ return comparator;
+ }
+
+ public int size() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return size;
} finally {
lock.unlock();
}
}
/**
- * Returns the comparator used to order the elements in this queue,
- * or <tt>null</tt> if this queue uses the {@linkplain Comparable
- * natural ordering} of its elements.
- *
- * @return the comparator used to order the elements in this queue,
- * or <tt>null</tt> if this queue uses the natural
- * ordering of its elements
+ * Always returns {@code Integer.MAX_VALUE} because
+ * a {@code PriorityBlockingQueue} is not capacity constrained.
+ * @return {@code Integer.MAX_VALUE} always
*/
- public Comparator<? super E> comparator() {
- return q.comparator();
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
}
- public int size() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return q.size();
- } finally {
- lock.unlock();
+ private int indexOf(Object o) {
+ if (o != null) {
+ Object[] array = queue;
+ int n = size;
+ for (int i = 0; i < n; i++)
+ if (o.equals(array[i]))
+ return i;
}
+ return -1;
}
/**
- * Always returns <tt>Integer.MAX_VALUE</tt> because
- * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
- * @return <tt>Integer.MAX_VALUE</tt>
+ * Removes the ith element from queue.
*/
- public int remainingCapacity() {
- return Integer.MAX_VALUE;
+ private void removeAt(int i) {
+ Object[] array = queue;
+ int n = size - 1;
+ if (n == i) // removed last element
+ array[i] = null;
+ else {
+ E moved = (E) array[n];
+ array[n] = null;
+ Comparator<? super E> cmp = comparator;
+ if (cmp == null)
+ siftDownComparable(i, moved, array, n);
+ else
+ siftDownUsingComparator(i, moved, array, n, cmp);
+ if (array[i] == moved) {
+ if (cmp == null)
+ siftUpComparable(i, moved, array);
+ else
+ siftUpUsingComparator(i, moved, array, cmp);
+ }
+ }
+ size = n;
}
/**
@@ -332,13 +645,40 @@
* result of the call).
*
* @param o element to be removed from this queue, if present
- * @return <tt>true</tt> if this queue changed as a result of the call
+ * @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
+ boolean removed = false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return q.remove(o);
+ int i = indexOf(o);
+ if (i != -1) {
+ removeAt(i);
+ removed = true;
+ }
+ } finally {
+ lock.unlock();
+ }
+ return removed;
+ }
+
+
+ /**
+ * Identity-based version for use in Itr.remove
+ */
+ private void removeEQ(Object o) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ Object[] array = queue;
+ int n = size;
+ for (int i = 0; i < n; i++) {
+ if (o == array[i]) {
+ removeAt(i);
+ break;
+ }
+ }
} finally {
lock.unlock();
}
@@ -350,16 +690,18 @@
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
- * @return <tt>true</tt> if this queue contains the specified element
+ * @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
+ int index;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return q.contains(o);
+ index = indexOf(o);
} finally {
lock.unlock();
}
+ return index != -1;
}
/**
@@ -379,7 +721,7 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return q.toArray();
+ return Arrays.copyOf(queue, size);
} finally {
lock.unlock();
}
@@ -390,7 +732,18 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return q.toString();
+ int n = size;
+ if (n == 0)
+ return "[]";
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (int i = 0; i < n; ++i) {
+ E e = (E)queue[i];
+ sb.append(e == this ? "(this Collection)" : e);
+ if (i != n - 1)
+ sb.append(',').append(' ');
+ }
+ return sb.append(']').toString();
} finally {
lock.unlock();
}
@@ -412,7 +765,7 @@
try {
int n = 0;
E e;
- while ( (e = q.poll()) != null) {
+ while ( (e = extract()) != null) {
c.add(e);
++n;
}
@@ -440,7 +793,7 @@
try {
int n = 0;
E e;
- while (n < maxElements && (e = q.poll()) != null) {
+ while (n < maxElements && (e = extract()) != null) {
c.add(e);
++n;
}
@@ -458,7 +811,11 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- q.clear();
+ Object[] array = queue;
+ int n = size;
+ size = 0;
+ for (int i = 0; i < n; i++)
+ array[i] = null;
} finally {
lock.unlock();
}
@@ -475,22 +832,22 @@
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
- * <tt>null</tt>.
+ * {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
- * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
+ * <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
- * allocated array of <tt>String</tt>:
+ * allocated array of {@code String}:
*
* <pre>
* String[] y = x.toArray(new String[0]);</pre>
*
- * Note that <tt>toArray(new Object[0])</tt> is identical in function to
- * <tt>toArray()</tt>.
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
@@ -505,7 +862,14 @@
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return q.toArray(a);
+ int n = size;
+ if (a.length < n)
+ // Make a new array of a's runtime type, but my contents:
+ return (T[]) Arrays.copyOf(queue, size, a.getClass());
+ System.arraycopy(queue, 0, a, 0, n);
+ if (a.length > n)
+ a[n] = null;
+ return a;
} finally {
lock.unlock();
}
@@ -514,8 +878,9 @@
/**
* Returns an iterator over the elements in this queue. The
* iterator does not return the elements in any particular order.
- * The returned <tt>Iterator</tt> is a "weakly consistent"
- * iterator that will never throw {@link
+ *
+ * <p>The returned iterator is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
* ConcurrentModificationException}, and guarantees to traverse
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
@@ -530,7 +895,7 @@
/**
* Snapshot iterator that works off copy of underlying q array.
*/
- private class Itr implements Iterator<E> {
+ final class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return;
int lastRet; // index of last element, or -1 if no such
@@ -554,39 +919,65 @@
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
- Object x = array[lastRet];
+ removeEQ(array[lastRet]);
lastRet = -1;
- // Traverse underlying queue to find == element,
- // not just a .equals element.
- lock.lock();
- try {
- for (Iterator it = q.iterator(); it.hasNext(); ) {
- if (it.next() == x) {
- it.remove();
- return;
- }
- }
- } finally {
- lock.unlock();
- }
}
}
/**
- * Saves the state to a stream (that is, serializes it). This
- * merely wraps default serialization within lock. The
- * serialization strategy for items is left to underlying
- * Queue. Note that locking is not needed on deserialization, so
- * readObject is not defined, just relying on default.
+ * Saves the state to a stream (that is, serializes it). For
+ * compatibility with previous version of this class,
+ * elements are first copied to a java.util.PriorityQueue,
+ * which is then serialized.
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
lock.lock();
try {
+ int n = size; // avoid zero capacity argument
+ q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
+ q.addAll(this);
s.defaultWriteObject();
} finally {
+ q = null;
lock.unlock();
}
}
+ /**
+ * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
+ * (that is, deserializes it).
+ *
+ * @param s the stream
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ try {
+ s.defaultReadObject();
+ this.queue = new Object[q.size()];
+ comparator = q.comparator();
+ addAll(q);
+ } finally {
+ q = null;
+ }
+ }
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long allocationSpinLockOffset =
+ objectFieldOffset(UNSAFE, "allocationSpinLock",
+ PriorityBlockingQueue.class);
+
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
}
--- a/jdk/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java Wed Jan 12 14:40:36 2011 +0000
@@ -360,8 +360,12 @@
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
- if (!keepDelayed && !keepPeriodic)
+ if (!keepDelayed && !keepPeriodic) {
+ for (Object e : q.toArray())
+ if (e instanceof RunnableScheduledFuture<?>)
+ ((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
+ }
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
--- a/jdk/src/share/classes/java/util/concurrent/SynchronousQueue.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/SynchronousQueue.java Wed Jan 12 14:40:36 2011 +0000
@@ -163,7 +163,7 @@
/**
* Shared internal API for dual stacks and queues.
*/
- static abstract class Transferer {
+ abstract static class Transferer {
/**
* Performs a put or take.
*
@@ -190,7 +190,7 @@
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
- static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
+ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
@@ -241,19 +241,11 @@
this.item = item;
}
- static final AtomicReferenceFieldUpdater<SNode, SNode>
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (SNode.class, SNode.class, "next");
-
boolean casNext(SNode cmp, SNode val) {
- return (cmp == next &&
- nextUpdater.compareAndSet(this, cmp, val));
+ return cmp == next &&
+ UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
- static final AtomicReferenceFieldUpdater<SNode, SNode>
- matchUpdater = AtomicReferenceFieldUpdater.newUpdater
- (SNode.class, SNode.class, "match");
-
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
@@ -264,7 +256,7 @@
*/
boolean tryMatch(SNode s) {
if (match == null &&
- matchUpdater.compareAndSet(this, null, s)) {
+ UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
@@ -279,23 +271,28 @@
* Tries to cancel a wait by matching node to itself.
*/
void tryCancel() {
- matchUpdater.compareAndSet(this, null, this);
+ UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", SNode.class);
+ private static final long matchOffset =
+ objectFieldOffset(UNSAFE, "match", SNode.class);
+
}
/** The head (top) of the stack */
volatile SNode head;
- static final AtomicReferenceFieldUpdater<TransferStack, SNode>
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferStack.class, SNode.class, "head");
-
boolean casHead(SNode h, SNode nh) {
- return h == head && headUpdater.compareAndSet(this, h, nh);
+ return h == head &&
+ UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
/**
@@ -338,7 +335,7 @@
*/
SNode s = null; // constructed/reused as needed
- int mode = (e == null)? REQUEST : DATA;
+ int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
@@ -356,7 +353,7 @@
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
- return mode == REQUEST? m.item : s.item;
+ return (mode == REQUEST) ? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
@@ -372,7 +369,7 @@
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
- return (mode == REQUEST)? m.item : s.item;
+ return (mode == REQUEST) ? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
@@ -423,11 +420,11 @@
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
*/
- long lastTime = (timed)? System.nanoTime() : 0;
+ long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
- int spins = (shouldSpin(s)?
- (timed? maxTimedSpins : maxUntimedSpins) : 0);
+ int spins = (shouldSpin(s) ?
+ (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
@@ -444,7 +441,7 @@
}
}
if (spins > 0)
- spins = shouldSpin(s)? (spins-1) : 0;
+ spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
@@ -499,6 +496,12 @@
p = n;
}
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", TransferStack.class);
+
}
/** Dual Queue */
@@ -524,29 +527,21 @@
this.isData = isData;
}
- static final AtomicReferenceFieldUpdater<QNode, QNode>
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (QNode.class, QNode.class, "next");
-
boolean casNext(QNode cmp, QNode val) {
- return (next == cmp &&
- nextUpdater.compareAndSet(this, cmp, val));
+ return next == cmp &&
+ UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
- static final AtomicReferenceFieldUpdater<QNode, Object>
- itemUpdater = AtomicReferenceFieldUpdater.newUpdater
- (QNode.class, Object.class, "item");
-
boolean casItem(Object cmp, Object val) {
- return (item == cmp &&
- itemUpdater.compareAndSet(this, cmp, val));
+ return item == cmp &&
+ UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
- itemUpdater.compareAndSet(this, cmp, this);
+ UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
@@ -561,6 +556,13 @@
boolean isOffList() {
return next == this;
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", QNode.class);
+ private static final long itemOffset =
+ objectFieldOffset(UNSAFE, "item", QNode.class);
}
/** Head of queue */
@@ -580,41 +582,30 @@
tail = h;
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "head");
-
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
- if (h == head && headUpdater.compareAndSet(this, h, nh))
+ if (h == head &&
+ UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- tailUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "tail");
-
/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
- tailUpdater.compareAndSet(this, t, nt);
+ UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "cleanMe");
-
/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
- return (cleanMe == cmp &&
- cleanMeUpdater.compareAndSet(this, cmp, val));
+ return cleanMe == cmp &&
+ UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
/**
@@ -683,7 +674,7 @@
s.item = s;
s.waiter = null;
}
- return (x != null)? x : e;
+ return (x != null) ? x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
@@ -700,7 +691,7 @@
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
- return (x != null)? x : e;
+ return (x != null) ? x : e;
}
}
}
@@ -716,10 +707,10 @@
*/
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
- long lastTime = (timed)? System.nanoTime() : 0;
+ long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
- (timed? maxTimedSpins : maxUntimedSpins) : 0);
+ (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
@@ -799,6 +790,16 @@
return; // Postpone cleaning s
}
}
+
+ // unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", TransferQueue.class);
+ private static final long tailOffset =
+ objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
+ private static final long cleanMeOffset =
+ objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
+
}
/**
@@ -824,7 +825,7 @@
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
- transferer = (fair)? new TransferQueue() : new TransferStack();
+ transferer = fair ? new TransferQueue() : new TransferStack();
}
/**
@@ -1141,4 +1142,17 @@
transferer = new TransferStack();
}
+ // Unsafe mechanics
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
}
--- a/jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java Wed Jan 12 14:40:36 2011 +0000
@@ -1841,6 +1841,43 @@
}
}
+ /**
+ * Returns a string identifying this pool, as well as its state,
+ * including indications of run state and estimated worker and
+ * task counts.
+ *
+ * @return a string identifying this pool, as well as its state
+ */
+ public String toString() {
+ long ncompleted;
+ int nworkers, nactive;
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ ncompleted = completedTaskCount;
+ nactive = 0;
+ nworkers = workers.size();
+ for (Worker w : workers) {
+ ncompleted += w.completedTasks;
+ if (w.isLocked())
+ ++nactive;
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ int c = ctl.get();
+ String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
+ (runStateAtLeast(c, TERMINATED) ? "Terminated" :
+ "Shutting down"));
+ return super.toString() +
+ "[" + rs +
+ ", pool size = " + nworkers +
+ ", active threads = " + nactive +
+ ", queued tasks = " + workQueue.size() +
+ ", completed tasks = " + ncompleted +
+ "]";
+ }
+
/* Extension hooks */
/**
@@ -1961,7 +1998,9 @@
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- throw new RejectedExecutionException();
+ throw new RejectedExecutionException("Task " + r.toString() +
+ " rejected from " +
+ e.toString());
}
}
--- a/jdk/src/share/classes/java/util/concurrent/atomic/AtomicIntegerArray.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/atomic/AtomicIntegerArray.java Wed Jan 12 14:40:36 2011 +0000
@@ -48,28 +48,37 @@
public class AtomicIntegerArray implements java.io.Serializable {
private static final long serialVersionUID = 2862133569453604235L;
- // setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class);
- private static final int scale = unsafe.arrayIndexScale(int[].class);
+ private static final int shift;
private final int[] array;
- private long rawIndex(int i) {
+ static {
+ int scale = unsafe.arrayIndexScale(int[].class);
+ if ((scale & (scale - 1)) != 0)
+ throw new Error("data type scale not a power of two");
+ shift = 31 - Integer.numberOfLeadingZeros(scale);
+ }
+
+ private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
- return base + (long) i * scale;
+
+ return byteOffset(i);
+ }
+
+ private static long byteOffset(int i) {
+ return ((long) i << shift) + base;
}
/**
- * Creates a new AtomicIntegerArray of given length.
+ * Creates a new AtomicIntegerArray of the given length, with all
+ * elements initially zero.
*
* @param length the length of the array
*/
public AtomicIntegerArray(int length) {
array = new int[length];
- // must perform at least one volatile write to conform to JMM
- if (length > 0)
- unsafe.putIntVolatile(array, rawIndex(0), 0);
}
/**
@@ -80,17 +89,8 @@
* @throws NullPointerException if array is null
*/
public AtomicIntegerArray(int[] array) {
- if (array == null)
- throw new NullPointerException();
- int length = array.length;
- this.array = new int[length];
- if (length > 0) {
- int last = length-1;
- for (int i = 0; i < last; ++i)
- this.array[i] = array[i];
- // Do the last write as volatile
- unsafe.putIntVolatile(this.array, rawIndex(last), array[last]);
- }
+ // Visibility guaranteed by final field guarantees
+ this.array = array.clone();
}
/**
@@ -109,7 +109,11 @@
* @return the current value
*/
public final int get(int i) {
- return unsafe.getIntVolatile(array, rawIndex(i));
+ return getRaw(checkedByteOffset(i));
+ }
+
+ private int getRaw(long offset) {
+ return unsafe.getIntVolatile(array, offset);
}
/**
@@ -119,7 +123,7 @@
* @param newValue the new value
*/
public final void set(int i, int newValue) {
- unsafe.putIntVolatile(array, rawIndex(i), newValue);
+ unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
/**
@@ -130,7 +134,7 @@
* @since 1.6
*/
public final void lazySet(int i, int newValue) {
- unsafe.putOrderedInt(array, rawIndex(i), newValue);
+ unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
}
/**
@@ -142,9 +146,10 @@
* @return the previous value
*/
public final int getAndSet(int i, int newValue) {
+ long offset = checkedByteOffset(i);
while (true) {
- int current = get(i);
- if (compareAndSet(i, current, newValue))
+ int current = getRaw(offset);
+ if (compareAndSetRaw(offset, current, newValue))
return current;
}
}
@@ -160,8 +165,11 @@
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int i, int expect, int update) {
- return unsafe.compareAndSwapInt(array, rawIndex(i),
- expect, update);
+ return compareAndSetRaw(checkedByteOffset(i), expect, update);
+ }
+
+ private boolean compareAndSetRaw(long offset, int expect, int update) {
+ return unsafe.compareAndSwapInt(array, offset, expect, update);
}
/**
@@ -188,12 +196,7 @@
* @return the previous value
*/
public final int getAndIncrement(int i) {
- while (true) {
- int current = get(i);
- int next = current + 1;
- if (compareAndSet(i, current, next))
- return current;
- }
+ return getAndAdd(i, 1);
}
/**
@@ -203,12 +206,7 @@
* @return the previous value
*/
public final int getAndDecrement(int i) {
- while (true) {
- int current = get(i);
- int next = current - 1;
- if (compareAndSet(i, current, next))
- return current;
- }
+ return getAndAdd(i, -1);
}
/**
@@ -219,10 +217,10 @@
* @return the previous value
*/
public final int getAndAdd(int i, int delta) {
+ long offset = checkedByteOffset(i);
while (true) {
- int current = get(i);
- int next = current + delta;
- if (compareAndSet(i, current, next))
+ int current = getRaw(offset);
+ if (compareAndSetRaw(offset, current, current + delta))
return current;
}
}
@@ -234,12 +232,7 @@
* @return the updated value
*/
public final int incrementAndGet(int i) {
- while (true) {
- int current = get(i);
- int next = current + 1;
- if (compareAndSet(i, current, next))
- return next;
- }
+ return addAndGet(i, 1);
}
/**
@@ -249,12 +242,7 @@
* @return the updated value
*/
public final int decrementAndGet(int i) {
- while (true) {
- int current = get(i);
- int next = current - 1;
- if (compareAndSet(i, current, next))
- return next;
- }
+ return addAndGet(i, -1);
}
/**
@@ -265,22 +253,32 @@
* @return the updated value
*/
public final int addAndGet(int i, int delta) {
+ long offset = checkedByteOffset(i);
while (true) {
- int current = get(i);
+ int current = getRaw(offset);
int next = current + delta;
- if (compareAndSet(i, current, next))
+ if (compareAndSetRaw(offset, current, next))
return next;
}
}
/**
* Returns the String representation of the current values of array.
- * @return the String representation of the current values of array.
+ * @return the String representation of the current values of array
*/
public String toString() {
- if (array.length > 0) // force volatile read
- get(0);
- return Arrays.toString(array);
+ int iMax = array.length - 1;
+ if (iMax == -1)
+ return "[]";
+
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = 0; ; i++) {
+ b.append(getRaw(byteOffset(i)));
+ if (i == iMax)
+ return b.append(']').toString();
+ b.append(',').append(' ');
+ }
}
}
--- a/jdk/src/share/classes/java/util/concurrent/atomic/AtomicLongArray.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/atomic/AtomicLongArray.java Wed Jan 12 14:40:36 2011 +0000
@@ -47,28 +47,37 @@
public class AtomicLongArray implements java.io.Serializable {
private static final long serialVersionUID = -2308431214976778248L;
- // setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(long[].class);
- private static final int scale = unsafe.arrayIndexScale(long[].class);
+ private static final int shift;
private final long[] array;
- private long rawIndex(int i) {
+ static {
+ int scale = unsafe.arrayIndexScale(long[].class);
+ if ((scale & (scale - 1)) != 0)
+ throw new Error("data type scale not a power of two");
+ shift = 31 - Integer.numberOfLeadingZeros(scale);
+ }
+
+ private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
- return base + (long) i * scale;
+
+ return byteOffset(i);
+ }
+
+ private static long byteOffset(int i) {
+ return ((long) i << shift) + base;
}
/**
- * Creates a new AtomicLongArray of given length.
+ * Creates a new AtomicLongArray of the given length, with all
+ * elements initially zero.
*
* @param length the length of the array
*/
public AtomicLongArray(int length) {
array = new long[length];
- // must perform at least one volatile write to conform to JMM
- if (length > 0)
- unsafe.putLongVolatile(array, rawIndex(0), 0);
}
/**
@@ -79,17 +88,8 @@
* @throws NullPointerException if array is null
*/
public AtomicLongArray(long[] array) {
- if (array == null)
- throw new NullPointerException();
- int length = array.length;
- this.array = new long[length];
- if (length > 0) {
- int last = length-1;
- for (int i = 0; i < last; ++i)
- this.array[i] = array[i];
- // Do the last write as volatile
- unsafe.putLongVolatile(this.array, rawIndex(last), array[last]);
- }
+ // Visibility guaranteed by final field guarantees
+ this.array = array.clone();
}
/**
@@ -108,7 +108,11 @@
* @return the current value
*/
public final long get(int i) {
- return unsafe.getLongVolatile(array, rawIndex(i));
+ return getRaw(checkedByteOffset(i));
+ }
+
+ private long getRaw(long offset) {
+ return unsafe.getLongVolatile(array, offset);
}
/**
@@ -118,7 +122,7 @@
* @param newValue the new value
*/
public final void set(int i, long newValue) {
- unsafe.putLongVolatile(array, rawIndex(i), newValue);
+ unsafe.putLongVolatile(array, checkedByteOffset(i), newValue);
}
/**
@@ -129,7 +133,7 @@
* @since 1.6
*/
public final void lazySet(int i, long newValue) {
- unsafe.putOrderedLong(array, rawIndex(i), newValue);
+ unsafe.putOrderedLong(array, checkedByteOffset(i), newValue);
}
@@ -142,16 +146,17 @@
* @return the previous value
*/
public final long getAndSet(int i, long newValue) {
+ long offset = checkedByteOffset(i);
while (true) {
- long current = get(i);
- if (compareAndSet(i, current, newValue))
+ long current = getRaw(offset);
+ if (compareAndSetRaw(offset, current, newValue))
return current;
}
}
/**
- * Atomically sets the value to the given updated value
- * if the current value {@code ==} the expected value.
+ * Atomically sets the element at position {@code i} to the given
+ * updated value if the current value {@code ==} the expected value.
*
* @param i the index
* @param expect the expected value
@@ -160,13 +165,16 @@
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int i, long expect, long update) {
- return unsafe.compareAndSwapLong(array, rawIndex(i),
- expect, update);
+ return compareAndSetRaw(checkedByteOffset(i), expect, update);
+ }
+
+ private boolean compareAndSetRaw(long offset, long expect, long update) {
+ return unsafe.compareAndSwapLong(array, offset, expect, update);
}
/**
- * Atomically sets the value to the given updated value
- * if the current value {@code ==} the expected value.
+ * Atomically sets the element at position {@code i} to the given
+ * updated value if the current value {@code ==} the expected value.
*
* <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
* and does not provide ordering guarantees, so is only rarely an
@@ -188,12 +196,7 @@
* @return the previous value
*/
public final long getAndIncrement(int i) {
- while (true) {
- long current = get(i);
- long next = current + 1;
- if (compareAndSet(i, current, next))
- return current;
- }
+ return getAndAdd(i, 1);
}
/**
@@ -203,12 +206,7 @@
* @return the previous value
*/
public final long getAndDecrement(int i) {
- while (true) {
- long current = get(i);
- long next = current - 1;
- if (compareAndSet(i, current, next))
- return current;
- }
+ return getAndAdd(i, -1);
}
/**
@@ -219,10 +217,10 @@
* @return the previous value
*/
public final long getAndAdd(int i, long delta) {
+ long offset = checkedByteOffset(i);
while (true) {
- long current = get(i);
- long next = current + delta;
- if (compareAndSet(i, current, next))
+ long current = getRaw(offset);
+ if (compareAndSetRaw(offset, current, current + delta))
return current;
}
}
@@ -234,12 +232,7 @@
* @return the updated value
*/
public final long incrementAndGet(int i) {
- while (true) {
- long current = get(i);
- long next = current + 1;
- if (compareAndSet(i, current, next))
- return next;
- }
+ return addAndGet(i, 1);
}
/**
@@ -249,12 +242,7 @@
* @return the updated value
*/
public final long decrementAndGet(int i) {
- while (true) {
- long current = get(i);
- long next = current - 1;
- if (compareAndSet(i, current, next))
- return next;
- }
+ return addAndGet(i, -1);
}
/**
@@ -265,22 +253,32 @@
* @return the updated value
*/
public long addAndGet(int i, long delta) {
+ long offset = checkedByteOffset(i);
while (true) {
- long current = get(i);
+ long current = getRaw(offset);
long next = current + delta;
- if (compareAndSet(i, current, next))
+ if (compareAndSetRaw(offset, current, next))
return next;
}
}
/**
* Returns the String representation of the current values of array.
- * @return the String representation of the current values of array.
+ * @return the String representation of the current values of array
*/
public String toString() {
- if (array.length > 0) // force volatile read
- get(0);
- return Arrays.toString(array);
+ int iMax = array.length - 1;
+ if (iMax == -1)
+ return "[]";
+
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = 0; ; i++) {
+ b.append(getRaw(byteOffset(i)));
+ if (i == iMax)
+ return b.append(']').toString();
+ b.append(',').append(' ');
+ }
}
}
--- a/jdk/src/share/classes/java/util/concurrent/atomic/AtomicMarkableReference.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/atomic/AtomicMarkableReference.java Wed Jan 12 14:40:36 2011 +0000
@@ -38,8 +38,8 @@
/**
* An {@code AtomicMarkableReference} maintains an object reference
* along with a mark bit, that can be updated atomically.
- * <p>
- * <p> Implementation note. This implementation maintains markable
+ *
+ * <p>Implementation note: This implementation maintains markable
* references by creating internal objects representing "boxed"
* [reference, boolean] pairs.
*
@@ -47,17 +47,21 @@
* @author Doug Lea
* @param <V> The type of object referred to by this reference
*/
-public class AtomicMarkableReference<V> {
+public class AtomicMarkableReference<V> {
- private static class ReferenceBooleanPair<T> {
- private final T reference;
- private final boolean bit;
- ReferenceBooleanPair(T r, boolean i) {
- reference = r; bit = i;
+ private static class Pair<T> {
+ final T reference;
+ final boolean mark;
+ private Pair(T reference, boolean mark) {
+ this.reference = reference;
+ this.mark = mark;
+ }
+ static <T> Pair<T> of(T reference, boolean mark) {
+ return new Pair<T>(reference, mark);
}
}
- private final AtomicReference<ReferenceBooleanPair<V>> atomicRef;
+ private volatile Pair<V> pair;
/**
* Creates a new {@code AtomicMarkableReference} with the given
@@ -67,7 +71,7 @@
* @param initialMark the initial mark
*/
public AtomicMarkableReference(V initialRef, boolean initialMark) {
- atomicRef = new AtomicReference<ReferenceBooleanPair<V>> (new ReferenceBooleanPair<V>(initialRef, initialMark));
+ pair = Pair.of(initialRef, initialMark);
}
/**
@@ -76,7 +80,7 @@
* @return the current value of the reference
*/
public V getReference() {
- return atomicRef.get().reference;
+ return pair.reference;
}
/**
@@ -85,7 +89,7 @@
* @return the current value of the mark
*/
public boolean isMarked() {
- return atomicRef.get().bit;
+ return pair.mark;
}
/**
@@ -97,9 +101,9 @@
* @return the current value of the reference
*/
public V get(boolean[] markHolder) {
- ReferenceBooleanPair<V> p = atomicRef.get();
- markHolder[0] = p.bit;
- return p.reference;
+ Pair<V> pair = this.pair;
+ markHolder[0] = pair.mark;
+ return pair.reference;
}
/**
@@ -122,13 +126,8 @@
V newReference,
boolean expectedMark,
boolean newMark) {
- ReferenceBooleanPair<V> current = atomicRef.get();
- return expectedReference == current.reference &&
- expectedMark == current.bit &&
- ((newReference == current.reference && newMark == current.bit) ||
- atomicRef.weakCompareAndSet(current,
- new ReferenceBooleanPair<V>(newReference,
- newMark)));
+ return compareAndSet(expectedReference, newReference,
+ expectedMark, newMark);
}
/**
@@ -147,13 +146,13 @@
V newReference,
boolean expectedMark,
boolean newMark) {
- ReferenceBooleanPair<V> current = atomicRef.get();
- return expectedReference == current.reference &&
- expectedMark == current.bit &&
- ((newReference == current.reference && newMark == current.bit) ||
- atomicRef.compareAndSet(current,
- new ReferenceBooleanPair<V>(newReference,
- newMark)));
+ Pair<V> current = pair;
+ return
+ expectedReference == current.reference &&
+ expectedMark == current.mark &&
+ ((newReference == current.reference &&
+ newMark == current.mark) ||
+ casPair(current, Pair.of(newReference, newMark)));
}
/**
@@ -163,9 +162,9 @@
* @param newMark the new value for the mark
*/
public void set(V newReference, boolean newMark) {
- ReferenceBooleanPair<V> current = atomicRef.get();
- if (newReference != current.reference || newMark != current.bit)
- atomicRef.set(new ReferenceBooleanPair<V>(newReference, newMark));
+ Pair<V> current = pair;
+ if (newReference != current.reference || newMark != current.mark)
+ this.pair = Pair.of(newReference, newMark);
}
/**
@@ -182,11 +181,32 @@
* @return true if successful
*/
public boolean attemptMark(V expectedReference, boolean newMark) {
- ReferenceBooleanPair<V> current = atomicRef.get();
- return expectedReference == current.reference &&
- (newMark == current.bit ||
- atomicRef.compareAndSet
- (current, new ReferenceBooleanPair<V>(expectedReference,
- newMark)));
+ Pair<V> current = pair;
+ return
+ expectedReference == current.reference &&
+ (newMark == current.mark ||
+ casPair(current, Pair.of(expectedReference, newMark)));
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long pairOffset =
+ objectFieldOffset(UNSAFE, "pair", AtomicMarkableReference.class);
+
+ private boolean casPair(Pair<V> cmp, Pair<V> val) {
+ return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
+ }
+
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
}
}
--- a/jdk/src/share/classes/java/util/concurrent/atomic/AtomicReferenceArray.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/atomic/AtomicReferenceArray.java Wed Jan 12 14:40:36 2011 +0000
@@ -51,24 +51,35 @@
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(Object[].class);
- private static final int scale = unsafe.arrayIndexScale(Object[].class);
+ private static final int shift;
private final Object[] array;
- private long rawIndex(int i) {
+ static {
+ int scale = unsafe.arrayIndexScale(Object[].class);
+ if ((scale & (scale - 1)) != 0)
+ throw new Error("data type scale not a power of two");
+ shift = 31 - Integer.numberOfLeadingZeros(scale);
+ }
+
+ private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
- return base + (long) i * scale;
+
+ return byteOffset(i);
+ }
+
+ private static long byteOffset(int i) {
+ return ((long) i << shift) + base;
}
/**
- * Creates a new AtomicReferenceArray of given length.
+ * Creates a new AtomicReferenceArray of the given length, with all
+ * elements initially null.
+ *
* @param length the length of the array
*/
public AtomicReferenceArray(int length) {
array = new Object[length];
- // must perform at least one volatile write to conform to JMM
- if (length > 0)
- unsafe.putObjectVolatile(array, rawIndex(0), null);
}
/**
@@ -79,18 +90,8 @@
* @throws NullPointerException if array is null
*/
public AtomicReferenceArray(E[] array) {
- if (array == null)
- throw new NullPointerException();
- int length = array.length;
- this.array = new Object[length];
- if (length > 0) {
- int last = length-1;
- for (int i = 0; i < last; ++i)
- this.array[i] = array[i];
- // Do the last write as volatile
- E e = array[last];
- unsafe.putObjectVolatile(this.array, rawIndex(last), e);
- }
+ // Visibility guaranteed by final field guarantees
+ this.array = array.clone();
}
/**
@@ -109,7 +110,11 @@
* @return the current value
*/
public final E get(int i) {
- return (E) unsafe.getObjectVolatile(array, rawIndex(i));
+ return getRaw(checkedByteOffset(i));
+ }
+
+ private E getRaw(long offset) {
+ return (E) unsafe.getObjectVolatile(array, offset);
}
/**
@@ -119,7 +124,7 @@
* @param newValue the new value
*/
public final void set(int i, E newValue) {
- unsafe.putObjectVolatile(array, rawIndex(i), newValue);
+ unsafe.putObjectVolatile(array, checkedByteOffset(i), newValue);
}
/**
@@ -130,7 +135,7 @@
* @since 1.6
*/
public final void lazySet(int i, E newValue) {
- unsafe.putOrderedObject(array, rawIndex(i), newValue);
+ unsafe.putOrderedObject(array, checkedByteOffset(i), newValue);
}
@@ -143,9 +148,10 @@
* @return the previous value
*/
public final E getAndSet(int i, E newValue) {
+ long offset = checkedByteOffset(i);
while (true) {
- E current = get(i);
- if (compareAndSet(i, current, newValue))
+ E current = (E) getRaw(offset);
+ if (compareAndSetRaw(offset, current, newValue))
return current;
}
}
@@ -153,6 +159,7 @@
/**
* Atomically sets the element at position {@code i} to the given
* updated value if the current value {@code ==} the expected value.
+ *
* @param i the index
* @param expect the expected value
* @param update the new value
@@ -160,8 +167,11 @@
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int i, E expect, E update) {
- return unsafe.compareAndSwapObject(array, rawIndex(i),
- expect, update);
+ return compareAndSetRaw(checkedByteOffset(i), expect, update);
+ }
+
+ private boolean compareAndSetRaw(long offset, E expect, E update) {
+ return unsafe.compareAndSwapObject(array, offset, expect, update);
}
/**
@@ -183,12 +193,21 @@
/**
* Returns the String representation of the current values of array.
- * @return the String representation of the current values of array.
+ * @return the String representation of the current values of array
*/
public String toString() {
- if (array.length > 0) // force volatile read
- get(0);
- return Arrays.toString(array);
+ int iMax = array.length - 1;
+ if (iMax == -1)
+ return "[]";
+
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = 0; ; i++) {
+ b.append(getRaw(byteOffset(i)));
+ if (i == iMax)
+ return b.append(']').toString();
+ b.append(',').append(' ');
+ }
}
}
--- a/jdk/src/share/classes/java/util/concurrent/atomic/AtomicStampedReference.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/atomic/AtomicStampedReference.java Wed Jan 12 14:40:36 2011 +0000
@@ -39,7 +39,7 @@
* An {@code AtomicStampedReference} maintains an object reference
* along with an integer "stamp", that can be updated atomically.
*
- * <p> Implementation note. This implementation maintains stamped
+ * <p>Implementation note: This implementation maintains stamped
* references by creating internal objects representing "boxed"
* [reference, integer] pairs.
*
@@ -47,17 +47,21 @@
* @author Doug Lea
* @param <V> The type of object referred to by this reference
*/
-public class AtomicStampedReference<V> {
+public class AtomicStampedReference<V> {
- private static class ReferenceIntegerPair<T> {
- private final T reference;
- private final int integer;
- ReferenceIntegerPair(T r, int i) {
- reference = r; integer = i;
+ private static class Pair<T> {
+ final T reference;
+ final int stamp;
+ private Pair(T reference, int stamp) {
+ this.reference = reference;
+ this.stamp = stamp;
+ }
+ static <T> Pair<T> of(T reference, int stamp) {
+ return new Pair<T>(reference, stamp);
}
}
- private final AtomicReference<ReferenceIntegerPair<V>> atomicRef;
+ private volatile Pair<V> pair;
/**
* Creates a new {@code AtomicStampedReference} with the given
@@ -67,8 +71,7 @@
* @param initialStamp the initial stamp
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
- atomicRef = new AtomicReference<ReferenceIntegerPair<V>>
- (new ReferenceIntegerPair<V>(initialRef, initialStamp));
+ pair = Pair.of(initialRef, initialStamp);
}
/**
@@ -77,7 +80,7 @@
* @return the current value of the reference
*/
public V getReference() {
- return atomicRef.get().reference;
+ return pair.reference;
}
/**
@@ -86,7 +89,7 @@
* @return the current value of the stamp
*/
public int getStamp() {
- return atomicRef.get().integer;
+ return pair.stamp;
}
/**
@@ -98,9 +101,9 @@
* @return the current value of the reference
*/
public V get(int[] stampHolder) {
- ReferenceIntegerPair<V> p = atomicRef.get();
- stampHolder[0] = p.integer;
- return p.reference;
+ Pair<V> pair = this.pair;
+ stampHolder[0] = pair.stamp;
+ return pair.reference;
}
/**
@@ -119,18 +122,12 @@
* @param newStamp the new value for the stamp
* @return true if successful
*/
- public boolean weakCompareAndSet(V expectedReference,
- V newReference,
- int expectedStamp,
- int newStamp) {
- ReferenceIntegerPair<V> current = atomicRef.get();
- return expectedReference == current.reference &&
- expectedStamp == current.integer &&
- ((newReference == current.reference &&
- newStamp == current.integer) ||
- atomicRef.weakCompareAndSet(current,
- new ReferenceIntegerPair<V>(newReference,
- newStamp)));
+ public boolean weakCompareAndSet(V expectedReference,
+ V newReference,
+ int expectedStamp,
+ int newStamp) {
+ return compareAndSet(expectedReference, newReference,
+ expectedStamp, newStamp);
}
/**
@@ -145,18 +142,17 @@
* @param newStamp the new value for the stamp
* @return true if successful
*/
- public boolean compareAndSet(V expectedReference,
- V newReference,
- int expectedStamp,
- int newStamp) {
- ReferenceIntegerPair<V> current = atomicRef.get();
- return expectedReference == current.reference &&
- expectedStamp == current.integer &&
+ public boolean compareAndSet(V expectedReference,
+ V newReference,
+ int expectedStamp,
+ int newStamp) {
+ Pair<V> current = pair;
+ return
+ expectedReference == current.reference &&
+ expectedStamp == current.stamp &&
((newReference == current.reference &&
- newStamp == current.integer) ||
- atomicRef.compareAndSet(current,
- new ReferenceIntegerPair<V>(newReference,
- newStamp)));
+ newStamp == current.stamp) ||
+ casPair(current, Pair.of(newReference, newStamp)));
}
@@ -167,9 +163,9 @@
* @param newStamp the new value for the stamp
*/
public void set(V newReference, int newStamp) {
- ReferenceIntegerPair<V> current = atomicRef.get();
- if (newReference != current.reference || newStamp != current.integer)
- atomicRef.set(new ReferenceIntegerPair<V>(newReference, newStamp));
+ Pair<V> current = pair;
+ if (newReference != current.reference || newStamp != current.stamp)
+ this.pair = Pair.of(newReference, newStamp);
}
/**
@@ -186,11 +182,32 @@
* @return true if successful
*/
public boolean attemptStamp(V expectedReference, int newStamp) {
- ReferenceIntegerPair<V> current = atomicRef.get();
- return expectedReference == current.reference &&
- (newStamp == current.integer ||
- atomicRef.compareAndSet(current,
- new ReferenceIntegerPair<V>(expectedReference,
- newStamp)));
+ Pair<V> current = pair;
+ return
+ expectedReference == current.reference &&
+ (newStamp == current.stamp ||
+ casPair(current, Pair.of(expectedReference, newStamp)));
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long pairOffset =
+ objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
+
+ private boolean casPair(Pair<V> cmp, Pair<V> val) {
+ return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
+ }
+
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
}
}
--- a/jdk/src/share/classes/java/util/concurrent/locks/Condition.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/locks/Condition.java Wed Jan 12 14:40:36 2011 +0000
@@ -308,18 +308,21 @@
* condition still does not hold. Typical uses of this method take
* the following form:
*
- * <pre>
- * synchronized boolean aMethod(long timeout, TimeUnit unit) {
- * long nanosTimeout = unit.toNanos(timeout);
- * while (!conditionBeingWaitedFor) {
- * if (nanosTimeout > 0)
- * nanosTimeout = theCondition.awaitNanos(nanosTimeout);
- * else
- * return false;
+ * <pre> {@code
+ * boolean aMethod(long timeout, TimeUnit unit) {
+ * long nanos = unit.toNanos(timeout);
+ * lock.lock();
+ * try {
+ * while (!conditionBeingWaitedFor()) {
+ * if (nanos <= 0L)
+ * return false;
+ * nanos = theCondition.awaitNanos(nanos);
+ * }
+ * // ...
+ * } finally {
+ * lock.unlock();
* }
- * // ...
- * }
- * </pre>
+ * }}</pre>
*
* <p> Design note: This method requires a nanosecond argument so
* as to avoid truncation errors in reporting remaining times.
@@ -408,18 +411,21 @@
*
* <p>The return value indicates whether the deadline has elapsed,
* which can be used as follows:
- * <pre>
- * synchronized boolean aMethod(Date deadline) {
+ * <pre> {@code
+ * boolean aMethod(Date deadline) {
* boolean stillWaiting = true;
- * while (!conditionBeingWaitedFor) {
- * if (stillWaiting)
- * stillWaiting = theCondition.awaitUntil(deadline);
- * else
- * return false;
+ * lock.lock();
+ * try {
+ * while (!conditionBeingWaitedFor()) {
+ * if (!stillWaiting)
+ * return false;
+ * stillWaiting = theCondition.awaitUntil(deadline);
+ * }
+ * // ...
+ * } finally {
+ * lock.unlock();
* }
- * // ...
- * }
- * </pre>
+ * }}</pre>
*
* <p><b>Implementation Considerations</b>
*
@@ -450,6 +456,15 @@
* <p>If any threads are waiting on this condition then one
* is selected for waking up. That thread must then re-acquire the
* lock before returning from {@code await}.
+ *
+ * <p><b>Implementation Considerations</b>
+ *
+ * <p>An implementation may (and typically does) require that the
+ * current thread hold the lock associated with this {@code
+ * Condition} when this method is called. Implementations must
+ * document this precondition and any actions taken if the lock is
+ * not held. Typically, an exception such as {@link
+ * IllegalMonitorStateException} will be thrown.
*/
void signal();
@@ -459,6 +474,15 @@
* <p>If any threads are waiting on this condition then they are
* all woken up. Each thread must re-acquire the lock before it can
* return from {@code await}.
+ *
+ * <p><b>Implementation Considerations</b>
+ *
+ * <p>An implementation may (and typically does) require that the
+ * current thread hold the lock associated with this {@code
+ * Condition} when this method is called. Implementations must
+ * document this precondition and any actions taken if the lock is
+ * not held. Typically, an exception such as {@link
+ * IllegalMonitorStateException} will be thrown.
*/
void signalAll();
}
--- a/jdk/test/ProblemList.txt Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/ProblemList.txt Wed Jan 12 14:40:36 2011 +0000
@@ -734,8 +734,8 @@
# Fails on solaris-sparc -server (Set not equal to copy. 1)
java/util/EnumSet/EnumSetBash.java solaris-sparc
-# Need to be marked othervm, or changed to be samevm safe
-java/util/WeakHashMap/GCDuringIteration.java generic-all
+# Fails on solaris-sparc, see 7011857
+java/util/concurrent/Phaser/FickleRegister.java solaris-sparc
############################################################################
--- a/jdk/test/java/util/WeakHashMap/GCDuringIteration.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/WeakHashMap/GCDuringIteration.java Wed Jan 12 14:40:36 2011 +0000
@@ -33,18 +33,17 @@
import java.util.concurrent.CountDownLatch;
public class GCDuringIteration {
- static void finalizeTillYouDrop() {
- System.gc(); // Enqueue all finalizables
-
- System.runFinalization(); // Drain finalizer queue
+ private static void waitForFinalizersToRun() {
+ for (int i = 0; i < 2; i++)
+ tryWaitForFinalizersToRun();
+ }
- // There may be a straggler finalizable object still being
- // finalized by the dedicated finalizer thread. Enqueue one
- // more finalizable object, and wait for it to be finalized.
- final CountDownLatch latch = new CountDownLatch(1);
- new Object() { protected void finalize() { latch.countDown(); }};
+ private static void tryWaitForFinalizersToRun() {
System.gc();
- try { latch.await(); }
+ final CountDownLatch fin = new CountDownLatch(1);
+ new Object() { protected void finalize() { fin.countDown(); }};
+ System.gc();
+ try { fin.await(); }
catch (InterruptedException ie) { throw new Error(ie); }
}
@@ -101,7 +100,9 @@
{
int first = firstValue(map);
final Iterator<Map.Entry<Foo,Integer>> it = map.entrySet().iterator();
- foos[first] = null; finalizeTillYouDrop();
+ foos[first] = null;
+ for (int i = 0; i < 10 && map.size() != first; i++)
+ tryWaitForFinalizersToRun();
equal(map.size(), first);
checkIterator(it, first-1);
equal(map.size(), first);
@@ -113,11 +114,14 @@
final Iterator<Map.Entry<Foo,Integer>> it = map.entrySet().iterator();
it.next(); // protects first entry
System.out.println(map.values());
- foos[first] = null; finalizeTillYouDrop();
+ foos[first] = null;
+ tryWaitForFinalizersToRun()
equal(map.size(), first+1);
System.out.println(map.values());
checkIterator(it, first-1);
- finalizeTillYouDrop(); // first entry no longer protected
+ // first entry no longer protected
+ for (int i = 0; i < 10 && map.size() != first; i++)
+ tryWaitForFinalizersToRun();
equal(map.size(), first);
equal(firstValue(map), first-1);
}
@@ -127,12 +131,15 @@
final Iterator<Map.Entry<Foo,Integer>> it = map.entrySet().iterator();
it.next(); // protects first entry
System.out.println(map.values());
- foos[first] = foos[first-1] = null; finalizeTillYouDrop();
+ foos[first] = foos[first-1] = null;
+ tryWaitForFinalizersToRun();
equal(map.size(), first);
equal(firstValue(map), first);
System.out.println(map.values());
checkIterator(it, first-2);
- finalizeTillYouDrop(); // first entry no longer protected
+ // first entry no longer protected
+ for (int i = 0; i < 10 && map.size() != first-1; i++)
+ tryWaitForFinalizersToRun();
equal(map.size(), first-1);
equal(firstValue(map), first-2);
}
@@ -143,12 +150,15 @@
it.next(); // protects first entry
it.hasNext(); // protects second entry
System.out.println(map.values());
- foos[first] = foos[first-1] = null; finalizeTillYouDrop();
+ foos[first] = foos[first-1] = null;
+ tryWaitForFinalizersToRun();
equal(firstValue(map), first);
equal(map.size(), first+1);
System.out.println(map.values());
checkIterator(it, first-1);
- finalizeTillYouDrop(); // first entry no longer protected
+ // first entry no longer protected
+ for (int i = 0; i < 10 && map.size() != first-1; i++)
+ tryWaitForFinalizersToRun();
equal(map.size(), first-1);
equal(firstValue(map), first-2);
}
@@ -158,13 +168,16 @@
final Iterator<Map.Entry<Foo,Integer>> it = map.entrySet().iterator();
it.next(); // protects first entry
System.out.println(map.values());
- foos[first] = foos[first-1] = null; finalizeTillYouDrop();
+ foos[first] = foos[first-1] = null;
+ tryWaitForFinalizersToRun();
it.remove();
equal(firstValue(map), first-2);
equal(map.size(), first-1);
System.out.println(map.values());
checkIterator(it, first-2);
- finalizeTillYouDrop();
+ // first entry no longer protected
+ for (int i = 0; i < 10 && map.size() != first-1; i++)
+ tryWaitForFinalizersToRun();
equal(map.size(), first-1);
equal(firstValue(map), first-2);
}
@@ -176,12 +189,14 @@
it.remove();
it.hasNext(); // protects second entry
System.out.println(map.values());
- foos[first] = foos[first-1] = null; finalizeTillYouDrop();
+ foos[first] = foos[first-1] = null;
+ tryWaitForFinalizersToRun();
equal(firstValue(map), first-1);
equal(map.size(), first);
System.out.println(map.values());
checkIterator(it, first-1);
- finalizeTillYouDrop();
+ for (int i = 0; i < 10 && map.size() != first-1; i++)
+ tryWaitForFinalizersToRun();
equal(map.size(), first-1);
equal(firstValue(map), first-2);
}
@@ -191,12 +206,13 @@
final Iterator<Map.Entry<Foo,Integer>> it = map.entrySet().iterator();
it.hasNext(); // protects first entry
Arrays.fill(foos, null);
- finalizeTillYouDrop();
+ tryWaitForFinalizersToRun();
equal(map.size(), 1);
System.out.println(map.values());
equal(it.next().getValue(), first);
check(! it.hasNext());
- finalizeTillYouDrop();
+ for (int i = 0; i < 10 && map.size() != 0; i++)
+ tryWaitForFinalizersToRun();
equal(map.size(), 0);
check(map.isEmpty());
}
--- a/jdk/test/java/util/concurrent/BlockingQueue/CancelledProducerConsumerLoops.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/CancelledProducerConsumerLoops.java Wed Jan 12 14:40:36 2011 +0000
@@ -34,7 +34,7 @@
/*
* @test
* @bug 4486658
- * @compile CancelledProducerConsumerLoops.java
+ * @compile -source 1.5 CancelledProducerConsumerLoops.java
* @run main/timeout=7000 CancelledProducerConsumerLoops
* @summary Checks for responsiveness of blocking queues to cancellation.
* Runs under the assumption that ITERS computations require more than
@@ -119,48 +119,24 @@
}
}
- static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
- LTQasSQ() { super(); }
- public void put(T x) {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- private final static long serialVersionUID = 42;
- }
-
- static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
- HalfSyncLTQ() { super(); }
- public void put(T x) {
- if (ThreadLocalRandom.current().nextBoolean())
- super.put(x);
- else {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- }
- private final static long serialVersionUID = 42;
- }
-
static void oneTest(int pairs, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
+ oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
- /* TODO: unbounded queue implementations are prone to OOME
+ /* PriorityBlockingQueue is unbounded
oneRun(new PriorityBlockingQueue<Integer>(iters / 2 * pairs), pairs, iters / 4);
- oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
- oneRun(new LTQasSQ<Integer>(), pairs, iters);
- oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
*/
}
- static abstract class Stage implements Callable<Integer> {
+ abstract static class Stage implements Callable<Integer> {
final BlockingQueue<Integer> queue;
final CyclicBarrier barrier;
final int iters;
- Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
+ Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
queue = q;
barrier = b;
this.iters = iters;
--- a/jdk/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java Wed Jan 12 14:40:36 2011 +0000
@@ -34,7 +34,7 @@
/*
* @test
* @bug 4486658
- * @compile MultipleProducersSingleConsumerLoops.java
+ * @compile -source 1.5 MultipleProducersSingleConsumerLoops.java
* @run main/timeout=3600 MultipleProducersSingleConsumerLoops
* @summary multiple producers and single consumer using blocking queues
*/
@@ -87,35 +87,11 @@
throw new Error();
}
- static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
- LTQasSQ() { super(); }
- public void put(T x) {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- private final static long serialVersionUID = 42;
- }
-
- static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
- HalfSyncLTQ() { super(); }
- public void put(T x) {
- if (ThreadLocalRandom.current().nextBoolean())
- super.put(x);
- else {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- }
- private final static long serialVersionUID = 42;
- }
-
static void oneTest(int producers, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
- oneRun(new LTQasSQ<Integer>(), producers, iters);
- oneRun(new HalfSyncLTQ<Integer>(), producers, iters);
// Don't run PBQ since can legitimately run out of memory
// if (print)
@@ -129,11 +105,11 @@
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
}
- static abstract class Stage implements Runnable {
+ abstract static class Stage implements Runnable {
final int iters;
final BlockingQueue<Integer> queue;
final CyclicBarrier barrier;
- Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
+ Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
queue = q;
barrier = b;
this.iters = iters;
--- a/jdk/test/java/util/concurrent/BlockingQueue/ProducerConsumerLoops.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/ProducerConsumerLoops.java Wed Jan 12 14:40:36 2011 +0000
@@ -34,7 +34,7 @@
/*
* @test
* @bug 4486658
- * @compile ProducerConsumerLoops.java
+ * @compile -source 1.5 ProducerConsumerLoops.java
* @run main/timeout=3600 ProducerConsumerLoops
* @summary multiple producers and consumers using blocking queues
*/
@@ -87,35 +87,11 @@
throw new Error();
}
- static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
- LTQasSQ() { super(); }
- public void put(T x) {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- private final static long serialVersionUID = 42;
- }
-
- static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
- HalfSyncLTQ() { super(); }
- public void put(T x) {
- if (ThreadLocalRandom.current().nextBoolean())
- super.put(x);
- else {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- }
- private final static long serialVersionUID = 42;
- }
-
static void oneTest(int pairs, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
- oneRun(new LTQasSQ<Integer>(), pairs, iters);
- oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters);
oneRun(new SynchronousQueue<Integer>(), pairs, iters);
@@ -126,11 +102,11 @@
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, iters);
}
- static abstract class Stage implements Runnable {
+ abstract static class Stage implements Runnable {
final int iters;
final BlockingQueue<Integer> queue;
final CyclicBarrier barrier;
- Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
+ Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
queue = q;
barrier = b;
this.iters = iters;
--- a/jdk/test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java Wed Jan 12 14:40:36 2011 +0000
@@ -34,7 +34,7 @@
/*
* @test
* @bug 4486658
- * @compile SingleProducerMultipleConsumerLoops.java
+ * @compile -source 1.5 SingleProducerMultipleConsumerLoops.java
* @run main/timeout=600 SingleProducerMultipleConsumerLoops
* @summary check ordering for blocking queues with 1 producer and multiple consumers
*/
@@ -73,35 +73,11 @@
throw new Error();
}
- static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
- LTQasSQ() { super(); }
- public void put(T x) {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- private final static long serialVersionUID = 42;
- }
-
- static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
- HalfSyncLTQ() { super(); }
- public void put(T x) {
- if (ThreadLocalRandom.current().nextBoolean())
- super.put(x);
- else {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- }
- private final static long serialVersionUID = 42;
- }
-
static void oneTest(int consumers, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
- oneRun(new LTQasSQ<Integer>(), consumers, iters);
- oneRun(new HalfSyncLTQ<Integer>(), consumers, iters);
oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
oneRun(new SynchronousQueue<Integer>(), consumers, iters);
if (print)
@@ -110,12 +86,12 @@
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters);
}
- static abstract class Stage implements Runnable {
+ abstract static class Stage implements Runnable {
final int iters;
final BlockingQueue<Integer> queue;
final CyclicBarrier barrier;
volatile int result;
- Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
+ Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
queue = q;
barrier = b;
this.iters = iters;
--- a/jdk/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java Wed Jan 12 14:40:36 2011 +0000
@@ -53,7 +53,9 @@
test(new LinkedTransferQueue());
// Other concurrent queues (e.g. ArrayBlockingQueue) do not
// currently have weakly consistent iterators.
- // test(new ArrayBlockingQueue(20));
+ // As of 2010-09, ArrayBlockingQueue passes this test, but
+ // does not fully implement weak consistency.
+ test(new ArrayBlockingQueue(20));
}
void test(Queue q) {
--- a/jdk/test/java/util/concurrent/Executors/AutoShutdown.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/Executors/AutoShutdown.java Wed Jan 12 14:40:36 2011 +0000
@@ -24,6 +24,7 @@
/*
* @test
* @bug 6399443
+ * @run main/othervm AutoShutdown
* @summary Check for auto-shutdown and gc of singleThreadExecutors
* @author Martin Buchholz
*/
--- a/jdk/test/java/util/concurrent/Phaser/Basic.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/Phaser/Basic.java Wed Jan 12 14:40:36 2011 +0000
@@ -52,15 +52,16 @@
check(phaser.isTerminated());
int unarriverParties = phaser.getUnarrivedParties();
int registeredParties = phaser.getRegisteredParties();
- equal(phaser.arrive(), -1);
- equal(phaser.arriveAndDeregister(), -1);
- equal(phaser.arriveAndAwaitAdvance(), -1);
- equal(phaser.bulkRegister(10), -1);
- equal(phaser.getPhase(), -1);
- equal(phaser.register(), -1);
+ int phase = phaser.getPhase();
+ check(phase < 0);
+ equal(phase, phaser.arrive());
+ equal(phase, phaser.arriveAndDeregister());
+ equal(phase, phaser.arriveAndAwaitAdvance());
+ equal(phase, phaser.bulkRegister(10));
+ equal(phase, phaser.register());
try {
- equal(phaser.awaitAdvanceInterruptibly(0), -1);
- equal(phaser.awaitAdvanceInterruptibly(0, 10, SECONDS), -1);
+ equal(phase, phaser.awaitAdvanceInterruptibly(0));
+ equal(phase, phaser.awaitAdvanceInterruptibly(0, 10, SECONDS));
} catch (Exception ie) {
unexpected(ie);
}
@@ -94,10 +95,9 @@
}
int phase = atTheStartingGate.getPhase();
equal(phase, atTheStartingGate.arrive());
- int AwaitPhase = atTheStartingGate.awaitAdvanceInterruptibly(phase,
- 10,
- SECONDS);
- if (expectNextPhase) check(AwaitPhase == (phase + 1));
+ int awaitPhase = atTheStartingGate.awaitAdvanceInterruptibly
+ (phase, 10, SECONDS);
+ if (expectNextPhase) check(awaitPhase == (phase + 1));
pass();
} catch (Throwable t) {
@@ -271,18 +271,19 @@
// Phaser is terminated while threads are waiting
//----------------------------------------------------------------
try {
- Phaser phaser = new Phaser(3);
- Iterator<Awaiter> awaiters = awaiterIterator(phaser);
for (int i = 0; i < 4; i++) {
+ Phaser phaser = new Phaser(3);
+ Iterator<Awaiter> awaiters = awaiterIterator(phaser);
Arriver a1 = awaiters.next(); a1.start();
Arriver a2 = awaiters.next(); a2.start();
toTheStartingGate();
while (phaser.getArrivedParties() < 2) Thread.yield();
+ equal(0, phaser.getPhase());
phaser.forceTermination();
a1.join();
a2.join();
- check(a1.phase == -1);
- check(a2.phase == -1);
+ equal(0 + Integer.MIN_VALUE, a1.phase);
+ equal(0 + Integer.MIN_VALUE, a2.phase);
int arrivedParties = phaser.getArrivedParties();
checkTerminated(phaser);
equal(phaser.getArrivedParties(), arrivedParties);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/Phaser/FickleRegister.java Wed Jan 12 14:40:36 2011 +0000
@@ -0,0 +1,150 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @summary stress test for register/arriveAndDeregister
+ * @run main FickleRegister 300
+ */
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+public class FickleRegister {
+ final AtomicLong count = new AtomicLong(0);
+ final long testDurationMillisDefault = 10L * 1000L;
+ final long testDurationMillis;
+ final long quittingTimeNanos;
+ final int chunkSize = 1000;
+
+ FickleRegister(String[] args) {
+ testDurationMillis = (args.length > 0) ?
+ Long.valueOf(args[0]) : testDurationMillisDefault;
+ quittingTimeNanos = System.nanoTime() +
+ testDurationMillis * 1000L * 1000L;
+ }
+
+ class Runner extends CheckedRunnable {
+ final Phaser p;
+ Runner(Phaser phaser) { p = phaser; }
+ public void realRun() {
+ int prevPhase = -1;
+ for (int k = 1;; k++) {
+ for (int i = 0; i < chunkSize; i++) {
+ int phase = p.register();
+ if (phase < 0) break;
+ check(phase > prevPhase);
+ prevPhase = phase;
+ equal(phase, p.arriveAndDeregister());
+ check(phase < p.awaitAdvance(phase));
+ }
+ if (System.nanoTime() - quittingTimeNanos > 0) {
+ count.getAndAdd(k * chunkSize);
+ break;
+ }
+ }
+ }
+ }
+
+ void test(String[] args) throws Throwable {
+ final Phaser parent = new Phaser() {
+ protected boolean onAdvance(int phase, int parties) {
+ return false;
+ }
+ };
+
+ final Phaser child1 = new Phaser(parent);
+ final Phaser child2 = new Phaser(parent);
+ final Phaser subchild1 = new Phaser(child1);
+ final Phaser subchild2 = new Phaser(child2);
+ final Phaser[] phasers = {
+ parent, child1, child2, subchild1, subchild2
+ };
+
+ int reps = 4;
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+ for (int j = 0; j < reps; ++j) {
+ threads.add(new Thread(new Runner(subchild1)));
+ threads.add(new Thread(new Runner(child1)));
+ threads.add(new Thread(new Runner(parent)));
+ threads.add(new Thread(new Runner(child2)));
+ threads.add(new Thread(new Runner(subchild2)));
+ }
+
+ for (Thread thread : threads)
+ thread.start();
+
+ for (Thread thread : threads)
+ thread.join();
+
+ System.out.println("Parent: " + parent);
+ System.out.println("Child1: " + child1);
+ System.out.println("Child2: " + child2);
+ System.out.println("Subchild1: " + subchild1);
+ System.out.println("Subchild2: " + subchild2);
+ System.out.println("Iterations:" + count.get());
+
+ for (Phaser phaser : phasers) {
+ check(phaser.getPhase() > 0);
+ equal(0, phaser.getRegisteredParties());
+ equal(0, phaser.getUnarrivedParties());
+ equal(parent.getPhase(), phaser.getPhase());
+ }
+ }
+
+ //--------------------- Infrastructure ---------------------------
+ volatile int passed = 0, failed = 0;
+ void pass() {passed++;}
+ void fail() {failed++; Thread.dumpStack();}
+ void fail(String msg) {System.err.println(msg); fail();}
+ void unexpected(Throwable t) {failed++; t.printStackTrace();}
+ void check(boolean cond) {if (cond) pass(); else fail();}
+ void equal(Object x, Object y) {
+ if (x == null ? y == null : x.equals(y)) pass();
+ else fail(x + " not equal to " + y);}
+ public static void main(String[] args) throws Throwable {
+ new FickleRegister(args).instanceMain(args);}
+ public void instanceMain(String[] args) throws Throwable {
+ try {test(args);} catch (Throwable t) {unexpected(t);}
+ System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+ if (failed > 0) throw new AssertionError("Some tests failed");}
+
+ abstract class CheckedRunnable implements Runnable {
+ protected abstract void realRun() throws Throwable;
+
+ public final void run() {
+ try {realRun();} catch (Throwable t) {unexpected(t);}
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/Phaser/PhaseOverflow.java Wed Jan 12 14:40:36 2011 +0000
@@ -0,0 +1,158 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Martin Buchholz and Doug Lea with assistance from
+ * members of JCP JSR-166 Expert Group and released to the public
+ * domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @summary Test Phaser phase integer overflow behavior
+ */
+
+import java.util.concurrent.Phaser;
+import java.lang.reflect.Field;
+
+public class PhaseOverflow {
+ Field stateField;
+
+ void checkState(Phaser phaser,
+ int phase, int parties, int unarrived) {
+ equal(phase, phaser.getPhase());
+ equal(parties, phaser.getRegisteredParties());
+ equal(unarrived, phaser.getUnarrivedParties());
+ }
+
+ void test(String[] args) throws Throwable {
+ stateField = Phaser.class.getDeclaredField("state");
+ stateField.setAccessible(true);
+ testLeaf();
+ testTiered();
+ }
+
+ void testLeaf() throws Throwable {
+ Phaser phaser = new Phaser();
+ // this is extremely dependent on internal representation
+ stateField.setLong(phaser, ((Integer.MAX_VALUE - 1L) << 32) | 1L);
+ checkState(phaser, Integer.MAX_VALUE - 1, 0, 0);
+ phaser.register();
+ checkState(phaser, Integer.MAX_VALUE - 1, 1, 1);
+ phaser.arrive();
+ checkState(phaser, Integer.MAX_VALUE, 1, 1);
+ phaser.arrive();
+ checkState(phaser, 0, 1, 1);
+ phaser.arrive();
+ checkState(phaser, 1, 1, 1);
+ }
+
+ int phaseInc(int phase) { return (phase + 1) & Integer.MAX_VALUE; }
+
+ void testTiered() throws Throwable {
+ Phaser root = new Phaser();
+ // this is extremely dependent on internal representation
+ stateField.setLong(root, ((Integer.MAX_VALUE - 1L) << 32) | 1L);
+ checkState(root, Integer.MAX_VALUE - 1, 0, 0);
+ Phaser p1 = new Phaser(root, 1);
+ checkState(root, Integer.MAX_VALUE - 1, 1, 1);
+ checkState(p1, Integer.MAX_VALUE - 1, 1, 1);
+ Phaser p2 = new Phaser(root, 2);
+ checkState(root, Integer.MAX_VALUE - 1, 2, 2);
+ checkState(p2, Integer.MAX_VALUE - 1, 2, 2);
+ int ph = Integer.MAX_VALUE - 1;
+ for (int k = 0; k < 5; k++) {
+ checkState(root, ph, 2, 2);
+ checkState(p1, ph, 1, 1);
+ checkState(p2, ph, 2, 2);
+ p1.arrive();
+ checkState(root, ph, 2, 1);
+ checkState(p1, ph, 1, 0);
+ checkState(p2, ph, 2, 2);
+ p2.arrive();
+ checkState(root, ph, 2, 1);
+ checkState(p1, ph, 1, 0);
+ checkState(p2, ph, 2, 1);
+ p2.arrive();
+ ph = phaseInc(ph);
+ checkState(root, ph, 2, 2);
+ checkState(p1, ph, 1, 1);
+ checkState(p2, ph, 2, 2);
+ }
+ equal(3, ph);
+ }
+
+ void xtestTiered() throws Throwable {
+ Phaser root = new Phaser();
+ stateField.setLong(root, ((Integer.MAX_VALUE - 1L) << 32) | 1L);
+ checkState(root, Integer.MAX_VALUE - 1, 0, 0);
+ Phaser p1 = new Phaser(root, 1);
+ checkState(root, Integer.MAX_VALUE - 1, 1, 1);
+ checkState(p1, Integer.MAX_VALUE - 1, 1, 1);
+ Phaser p2 = new Phaser(root, 2);
+ checkState(root, Integer.MAX_VALUE - 1, 2, 2);
+ checkState(p2, Integer.MAX_VALUE - 1, 2, 2);
+ int ph = Integer.MAX_VALUE - 1;
+ for (int k = 0; k < 5; k++) {
+ checkState(root, ph, 2, 2);
+ checkState(p1, ph, 1, 1);
+ checkState(p2, ph, 2, 2);
+ p1.arrive();
+ checkState(root, ph, 2, 1);
+ checkState(p1, ph, 1, 0);
+ checkState(p2, ph, 2, 2);
+ p2.arrive();
+ checkState(root, ph, 2, 1);
+ checkState(p1, ph, 1, 0);
+ checkState(p2, ph, 2, 1);
+ p2.arrive();
+ ph = phaseInc(ph);
+ checkState(root, ph, 2, 2);
+ checkState(p1, ph, 1, 1);
+ checkState(p2, ph, 2, 2);
+ }
+ equal(3, ph);
+ }
+
+ //--------------------- Infrastructure ---------------------------
+ volatile int passed = 0, failed = 0;
+ void pass() {passed++;}
+ void fail() {failed++; Thread.dumpStack();}
+ void fail(String msg) {System.err.println(msg); fail();}
+ void unexpected(Throwable t) {failed++; t.printStackTrace();}
+ void check(boolean cond) {if (cond) pass(); else fail();}
+ void equal(Object x, Object y) {
+ if (x == null ? y == null : x.equals(y)) pass();
+ else fail(x + " not equal to " + y);}
+ public static void main(String[] args) throws Throwable {
+ new PhaseOverflow().instanceMain(args);}
+ public void instanceMain(String[] args) throws Throwable {
+ try {test(args);} catch (Throwable t) {unexpected(t);}
+ System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+ if (failed > 0) throw new AssertionError("Some tests failed");}
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/Phaser/TieredArriveLoops.java Wed Jan 12 14:40:36 2011 +0000
@@ -0,0 +1,117 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @summary stress test for arrivals in a tiered phaser
+ * @run main TieredArriveLoops 300
+ */
+import java.util.*;
+import java.util.concurrent.*;
+
+public class TieredArriveLoops {
+ final long testDurationMillisDefault = 10L * 1000L;
+ final long testDurationMillis;
+ final long quittingTimeNanos;
+
+ TieredArriveLoops(String[] args) {
+ testDurationMillis = (args.length > 0) ?
+ Long.valueOf(args[0]) : testDurationMillisDefault;
+ quittingTimeNanos = System.nanoTime() +
+ testDurationMillis * 1000L * 1000L;
+ }
+
+ Runnable runner(final Phaser p) {
+ return new CheckedRunnable() { public void realRun() {
+ int prevPhase = p.register();
+ while (!p.isTerminated()) {
+ int phase = p.awaitAdvance(p.arrive());
+ if (phase < 0)
+ return;
+ equal(phase, (prevPhase + 1) & Integer.MAX_VALUE);
+ int ph = p.getPhase();
+ check(ph < 0 || ph == phase);
+ prevPhase = phase;
+ }
+ }};
+ }
+
+ void test(String[] args) throws Throwable {
+ final Phaser parent = new Phaser();
+ final Phaser child1 = new Phaser(parent);
+ final Phaser child2 = new Phaser(parent);
+
+ Thread t1 = new Thread(runner(child1));
+ Thread t2 = new Thread(runner(child2));
+ t1.start();
+ t2.start();
+
+ for (int prevPhase = 0, phase; ; prevPhase = phase) {
+ phase = child2.getPhase();
+ check(phase >= prevPhase);
+ if (System.nanoTime() - quittingTimeNanos > 0) {
+ System.err.printf("phase=%d%n", phase);
+ child1.forceTermination();
+ break;
+ }
+ }
+
+ t1.join();
+ t2.join();
+ }
+
+ //--------------------- Infrastructure ---------------------------
+ volatile int passed = 0, failed = 0;
+ void pass() {passed++;}
+ void fail() {failed++; Thread.dumpStack();}
+ void fail(String msg) {System.err.println(msg); fail();}
+ void unexpected(Throwable t) {failed++; t.printStackTrace();}
+ void check(boolean cond) {if (cond) pass(); else fail();}
+ void equal(Object x, Object y) {
+ if (x == null ? y == null : x.equals(y)) pass();
+ else fail(x + " not equal to " + y);}
+ public static void main(String[] args) throws Throwable {
+ new TieredArriveLoops(args).instanceMain(args);}
+ public void instanceMain(String[] args) throws Throwable {
+ try {test(args);} catch (Throwable t) {unexpected(t);}
+ System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+ if (failed > 0) throw new AssertionError("Some tests failed");}
+
+ abstract class CheckedRunnable implements Runnable {
+ protected abstract void realRun() throws Throwable;
+
+ public final void run() {
+ try {realRun();} catch (Throwable t) {unexpected(t);}
+ }
+ }
+}
--- a/jdk/test/java/util/concurrent/ThreadPoolExecutor/CoreThreadTimeOut.java Tue Jan 11 13:42:34 2011 -0800
+++ b/jdk/test/java/util/concurrent/ThreadPoolExecutor/CoreThreadTimeOut.java Wed Jan 12 14:40:36 2011 +0000
@@ -31,47 +31,79 @@
import java.util.concurrent.*;
public class CoreThreadTimeOut {
- static volatile int passed = 0, failed = 0;
- static void pass() { passed++; }
- static void fail() { failed++; Thread.dumpStack(); }
- static void unexpected(Throwable t) { failed++; t.printStackTrace(); }
- static void check(boolean cond) { if (cond) pass(); else fail(); }
- static void equal(Object x, Object y) {
- if (x == null ? y == null : x.equals(y)) pass();
- else {System.out.println(x + " not equal to " + y); fail(); }}
+
+ static class IdentifiableThreadFactory implements ThreadFactory {
+ static ThreadFactory defaultThreadFactory
+ = Executors.defaultThreadFactory();
- static int countExecutorThreads() {
+ public Thread newThread(Runnable r) {
+ Thread t = defaultThreadFactory.newThread(r);
+ t.setName("CoreThreadTimeOut-" + t.getName());
+ return t;
+ }
+ }
+
+ int countExecutorThreads() {
Thread[] threads = new Thread[Thread.activeCount()+100];
Thread.enumerate(threads);
int count = 0;
for (Thread t : threads)
- if (t != null && t.getName().matches("pool-[0-9]+-thread-[0-9]+"))
+ if (t != null &&
+ t.getName().matches
+ ("CoreThreadTimeOut-pool-[0-9]+-thread-[0-9]+"))
count++;
return count;
}
- public static void main(String[] args) throws Throwable {
+ long millisElapsedSince(long t0) {
+ return (System.nanoTime() - t0) / (1000L * 1000L);
+ }
+
+ void test(String[] args) throws Throwable {
final int threadCount = 10;
+ final int timeoutMillis = 30;
BlockingQueue<Runnable> q
= new ArrayBlockingQueue<Runnable>(2*threadCount);
ThreadPoolExecutor tpe
= new ThreadPoolExecutor(threadCount, threadCount,
- 30, TimeUnit.MILLISECONDS,
- q);
+ timeoutMillis, TimeUnit.MILLISECONDS,
+ q, new IdentifiableThreadFactory());
equal(tpe.getCorePoolSize(), threadCount);
check(! tpe.allowsCoreThreadTimeOut());
tpe.allowCoreThreadTimeOut(true);
check(tpe.allowsCoreThreadTimeOut());
equal(countExecutorThreads(), 0);
+ long t0 = System.nanoTime();
for (int i = 0; i < threadCount; i++)
tpe.submit(new Runnable() { public void run() {}});
- equal(countExecutorThreads(), threadCount);
- Thread.sleep(500);
+ int count = countExecutorThreads();
+ if (millisElapsedSince(t0) < timeoutMillis)
+ equal(count, threadCount);
+ while (countExecutorThreads() > 0 &&
+ millisElapsedSince(t0) < 10 * 1000);
equal(countExecutorThreads(), 0);
tpe.shutdown();
check(tpe.allowsCoreThreadTimeOut());
+ check(tpe.awaitTermination(10, TimeUnit.SECONDS));
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new Exception("Some tests failed");
}
+
+ //--------------------- Infrastructure ---------------------------
+ volatile int passed = 0, failed = 0;
+ void pass() {passed++;}
+ void fail() {failed++; Thread.dumpStack();}
+ void fail(String msg) {System.err.println(msg); fail();}
+ void unexpected(Throwable t) {failed++; t.printStackTrace();}
+ void check(boolean cond) {if (cond) pass(); else fail();}
+ void equal(Object x, Object y) {
+ if (x == null ? y == null : x.equals(y)) pass();
+ else fail(x + " not equal to " + y);}
+ public static void main(String[] args) throws Throwable {
+ new CoreThreadTimeOut().instanceMain(args);}
+ public void instanceMain(String[] args) throws Throwable {
+ try {test(args);} catch (Throwable t) {unexpected(t);}
+ System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+ if (failed > 0) throw new AssertionError("Some tests failed");}
}