--- a/jdk/make/java/java/FILES_java.gmk Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/make/java/java/FILES_java.gmk Mon Nov 02 22:23:50 2009 -0800
@@ -286,11 +286,18 @@
java/util/concurrent/ExecutorService.java \
java/util/concurrent/ExecutorCompletionService.java \
java/util/concurrent/Executors.java \
+ java/util/concurrent/ForkJoinPool.java \
+ java/util/concurrent/ForkJoinTask.java \
+ java/util/concurrent/ForkJoinWorkerThread.java \
java/util/concurrent/Future.java \
java/util/concurrent/FutureTask.java \
java/util/concurrent/LinkedBlockingDeque.java \
java/util/concurrent/LinkedBlockingQueue.java \
+ java/util/concurrent/LinkedTransferQueue.java \
+ java/util/concurrent/Phaser.java \
java/util/concurrent/PriorityBlockingQueue.java \
+ java/util/concurrent/RecursiveAction.java \
+ java/util/concurrent/RecursiveTask.java \
java/util/concurrent/RejectedExecutionException.java \
java/util/concurrent/RejectedExecutionHandler.java \
java/util/concurrent/RunnableFuture.java \
@@ -301,9 +308,11 @@
java/util/concurrent/Semaphore.java \
java/util/concurrent/SynchronousQueue.java \
java/util/concurrent/ThreadFactory.java \
+ java/util/concurrent/ThreadLocalRandom.java \
java/util/concurrent/ThreadPoolExecutor.java \
java/util/concurrent/TimeUnit.java \
java/util/concurrent/TimeoutException.java \
+ java/util/concurrent/TransferQueue.java \
java/util/concurrent/atomic/AtomicBoolean.java \
java/util/concurrent/atomic/AtomicInteger.java \
java/util/concurrent/atomic/AtomicIntegerArray.java \
--- a/jdk/src/share/classes/java/util/AbstractList.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/AbstractList.java Mon Nov 02 22:23:50 2009 -0800
@@ -256,9 +256,8 @@
public boolean addAll(int index, Collection<? extends E> c) {
rangeCheckForAdd(index);
boolean modified = false;
- Iterator<? extends E> e = c.iterator();
- while (e.hasNext()) {
- add(index++, e.next());
+ for (E e : c) {
+ add(index++, e);
modified = true;
}
return modified;
--- a/jdk/src/share/classes/java/util/AbstractQueue.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/AbstractQueue.java Mon Nov 02 22:23:50 2009 -0800
@@ -183,11 +183,9 @@
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
- Iterator<? extends E> e = c.iterator();
- while (e.hasNext()) {
- if (add(e.next()))
+ for (E e : c)
+ if (add(e))
modified = true;
- }
return modified;
}
--- a/jdk/src/share/classes/java/util/HashMap.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/HashMap.java Mon Nov 02 22:23:50 2009 -0800
@@ -448,10 +448,8 @@
}
private void putAllForCreate(Map<? extends K, ? extends V> m) {
- for (Iterator<? extends Map.Entry<? extends K, ? extends V>> i = m.entrySet().iterator(); i.hasNext(); ) {
- Map.Entry<? extends K, ? extends V> e = i.next();
+ for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putForCreate(e.getKey(), e.getValue());
- }
}
/**
@@ -536,10 +534,8 @@
resize(newCapacity);
}
- for (Iterator<? extends Map.Entry<? extends K, ? extends V>> i = m.entrySet().iterator(); i.hasNext(); ) {
- Map.Entry<? extends K, ? extends V> e = i.next();
+ for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
put(e.getKey(), e.getValue());
- }
}
/**
--- a/jdk/src/share/classes/java/util/HashSet.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/HashSet.java Mon Nov 02 22:23:50 2009 -0800
@@ -280,8 +280,8 @@
s.writeInt(map.size());
// Write out all elements in the proper order.
- for (Iterator i=map.keySet().iterator(); i.hasNext(); )
- s.writeObject(i.next());
+ for (E e : map.keySet())
+ s.writeObject(e);
}
/**
--- a/jdk/src/share/classes/java/util/Random.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/Random.java Mon Nov 02 22:23:50 2009 -0800
@@ -50,6 +50,18 @@
* <p>
* Many applications will find the method {@link Math#random} simpler to use.
*
+ * <p>Instances of {@code java.util.Random} are threadsafe.
+ * However, the concurrent use of the same {@code java.util.Random}
+ * instance across threads may encounter contention and consequent
+ * poor performance. Consider instead using
+ * {@link java.util.concurrent.ThreadLocalRandom} in multithreaded
+ * designs.
+ *
+ * <p>Instances of {@code java.util.Random} are not cryptographically
+ * secure. Consider instead using {@link java.security.SecureRandom} to
+ * get a cryptographically secure pseudo-random number generator for use
+ * by security-sensitive applications.
+ *
* @author Frank Yellin
* @since 1.0
*/
--- a/jdk/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java Mon Nov 02 22:23:50 2009 -0800
@@ -218,8 +218,8 @@
if (capacity < c.size())
throw new IllegalArgumentException();
- for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
- add(it.next());
+ for (E e : c)
+ add(e);
}
/**
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Mon Nov 02 22:23:50 2009 -0800
@@ -250,8 +250,8 @@
* of its elements are null
*/
public ConcurrentLinkedQueue(Collection<? extends E> c) {
- for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
- add(it.next());
+ for (E e : c)
+ add(e);
}
// Have to override just to update the javadoc
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java Mon Nov 02 22:23:50 2009 -0800
@@ -895,7 +895,7 @@
if (n != null) {
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
- break;;
+ break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
--- a/jdk/src/share/classes/java/util/concurrent/CountDownLatch.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/CountDownLatch.java Mon Nov 02 22:23:50 2009 -0800
@@ -148,7 +148,8 @@
*
* </pre>
*
- * <p>Memory consistency effects: Actions in a thread prior to calling
+ * <p>Memory consistency effects: Until the count reaches
+ * zero, actions in a thread prior to calling
* {@code countDown()}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions following a successful return from a corresponding
--- a/jdk/src/share/classes/java/util/concurrent/ExecutorService.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ExecutorService.java Mon Nov 02 22:23:50 2009 -0800
@@ -332,8 +332,8 @@
* @param tasks the collection of tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
- * @throws NullPointerException if tasks or any of its elements
- * are <tt>null</tt>
+ * @throws NullPointerException if tasks or any element task
+ * subject to execution is <tt>null</tt>
* @throws IllegalArgumentException if tasks is empty
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
@@ -356,8 +356,8 @@
* @param unit the time unit of the timeout argument
* @return the result returned by one of the tasks.
* @throws InterruptedException if interrupted while waiting
- * @throws NullPointerException if tasks, any of its elements, or
- * unit are <tt>null</tt>
+ * @throws NullPointerException if tasks, or unit, or any element
+ * task subject to execution is <tt>null</tt>
* @throws TimeoutException if the given timeout elapses before
* any task successfully completes
* @throws ExecutionException if no task successfully completes
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,1988 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An {@link ExecutorService} for running {@link ForkJoinTask}s.
+ * A {@code ForkJoinPool} provides the entry point for submissions
+ * from non-{@code ForkJoinTask}s, as well as management and
+ * monitoring operations.
+ *
+ * <p>A {@code ForkJoinPool} differs from other kinds of {@link
+ * ExecutorService} mainly by virtue of employing
+ * <em>work-stealing</em>: all threads in the pool attempt to find and
+ * execute subtasks created by other active tasks (eventually blocking
+ * waiting for work if none exist). This enables efficient processing
+ * when most tasks spawn other subtasks (as do most {@code
+ * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
+ * execution of some plain {@code Runnable}- or {@code Callable}-
+ * based activities along with {@code ForkJoinTask}s. When setting
+ * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
+ * also be appropriate for use with fine-grained tasks of any form
+ * that are never joined. Otherwise, other {@code ExecutorService}
+ * implementations are typically more appropriate choices.
+ *
+ * <p>A {@code ForkJoinPool} is constructed with a given target
+ * parallelism level; by default, equal to the number of available
+ * processors. Unless configured otherwise via {@link
+ * #setMaintainsParallelism}, the pool attempts to maintain this
+ * number of active (or available) threads by dynamically adding,
+ * suspending, or resuming internal worker threads, even if some tasks
+ * are stalled waiting to join others. However, no such adjustments
+ * are performed in the face of blocked IO or other unmanaged
+ * synchronization. The nested {@link ManagedBlocker} interface
+ * enables extension of the kinds of synchronization accommodated.
+ * The target parallelism level may also be changed dynamically
+ * ({@link #setParallelism}). The total number of threads may be
+ * limited using method {@link #setMaximumPoolSize}, in which case it
+ * may become possible for the activities of a pool to stall due to
+ * the lack of available threads to process new tasks.
+ *
+ * <p>In addition to execution and lifecycle control methods, this
+ * class provides status check methods (for example
+ * {@link #getStealCount}) that are intended to aid in developing,
+ * tuning, and monitoring fork/join applications. Also, method
+ * {@link #toString} returns indications of pool state in a
+ * convenient form for informal monitoring.
+ *
+ * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
+ * used for all parallel task execution in a program or subsystem.
+ * Otherwise, use would not usually outweigh the construction and
+ * bookkeeping overhead of creating a large set of threads. For
+ * example, a common pool could be used for the {@code SortTasks}
+ * illustrated in {@link RecursiveAction}. Because {@code
+ * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
+ * daemon} mode, there is typically no need to explicitly {@link
+ * #shutdown} such a pool upon program exit.
+ *
+ * <pre>
+ * static final ForkJoinPool mainPool = new ForkJoinPool();
+ * ...
+ * public void sort(long[] array) {
+ * mainPool.invoke(new SortTask(array, 0, array.length));
+ * }
+ * </pre>
+ *
+ * <p><b>Implementation notes</b>: This implementation restricts the
+ * maximum number of running threads to 32767. Attempts to create
+ * pools with greater than the maximum number result in
+ * {@code IllegalArgumentException}.
+ *
+ * <p>This implementation rejects submitted tasks (that is, by throwing
+ * {@link RejectedExecutionException}) only when the pool is shut down.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ */
+public class ForkJoinPool extends AbstractExecutorService {
+
+ /*
+ * See the extended comments interspersed below for design,
+ * rationale, and walkthroughs.
+ */
+
+ /** Mask for packing and unpacking shorts */
+ private static final int shortMask = 0xffff;
+
+ /** Max pool size -- must be a power of two minus 1 */
+ private static final int MAX_THREADS = 0x7FFF;
+
+ /**
+ * Factory for creating new {@link ForkJoinWorkerThread}s.
+ * A {@code ForkJoinWorkerThreadFactory} must be defined and used
+ * for {@code ForkJoinWorkerThread} subclasses that extend base
+ * functionality or initialize threads with different contexts.
+ */
+ public static interface ForkJoinWorkerThreadFactory {
+ /**
+ * Returns a new worker thread operating in the given pool.
+ *
+ * @param pool the pool this thread works in
+ * @throws NullPointerException if the pool is null
+ */
+ public ForkJoinWorkerThread newThread(ForkJoinPool pool);
+ }
+
+ /**
+ * Default ForkJoinWorkerThreadFactory implementation; creates a
+ * new ForkJoinWorkerThread.
+ */
+ static class DefaultForkJoinWorkerThreadFactory
+ implements ForkJoinWorkerThreadFactory {
+ public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+ try {
+ return new ForkJoinWorkerThread(pool);
+ } catch (OutOfMemoryError oom) {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Creates a new ForkJoinWorkerThread. This factory is used unless
+ * overridden in ForkJoinPool constructors.
+ */
+ public static final ForkJoinWorkerThreadFactory
+ defaultForkJoinWorkerThreadFactory =
+ new DefaultForkJoinWorkerThreadFactory();
+
+ /**
+ * Permission required for callers of methods that may start or
+ * kill threads.
+ */
+ private static final RuntimePermission modifyThreadPermission =
+ new RuntimePermission("modifyThread");
+
+ /**
+ * If there is a security manager, makes sure caller has
+ * permission to modify threads.
+ */
+ private static void checkPermission() {
+ SecurityManager security = System.getSecurityManager();
+ if (security != null)
+ security.checkPermission(modifyThreadPermission);
+ }
+
+ /**
+ * Generator for assigning sequence numbers as pool names.
+ */
+ private static final AtomicInteger poolNumberGenerator =
+ new AtomicInteger();
+
+ /**
+ * Array holding all worker threads in the pool. Initialized upon
+ * first use. Array size must be a power of two. Updates and
+ * replacements are protected by workerLock, but it is always kept
+ * in a consistent enough state to be randomly accessed without
+ * locking by workers performing work-stealing.
+ */
+ volatile ForkJoinWorkerThread[] workers;
+
+ /**
+ * Lock protecting access to workers.
+ */
+ private final ReentrantLock workerLock;
+
+ /**
+ * Condition for awaitTermination.
+ */
+ private final Condition termination;
+
+ /**
+ * The uncaught exception handler used when any worker
+ * abruptly terminates
+ */
+ private Thread.UncaughtExceptionHandler ueh;
+
+ /**
+ * Creation factory for worker threads.
+ */
+ private final ForkJoinWorkerThreadFactory factory;
+
+ /**
+ * Head of stack of threads that were created to maintain
+ * parallelism when other threads blocked, but have since
+ * suspended when the parallelism level rose.
+ */
+ private volatile WaitQueueNode spareStack;
+
+ /**
+ * Sum of per-thread steal counts, updated only when threads are
+ * idle or terminating.
+ */
+ private final AtomicLong stealCount;
+
+ /**
+ * Queue for external submissions.
+ */
+ private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
+
+ /**
+ * Head of Treiber stack for barrier sync. See below for explanation.
+ */
+ private volatile WaitQueueNode syncStack;
+
+ /**
+ * The count for event barrier
+ */
+ private volatile long eventCount;
+
+ /**
+ * Pool number, just for assigning useful names to worker threads
+ */
+ private final int poolNumber;
+
+ /**
+ * The maximum allowed pool size
+ */
+ private volatile int maxPoolSize;
+
+ /**
+ * The desired parallelism level, updated only under workerLock.
+ */
+ private volatile int parallelism;
+
+ /**
+ * True if use local fifo, not default lifo, for local polling
+ */
+ private volatile boolean locallyFifo;
+
+ /**
+ * Holds number of total (i.e., created and not yet terminated)
+ * and running (i.e., not blocked on joins or other managed sync)
+ * threads, packed into one int to ensure consistent snapshot when
+ * making decisions about creating and suspending spare
+ * threads. Updated only by CAS. Note: CASes in
+ * updateRunningCount and preJoin assume that running active count
+ * is in low word, so need to be modified if this changes.
+ */
+ private volatile int workerCounts;
+
+ private static int totalCountOf(int s) { return s >>> 16; }
+ private static int runningCountOf(int s) { return s & shortMask; }
+ private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
+
+ /**
+ * Adds delta (which may be negative) to running count. This must
+ * be called before (with negative arg) and after (with positive)
+ * any managed synchronization (i.e., mainly, joins).
+ *
+ * @param delta the number to add
+ */
+ final void updateRunningCount(int delta) {
+ int s;
+ do {} while (!casWorkerCounts(s = workerCounts, s + delta));
+ }
+
+ /**
+ * Adds delta (which may be negative) to both total and running
+ * count. This must be called upon creation and termination of
+ * worker threads.
+ *
+ * @param delta the number to add
+ */
+ private void updateWorkerCount(int delta) {
+ int d = delta + (delta << 16); // add to both lo and hi parts
+ int s;
+ do {} while (!casWorkerCounts(s = workerCounts, s + d));
+ }
+
+ /**
+ * Lifecycle control. High word contains runState, low word
+ * contains the number of workers that are (probably) executing
+ * tasks. This value is atomically incremented before a worker
+ * gets a task to run, and decremented when worker has no tasks
+ * and cannot find any. These two fields are bundled together to
+ * support correct termination triggering. Note: activeCount
+ * CAS'es cheat by assuming active count is in low word, so need
+ * to be modified if this changes
+ */
+ private volatile int runControl;
+
+ // RunState values. Order among values matters
+ private static final int RUNNING = 0;
+ private static final int SHUTDOWN = 1;
+ private static final int TERMINATING = 2;
+ private static final int TERMINATED = 3;
+
+ private static int runStateOf(int c) { return c >>> 16; }
+ private static int activeCountOf(int c) { return c & shortMask; }
+ private static int runControlFor(int r, int a) { return (r << 16) + a; }
+
+ /**
+ * Tries incrementing active count; fails on contention.
+ * Called by workers before/during executing tasks.
+ *
+ * @return true on success
+ */
+ final boolean tryIncrementActiveCount() {
+ int c = runControl;
+ return casRunControl(c, c+1);
+ }
+
+ /**
+ * Tries decrementing active count; fails on contention.
+ * Possibly triggers termination on success.
+ * Called by workers when they can't find tasks.
+ *
+ * @return true on success
+ */
+ final boolean tryDecrementActiveCount() {
+ int c = runControl;
+ int nextc = c - 1;
+ if (!casRunControl(c, nextc))
+ return false;
+ if (canTerminateOnShutdown(nextc))
+ terminateOnShutdown();
+ return true;
+ }
+
+ /**
+ * Returns {@code true} if argument represents zero active count
+ * and nonzero runstate, which is the triggering condition for
+ * terminating on shutdown.
+ */
+ private static boolean canTerminateOnShutdown(int c) {
+ // i.e. least bit is nonzero runState bit
+ return ((c & -c) >>> 16) != 0;
+ }
+
+ /**
+ * Transition run state to at least the given state. Return true
+ * if not already at least given state.
+ */
+ private boolean transitionRunStateTo(int state) {
+ for (;;) {
+ int c = runControl;
+ if (runStateOf(c) >= state)
+ return false;
+ if (casRunControl(c, runControlFor(state, activeCountOf(c))))
+ return true;
+ }
+ }
+
+ /**
+ * Controls whether to add spares to maintain parallelism
+ */
+ private volatile boolean maintainsParallelism;
+
+ // Constructors
+
+ /**
+ * Creates a {@code ForkJoinPool} with parallelism equal to {@link
+ * java.lang.Runtime#availableProcessors}, and using the {@linkplain
+ * #defaultForkJoinWorkerThreadFactory default thread factory}.
+ *
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public ForkJoinPool() {
+ this(Runtime.getRuntime().availableProcessors(),
+ defaultForkJoinWorkerThreadFactory);
+ }
+
+ /**
+ * Creates a {@code ForkJoinPool} with the indicated parallelism
+ * level and using the {@linkplain
+ * #defaultForkJoinWorkerThreadFactory default thread factory}.
+ *
+ * @param parallelism the parallelism level
+ * @throws IllegalArgumentException if parallelism less than or
+ * equal to zero, or greater than implementation limit
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public ForkJoinPool(int parallelism) {
+ this(parallelism, defaultForkJoinWorkerThreadFactory);
+ }
+
+ /**
+ * Creates a {@code ForkJoinPool} with parallelism equal to {@link
+ * java.lang.Runtime#availableProcessors}, and using the given
+ * thread factory.
+ *
+ * @param factory the factory for creating new threads
+ * @throws NullPointerException if the factory is null
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
+ this(Runtime.getRuntime().availableProcessors(), factory);
+ }
+
+ /**
+ * Creates a {@code ForkJoinPool} with the given parallelism and
+ * thread factory.
+ *
+ * @param parallelism the parallelism level
+ * @param factory the factory for creating new threads
+ * @throws IllegalArgumentException if parallelism less than or
+ * equal to zero, or greater than implementation limit
+ * @throws NullPointerException if the factory is null
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
+ if (parallelism <= 0 || parallelism > MAX_THREADS)
+ throw new IllegalArgumentException();
+ if (factory == null)
+ throw new NullPointerException();
+ checkPermission();
+ this.factory = factory;
+ this.parallelism = parallelism;
+ this.maxPoolSize = MAX_THREADS;
+ this.maintainsParallelism = true;
+ this.poolNumber = poolNumberGenerator.incrementAndGet();
+ this.workerLock = new ReentrantLock();
+ this.termination = workerLock.newCondition();
+ this.stealCount = new AtomicLong();
+ this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
+ // worker array and workers are lazily constructed
+ }
+
+ /**
+ * Creates a new worker thread using factory.
+ *
+ * @param index the index to assign worker
+ * @return new worker, or null if factory failed
+ */
+ private ForkJoinWorkerThread createWorker(int index) {
+ Thread.UncaughtExceptionHandler h = ueh;
+ ForkJoinWorkerThread w = factory.newThread(this);
+ if (w != null) {
+ w.poolIndex = index;
+ w.setDaemon(true);
+ w.setAsyncMode(locallyFifo);
+ w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
+ if (h != null)
+ w.setUncaughtExceptionHandler(h);
+ }
+ return w;
+ }
+
+ /**
+ * Returns a good size for worker array given pool size.
+ * Currently requires size to be a power of two.
+ */
+ private static int arraySizeFor(int poolSize) {
+ if (poolSize <= 1)
+ return 1;
+ // See Hackers Delight, sec 3.2
+ int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
+ c |= c >>> 1;
+ c |= c >>> 2;
+ c |= c >>> 4;
+ c |= c >>> 8;
+ c |= c >>> 16;
+ return c + 1;
+ }
+
+ /**
+ * Creates or resizes array if necessary to hold newLength.
+ * Call only under exclusion.
+ *
+ * @return the array
+ */
+ private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws == null)
+ return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
+ else if (newLength > ws.length)
+ return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
+ else
+ return ws;
+ }
+
+ /**
+ * Tries to shrink workers into smaller array after one or more terminate.
+ */
+ private void tryShrinkWorkerArray() {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ int len = ws.length;
+ int last = len - 1;
+ while (last >= 0 && ws[last] == null)
+ --last;
+ int newLength = arraySizeFor(last+1);
+ if (newLength < len)
+ workers = Arrays.copyOf(ws, newLength);
+ }
+ }
+
+ /**
+ * Initializes workers if necessary.
+ */
+ final void ensureWorkerInitialization() {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws == null) {
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ws = workers;
+ if (ws == null) {
+ int ps = parallelism;
+ ws = ensureWorkerArrayCapacity(ps);
+ for (int i = 0; i < ps; ++i) {
+ ForkJoinWorkerThread w = createWorker(i);
+ if (w != null) {
+ ws[i] = w;
+ w.start();
+ updateWorkerCount(1);
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Worker creation and startup for threads added via setParallelism.
+ */
+ private void createAndStartAddedWorkers() {
+ resumeAllSpares(); // Allow spares to convert to nonspare
+ int ps = parallelism;
+ ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
+ int len = ws.length;
+ // Sweep through slots, to keep lowest indices most populated
+ int k = 0;
+ while (k < len) {
+ if (ws[k] != null) {
+ ++k;
+ continue;
+ }
+ int s = workerCounts;
+ int tc = totalCountOf(s);
+ int rc = runningCountOf(s);
+ if (rc >= ps || tc >= ps)
+ break;
+ if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
+ ForkJoinWorkerThread w = createWorker(k);
+ if (w != null) {
+ ws[k++] = w;
+ w.start();
+ }
+ else {
+ updateWorkerCount(-1); // back out on failed creation
+ break;
+ }
+ }
+ }
+ }
+
+ // Execution methods
+
+ /**
+ * Common code for execute, invoke and submit
+ */
+ private <T> void doSubmit(ForkJoinTask<T> task) {
+ if (task == null)
+ throw new NullPointerException();
+ if (isShutdown())
+ throw new RejectedExecutionException();
+ if (workers == null)
+ ensureWorkerInitialization();
+ submissionQueue.offer(task);
+ signalIdleWorkers();
+ }
+
+ /**
+ * Performs the given task, returning its result upon completion.
+ *
+ * @param task the task
+ * @return the task's result
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public <T> T invoke(ForkJoinTask<T> task) {
+ doSubmit(task);
+ return task.join();
+ }
+
+ /**
+ * Arranges for (asynchronous) execution of the given task.
+ *
+ * @param task the task
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public void execute(ForkJoinTask<?> task) {
+ doSubmit(task);
+ }
+
+ // AbstractExecutorService methods
+
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public void execute(Runnable task) {
+ ForkJoinTask<?> job;
+ if (task instanceof ForkJoinTask<?>) // avoid re-wrap
+ job = (ForkJoinTask<?>) task;
+ else
+ job = ForkJoinTask.adapt(task, null);
+ doSubmit(job);
+ }
+
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public <T> ForkJoinTask<T> submit(Callable<T> task) {
+ ForkJoinTask<T> job = ForkJoinTask.adapt(task);
+ doSubmit(job);
+ return job;
+ }
+
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public <T> ForkJoinTask<T> submit(Runnable task, T result) {
+ ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
+ doSubmit(job);
+ return job;
+ }
+
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public ForkJoinTask<?> submit(Runnable task) {
+ ForkJoinTask<?> job;
+ if (task instanceof ForkJoinTask<?>) // avoid re-wrap
+ job = (ForkJoinTask<?>) task;
+ else
+ job = ForkJoinTask.adapt(task, null);
+ doSubmit(job);
+ return job;
+ }
+
+ /**
+ * Submits a ForkJoinTask for execution.
+ *
+ * @param task the task to submit
+ * @return the task
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
+ doSubmit(task);
+ return task;
+ }
+
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws RejectedExecutionException {@inheritDoc}
+ */
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+ ArrayList<ForkJoinTask<T>> forkJoinTasks =
+ new ArrayList<ForkJoinTask<T>>(tasks.size());
+ for (Callable<T> task : tasks)
+ forkJoinTasks.add(ForkJoinTask.adapt(task));
+ invoke(new InvokeAll<T>(forkJoinTasks));
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
+ return futures;
+ }
+
+ static final class InvokeAll<T> extends RecursiveAction {
+ final ArrayList<ForkJoinTask<T>> tasks;
+ InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
+ public void compute() {
+ try { invokeAll(tasks); }
+ catch (Exception ignore) {}
+ }
+ private static final long serialVersionUID = -7914297376763021607L;
+ }
+
+ // Configuration and status settings and queries
+
+ /**
+ * Returns the factory used for constructing new workers.
+ *
+ * @return the factory used for constructing new workers
+ */
+ public ForkJoinWorkerThreadFactory getFactory() {
+ return factory;
+ }
+
+ /**
+ * Returns the handler for internal worker threads that terminate
+ * due to unrecoverable errors encountered while executing tasks.
+ *
+ * @return the handler, or {@code null} if none
+ */
+ public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+ Thread.UncaughtExceptionHandler h;
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ h = ueh;
+ } finally {
+ lock.unlock();
+ }
+ return h;
+ }
+
+ /**
+ * Sets the handler for internal worker threads that terminate due
+ * to unrecoverable errors encountered while executing tasks.
+ * Unless set, the current default or ThreadGroup handler is used
+ * as handler.
+ *
+ * @param h the new handler
+ * @return the old handler, or {@code null} if none
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public Thread.UncaughtExceptionHandler
+ setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
+ checkPermission();
+ Thread.UncaughtExceptionHandler old = null;
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ old = ueh;
+ ueh = h;
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null)
+ w.setUncaughtExceptionHandler(h);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return old;
+ }
+
+
+ /**
+ * Sets the target parallelism level of this pool.
+ *
+ * @param parallelism the target parallelism
+ * @throws IllegalArgumentException if parallelism less than or
+ * equal to zero or greater than maximum size bounds
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public void setParallelism(int parallelism) {
+ checkPermission();
+ if (parallelism <= 0 || parallelism > maxPoolSize)
+ throw new IllegalArgumentException();
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ if (isProcessingTasks()) {
+ int p = this.parallelism;
+ this.parallelism = parallelism;
+ if (parallelism > p)
+ createAndStartAddedWorkers();
+ else
+ trimSpares();
+ }
+ } finally {
+ lock.unlock();
+ }
+ signalIdleWorkers();
+ }
+
+ /**
+ * Returns the targeted parallelism level of this pool.
+ *
+ * @return the targeted parallelism level of this pool
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ /**
+ * Returns the number of worker threads that have started but not
+ * yet terminated. This result returned by this method may differ
+ * from {@link #getParallelism} when threads are created to
+ * maintain parallelism when others are cooperatively blocked.
+ *
+ * @return the number of worker threads
+ */
+ public int getPoolSize() {
+ return totalCountOf(workerCounts);
+ }
+
+ /**
+ * Returns the maximum number of threads allowed to exist in the
+ * pool. Unless set using {@link #setMaximumPoolSize}, the
+ * maximum is an implementation-defined value designed only to
+ * prevent runaway growth.
+ *
+ * @return the maximum
+ */
+ public int getMaximumPoolSize() {
+ return maxPoolSize;
+ }
+
+ /**
+ * Sets the maximum number of threads allowed to exist in the
+ * pool. The given value should normally be greater than or equal
+ * to the {@link #getParallelism parallelism} level. Setting this
+ * value has no effect on current pool size. It controls
+ * construction of new threads.
+ *
+ * @throws IllegalArgumentException if negative or greater than
+ * internal implementation limit
+ */
+ public void setMaximumPoolSize(int newMax) {
+ if (newMax < 0 || newMax > MAX_THREADS)
+ throw new IllegalArgumentException();
+ maxPoolSize = newMax;
+ }
+
+
+ /**
+ * Returns {@code true} if this pool dynamically maintains its
+ * target parallelism level. If false, new threads are added only
+ * to avoid possible starvation. This setting is by default true.
+ *
+ * @return {@code true} if maintains parallelism
+ */
+ public boolean getMaintainsParallelism() {
+ return maintainsParallelism;
+ }
+
+ /**
+ * Sets whether this pool dynamically maintains its target
+ * parallelism level. If false, new threads are added only to
+ * avoid possible starvation.
+ *
+ * @param enable {@code true} to maintain parallelism
+ */
+ public void setMaintainsParallelism(boolean enable) {
+ maintainsParallelism = enable;
+ }
+
+ /**
+ * Establishes local first-in-first-out scheduling mode for forked
+ * tasks that are never joined. This mode may be more appropriate
+ * than default locally stack-based mode in applications in which
+ * worker threads only process asynchronous tasks. This method is
+ * designed to be invoked only when the pool is quiescent, and
+ * typically only before any tasks are submitted. The effects of
+ * invocations at other times may be unpredictable.
+ *
+ * @param async if {@code true}, use locally FIFO scheduling
+ * @return the previous mode
+ * @see #getAsyncMode
+ */
+ public boolean setAsyncMode(boolean async) {
+ boolean oldMode = locallyFifo;
+ locallyFifo = async;
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ t.setAsyncMode(async);
+ }
+ }
+ return oldMode;
+ }
+
+ /**
+ * Returns {@code true} if this pool uses local first-in-first-out
+ * scheduling mode for forked tasks that are never joined.
+ *
+ * @return {@code true} if this pool uses async mode
+ * @see #setAsyncMode
+ */
+ public boolean getAsyncMode() {
+ return locallyFifo;
+ }
+
+ /**
+ * Returns an estimate of the number of worker threads that are
+ * not blocked waiting to join tasks or for other managed
+ * synchronization.
+ *
+ * @return the number of worker threads
+ */
+ public int getRunningThreadCount() {
+ return runningCountOf(workerCounts);
+ }
+
+ /**
+ * Returns an estimate of the number of threads that are currently
+ * stealing or executing tasks. This method may overestimate the
+ * number of active threads.
+ *
+ * @return the number of active threads
+ */
+ public int getActiveThreadCount() {
+ return activeCountOf(runControl);
+ }
+
+ /**
+ * Returns an estimate of the number of threads that are currently
+ * idle waiting for tasks. This method may underestimate the
+ * number of idle threads.
+ *
+ * @return the number of idle threads
+ */
+ final int getIdleThreadCount() {
+ int c = runningCountOf(workerCounts) - activeCountOf(runControl);
+ return (c <= 0) ? 0 : c;
+ }
+
+ /**
+ * Returns {@code true} if all worker threads are currently idle.
+ * An idle worker is one that cannot obtain a task to execute
+ * because none are available to steal from other threads, and
+ * there are no pending submissions to the pool. This method is
+ * conservative; it might not return {@code true} immediately upon
+ * idleness of all threads, but will eventually become true if
+ * threads remain inactive.
+ *
+ * @return {@code true} if all threads are currently idle
+ */
+ public boolean isQuiescent() {
+ return activeCountOf(runControl) == 0;
+ }
+
+ /**
+ * Returns an estimate of the total number of tasks stolen from
+ * one thread's work queue by another. The reported value
+ * underestimates the actual total number of steals when the pool
+ * is not quiescent. This value may be useful for monitoring and
+ * tuning fork/join programs: in general, steal counts should be
+ * high enough to keep threads busy, but low enough to avoid
+ * overhead and contention across threads.
+ *
+ * @return the number of steals
+ */
+ public long getStealCount() {
+ return stealCount.get();
+ }
+
+ /**
+ * Accumulates steal count from a worker.
+ * Call only when worker known to be idle.
+ */
+ private void updateStealCount(ForkJoinWorkerThread w) {
+ int sc = w.getAndClearStealCount();
+ if (sc != 0)
+ stealCount.addAndGet(sc);
+ }
+
+ /**
+ * Returns an estimate of the total number of tasks currently held
+ * in queues by worker threads (but not including tasks submitted
+ * to the pool that have not begun executing). This value is only
+ * an approximation, obtained by iterating across all threads in
+ * the pool. This method may be useful for tuning task
+ * granularities.
+ *
+ * @return the number of queued tasks
+ */
+ public long getQueuedTaskCount() {
+ long count = 0;
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ count += t.getQueueSize();
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Returns an estimate of the number of tasks submitted to this
+ * pool that have not yet begun executing. This method takes time
+ * proportional to the number of submissions.
+ *
+ * @return the number of queued submissions
+ */
+ public int getQueuedSubmissionCount() {
+ return submissionQueue.size();
+ }
+
+ /**
+ * Returns {@code true} if there are any tasks submitted to this
+ * pool that have not yet begun executing.
+ *
+ * @return {@code true} if there are any queued submissions
+ */
+ public boolean hasQueuedSubmissions() {
+ return !submissionQueue.isEmpty();
+ }
+
+ /**
+ * Removes and returns the next unexecuted submission if one is
+ * available. This method may be useful in extensions to this
+ * class that re-assign work in systems with multiple pools.
+ *
+ * @return the next submission, or {@code null} if none
+ */
+ protected ForkJoinTask<?> pollSubmission() {
+ return submissionQueue.poll();
+ }
+
+ /**
+ * Removes all available unexecuted submitted and forked tasks
+ * from scheduling queues and adds them to the given collection,
+ * without altering their execution status. These may include
+ * artificially generated or wrapped tasks. This method is
+ * designed to be invoked only when the pool is known to be
+ * quiescent. Invocations at other times may not remove all
+ * tasks. A failure encountered while attempting to add elements
+ * to collection {@code c} may result in elements being in
+ * neither, either or both collections when the associated
+ * exception is thrown. The behavior of this operation is
+ * undefined if the specified collection is modified while the
+ * operation is in progress.
+ *
+ * @param c the collection to transfer elements into
+ * @return the number of elements transferred
+ */
+ protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
+ int n = submissionQueue.drainTo(c);
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null)
+ n += w.drainTasksTo(c);
+ }
+ }
+ return n;
+ }
+
+ /**
+ * Returns a string identifying this pool, as well as its state,
+ * including indications of run state, parallelism level, and
+ * worker and task counts.
+ *
+ * @return a string identifying this pool, as well as its state
+ */
+ public String toString() {
+ int ps = parallelism;
+ int wc = workerCounts;
+ int rc = runControl;
+ long st = getStealCount();
+ long qt = getQueuedTaskCount();
+ long qs = getQueuedSubmissionCount();
+ return super.toString() +
+ "[" + runStateToString(runStateOf(rc)) +
+ ", parallelism = " + ps +
+ ", size = " + totalCountOf(wc) +
+ ", active = " + activeCountOf(rc) +
+ ", running = " + runningCountOf(wc) +
+ ", steals = " + st +
+ ", tasks = " + qt +
+ ", submissions = " + qs +
+ "]";
+ }
+
+ private static String runStateToString(int rs) {
+ switch(rs) {
+ case RUNNING: return "Running";
+ case SHUTDOWN: return "Shutting down";
+ case TERMINATING: return "Terminating";
+ case TERMINATED: return "Terminated";
+ default: throw new Error("Unknown run state");
+ }
+ }
+
+ // lifecycle control
+
+ /**
+ * Initiates an orderly shutdown in which previously submitted
+ * tasks are executed, but no new tasks will be accepted.
+ * Invocation has no additional effect if already shut down.
+ * Tasks that are in the process of being submitted concurrently
+ * during the course of this method may or may not be rejected.
+ *
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public void shutdown() {
+ checkPermission();
+ transitionRunStateTo(SHUTDOWN);
+ if (canTerminateOnShutdown(runControl)) {
+ if (workers == null) { // shutting down before workers created
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ if (workers == null) {
+ terminate();
+ transitionRunStateTo(TERMINATED);
+ termination.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ terminateOnShutdown();
+ }
+ }
+
+ /**
+ * Attempts to cancel and/or stop all tasks, and reject all
+ * subsequently submitted tasks. Tasks that are in the process of
+ * being submitted or executed concurrently during the course of
+ * this method may or may not be rejected. This method cancels
+ * both existing and unexecuted tasks, in order to permit
+ * termination in the presence of task dependencies. So the method
+ * always returns an empty list (unlike the case for some other
+ * Executors).
+ *
+ * @return an empty list
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ */
+ public List<Runnable> shutdownNow() {
+ checkPermission();
+ terminate();
+ return Collections.emptyList();
+ }
+
+ /**
+ * Returns {@code true} if all tasks have completed following shut down.
+ *
+ * @return {@code true} if all tasks have completed following shut down
+ */
+ public boolean isTerminated() {
+ return runStateOf(runControl) == TERMINATED;
+ }
+
+ /**
+ * Returns {@code true} if the process of termination has
+ * commenced but not yet completed. This method may be useful for
+ * debugging. A return of {@code true} reported a sufficient
+ * period after shutdown may indicate that submitted tasks have
+ * ignored or suppressed interruption, causing this executor not
+ * to properly terminate.
+ *
+ * @return {@code true} if terminating but not yet terminated
+ */
+ public boolean isTerminating() {
+ return runStateOf(runControl) == TERMINATING;
+ }
+
+ /**
+ * Returns {@code true} if this pool has been shut down.
+ *
+ * @return {@code true} if this pool has been shut down
+ */
+ public boolean isShutdown() {
+ return runStateOf(runControl) >= SHUTDOWN;
+ }
+
+ /**
+ * Returns true if pool is not terminating or terminated.
+ * Used internally to suppress execution when terminating.
+ */
+ final boolean isProcessingTasks() {
+ return runStateOf(runControl) < TERMINATING;
+ }
+
+ /**
+ * Blocks until all tasks have completed execution after a shutdown
+ * request, or the timeout occurs, or the current thread is
+ * interrupted, whichever happens first.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return {@code true} if this executor terminated and
+ * {@code false} if the timeout elapsed before termination
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ for (;;) {
+ if (isTerminated())
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = termination.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // Shutdown and termination support
+
+ /**
+ * Callback from terminating worker. Nulls out the corresponding
+ * workers slot, and if terminating, tries to terminate; else
+ * tries to shrink workers array.
+ *
+ * @param w the worker
+ */
+ final void workerTerminated(ForkJoinWorkerThread w) {
+ updateStealCount(w);
+ updateWorkerCount(-1);
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ int idx = w.poolIndex;
+ if (idx >= 0 && idx < ws.length && ws[idx] == w)
+ ws[idx] = null;
+ if (totalCountOf(workerCounts) == 0) {
+ terminate(); // no-op if already terminating
+ transitionRunStateTo(TERMINATED);
+ termination.signalAll();
+ }
+ else if (isProcessingTasks()) {
+ tryShrinkWorkerArray();
+ tryResumeSpare(true); // allow replacement
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ signalIdleWorkers();
+ }
+
+ /**
+ * Initiates termination.
+ */
+ private void terminate() {
+ if (transitionRunStateTo(TERMINATING)) {
+ stopAllWorkers();
+ resumeAllSpares();
+ signalIdleWorkers();
+ cancelQueuedSubmissions();
+ cancelQueuedWorkerTasks();
+ interruptUnterminatedWorkers();
+ signalIdleWorkers(); // resignal after interrupt
+ }
+ }
+
+ /**
+ * Possibly terminates when on shutdown state.
+ */
+ private void terminateOnShutdown() {
+ if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
+ terminate();
+ }
+
+ /**
+ * Clears out and cancels submissions.
+ */
+ private void cancelQueuedSubmissions() {
+ ForkJoinTask<?> task;
+ while ((task = pollSubmission()) != null)
+ task.cancel(false);
+ }
+
+ /**
+ * Cleans out worker queues.
+ */
+ private void cancelQueuedWorkerTasks() {
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ t.cancelTasks();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Sets each worker's status to terminating. Requires lock to avoid
+ * conflicts with add/remove.
+ */
+ private void stopAllWorkers() {
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ t.shutdownNow();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Interrupts all unterminated workers. This is not required for
+ * sake of internal control, but may help unstick user code during
+ * shutdown.
+ */
+ private void interruptUnterminatedWorkers() {
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null && !t.isTerminated()) {
+ try {
+ t.interrupt();
+ } catch (SecurityException ignore) {
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ /*
+ * Nodes for event barrier to manage idle threads. Queue nodes
+ * are basic Treiber stack nodes, also used for spare stack.
+ *
+ * The event barrier has an event count and a wait queue (actually
+ * a Treiber stack). Workers are enabled to look for work when
+ * the eventCount is incremented. If they fail to find work, they
+ * may wait for next count. Upon release, threads help others wake
+ * up.
+ *
+ * Synchronization events occur only in enough contexts to
+ * maintain overall liveness:
+ *
+ * - Submission of a new task to the pool
+ * - Resizes or other changes to the workers array
+ * - pool termination
+ * - A worker pushing a task on an empty queue
+ *
+ * The case of pushing a task occurs often enough, and is heavy
+ * enough compared to simple stack pushes, to require special
+ * handling: Method signalWork returns without advancing count if
+ * the queue appears to be empty. This would ordinarily result in
+ * races causing some queued waiters not to be woken up. To avoid
+ * this, the first worker enqueued in method sync (see
+ * syncIsReleasable) rescans for tasks after being enqueued, and
+ * helps signal if any are found. This works well because the
+ * worker has nothing better to do, and so might as well help
+ * alleviate the overhead and contention on the threads actually
+ * doing work. Also, since event counts increments on task
+ * availability exist to maintain liveness (rather than to force
+ * refreshes etc), it is OK for callers to exit early if
+ * contending with another signaller.
+ */
+ static final class WaitQueueNode {
+ WaitQueueNode next; // only written before enqueued
+ volatile ForkJoinWorkerThread thread; // nulled to cancel wait
+ final long count; // unused for spare stack
+
+ WaitQueueNode(long c, ForkJoinWorkerThread w) {
+ count = c;
+ thread = w;
+ }
+
+ /**
+ * Wakes up waiter, returning false if known to already
+ */
+ boolean signal() {
+ ForkJoinWorkerThread t = thread;
+ if (t == null)
+ return false;
+ thread = null;
+ LockSupport.unpark(t);
+ return true;
+ }
+
+ /**
+ * Awaits release on sync.
+ */
+ void awaitSyncRelease(ForkJoinPool p) {
+ while (thread != null && !p.syncIsReleasable(this))
+ LockSupport.park(this);
+ }
+
+ /**
+ * Awaits resumption as spare.
+ */
+ void awaitSpareRelease() {
+ while (thread != null) {
+ if (!Thread.interrupted())
+ LockSupport.park(this);
+ }
+ }
+ }
+
+ /**
+ * Ensures that no thread is waiting for count to advance from the
+ * current value of eventCount read on entry to this method, by
+ * releasing waiting threads if necessary.
+ *
+ * @return the count
+ */
+ final long ensureSync() {
+ long c = eventCount;
+ WaitQueueNode q;
+ while ((q = syncStack) != null && q.count < c) {
+ if (casBarrierStack(q, null)) {
+ do {
+ q.signal();
+ } while ((q = q.next) != null);
+ break;
+ }
+ }
+ return c;
+ }
+
+ /**
+ * Increments event count and releases waiting threads.
+ */
+ private void signalIdleWorkers() {
+ long c;
+ do {} while (!casEventCount(c = eventCount, c+1));
+ ensureSync();
+ }
+
+ /**
+ * Signals threads waiting to poll a task. Because method sync
+ * rechecks availability, it is OK to only proceed if queue
+ * appears to be non-empty, and OK to skip under contention to
+ * increment count (since some other thread succeeded).
+ */
+ final void signalWork() {
+ long c;
+ WaitQueueNode q;
+ if (syncStack != null &&
+ casEventCount(c = eventCount, c+1) &&
+ (((q = syncStack) != null && q.count <= c) &&
+ (!casBarrierStack(q, q.next) || !q.signal())))
+ ensureSync();
+ }
+
+ /**
+ * Waits until event count advances from last value held by
+ * caller, or if excess threads, caller is resumed as spare, or
+ * caller or pool is terminating. Updates caller's event on exit.
+ *
+ * @param w the calling worker thread
+ */
+ final void sync(ForkJoinWorkerThread w) {
+ updateStealCount(w); // Transfer w's count while it is idle
+
+ while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
+ long prev = w.lastEventCount;
+ WaitQueueNode node = null;
+ WaitQueueNode h;
+ while (eventCount == prev &&
+ ((h = syncStack) == null || h.count == prev)) {
+ if (node == null)
+ node = new WaitQueueNode(prev, w);
+ if (casBarrierStack(node.next = h, node)) {
+ node.awaitSyncRelease(this);
+ break;
+ }
+ }
+ long ec = ensureSync();
+ if (ec != prev) {
+ w.lastEventCount = ec;
+ break;
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if worker waiting on sync can proceed:
+ * - on signal (thread == null)
+ * - on event count advance (winning race to notify vs signaller)
+ * - on interrupt
+ * - if the first queued node, we find work available
+ * If node was not signalled and event count not advanced on exit,
+ * then we also help advance event count.
+ *
+ * @return {@code true} if node can be released
+ */
+ final boolean syncIsReleasable(WaitQueueNode node) {
+ long prev = node.count;
+ if (!Thread.interrupted() && node.thread != null &&
+ (node.next != null ||
+ !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
+ eventCount == prev)
+ return false;
+ if (node.thread != null) {
+ node.thread = null;
+ long ec = eventCount;
+ if (prev <= ec) // help signal
+ casEventCount(ec, ec+1);
+ }
+ return true;
+ }
+
+ /**
+ * Returns {@code true} if a new sync event occurred since last
+ * call to sync or this method, if so, updating caller's count.
+ */
+ final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
+ long lc = w.lastEventCount;
+ long ec = ensureSync();
+ if (ec == lc)
+ return false;
+ w.lastEventCount = ec;
+ return true;
+ }
+
+ // Parallelism maintenance
+
+ /**
+ * Decrements running count; if too low, adds spare.
+ *
+ * Conceptually, all we need to do here is add or resume a
+ * spare thread when one is about to block (and remove or
+ * suspend it later when unblocked -- see suspendIfSpare).
+ * However, implementing this idea requires coping with
+ * several problems: we have imperfect information about the
+ * states of threads. Some count updates can and usually do
+ * lag run state changes, despite arrangements to keep them
+ * accurate (for example, when possible, updating counts
+ * before signalling or resuming), especially when running on
+ * dynamic JVMs that don't optimize the infrequent paths that
+ * update counts. Generating too many threads can make these
+ * problems become worse, because excess threads are more
+ * likely to be context-switched with others, slowing them all
+ * down, especially if there is no work available, so all are
+ * busy scanning or idling. Also, excess spare threads can
+ * only be suspended or removed when they are idle, not
+ * immediately when they aren't needed. So adding threads will
+ * raise parallelism level for longer than necessary. Also,
+ * FJ applications often encounter highly transient peaks when
+ * many threads are blocked joining, but for less time than it
+ * takes to create or resume spares.
+ *
+ * @param joinMe if non-null, return early if done
+ * @param maintainParallelism if true, try to stay within
+ * target counts, else create only to avoid starvation
+ * @return true if joinMe known to be done
+ */
+ final boolean preJoin(ForkJoinTask<?> joinMe,
+ boolean maintainParallelism) {
+ maintainParallelism &= maintainsParallelism; // overrride
+ boolean dec = false; // true when running count decremented
+ while (spareStack == null || !tryResumeSpare(dec)) {
+ int counts = workerCounts;
+ if (dec || (dec = casWorkerCounts(counts, --counts))) {
+ if (!needSpare(counts, maintainParallelism))
+ break;
+ if (joinMe.status < 0)
+ return true;
+ if (tryAddSpare(counts))
+ break;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Same idea as preJoin
+ */
+ final boolean preBlock(ManagedBlocker blocker,
+ boolean maintainParallelism) {
+ maintainParallelism &= maintainsParallelism;
+ boolean dec = false;
+ while (spareStack == null || !tryResumeSpare(dec)) {
+ int counts = workerCounts;
+ if (dec || (dec = casWorkerCounts(counts, --counts))) {
+ if (!needSpare(counts, maintainParallelism))
+ break;
+ if (blocker.isReleasable())
+ return true;
+ if (tryAddSpare(counts))
+ break;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns {@code true} if a spare thread appears to be needed.
+ * If maintaining parallelism, returns true when the deficit in
+ * running threads is more than the surplus of total threads, and
+ * there is apparently some work to do. This self-limiting rule
+ * means that the more threads that have already been added, the
+ * less parallelism we will tolerate before adding another.
+ *
+ * @param counts current worker counts
+ * @param maintainParallelism try to maintain parallelism
+ */
+ private boolean needSpare(int counts, boolean maintainParallelism) {
+ int ps = parallelism;
+ int rc = runningCountOf(counts);
+ int tc = totalCountOf(counts);
+ int runningDeficit = ps - rc;
+ int totalSurplus = tc - ps;
+ return (tc < maxPoolSize &&
+ (rc == 0 || totalSurplus < 0 ||
+ (maintainParallelism &&
+ runningDeficit > totalSurplus &&
+ ForkJoinWorkerThread.hasQueuedTasks(workers))));
+ }
+
+ /**
+ * Adds a spare worker if lock available and no more than the
+ * expected numbers of threads exist.
+ *
+ * @return true if successful
+ */
+ private boolean tryAddSpare(int expectedCounts) {
+ final ReentrantLock lock = this.workerLock;
+ int expectedRunning = runningCountOf(expectedCounts);
+ int expectedTotal = totalCountOf(expectedCounts);
+ boolean success = false;
+ boolean locked = false;
+ // confirm counts while locking; CAS after obtaining lock
+ try {
+ for (;;) {
+ int s = workerCounts;
+ int tc = totalCountOf(s);
+ int rc = runningCountOf(s);
+ if (rc > expectedRunning || tc > expectedTotal)
+ break;
+ if (!locked && !(locked = lock.tryLock()))
+ break;
+ if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
+ createAndStartSpare(tc);
+ success = true;
+ break;
+ }
+ }
+ } finally {
+ if (locked)
+ lock.unlock();
+ }
+ return success;
+ }
+
+ /**
+ * Adds the kth spare worker. On entry, pool counts are already
+ * adjusted to reflect addition.
+ */
+ private void createAndStartSpare(int k) {
+ ForkJoinWorkerThread w = null;
+ ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
+ int len = ws.length;
+ // Probably, we can place at slot k. If not, find empty slot
+ if (k < len && ws[k] != null) {
+ for (k = 0; k < len && ws[k] != null; ++k)
+ ;
+ }
+ if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
+ ws[k] = w;
+ w.start();
+ }
+ else
+ updateWorkerCount(-1); // adjust on failure
+ signalIdleWorkers();
+ }
+
+ /**
+ * Suspends calling thread w if there are excess threads. Called
+ * only from sync. Spares are enqueued in a Treiber stack using
+ * the same WaitQueueNodes as barriers. They are resumed mainly
+ * in preJoin, but are also woken on pool events that require all
+ * threads to check run state.
+ *
+ * @param w the caller
+ */
+ private boolean suspendIfSpare(ForkJoinWorkerThread w) {
+ WaitQueueNode node = null;
+ int s;
+ while (parallelism < runningCountOf(s = workerCounts)) {
+ if (node == null)
+ node = new WaitQueueNode(0, w);
+ if (casWorkerCounts(s, s-1)) { // representation-dependent
+ // push onto stack
+ do {} while (!casSpareStack(node.next = spareStack, node));
+ // block until released by resumeSpare
+ node.awaitSpareRelease();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Tries to pop and resume a spare thread.
+ *
+ * @param updateCount if true, increment running count on success
+ * @return true if successful
+ */
+ private boolean tryResumeSpare(boolean updateCount) {
+ WaitQueueNode q;
+ while ((q = spareStack) != null) {
+ if (casSpareStack(q, q.next)) {
+ if (updateCount)
+ updateRunningCount(1);
+ q.signal();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Pops and resumes all spare threads. Same idea as ensureSync.
+ *
+ * @return true if any spares released
+ */
+ private boolean resumeAllSpares() {
+ WaitQueueNode q;
+ while ( (q = spareStack) != null) {
+ if (casSpareStack(q, null)) {
+ do {
+ updateRunningCount(1);
+ q.signal();
+ } while ((q = q.next) != null);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Pops and shuts down excessive spare threads. Call only while
+ * holding lock. This is not guaranteed to eliminate all excess
+ * threads, only those suspended as spares, which are the ones
+ * unlikely to be needed in the future.
+ */
+ private void trimSpares() {
+ int surplus = totalCountOf(workerCounts) - parallelism;
+ WaitQueueNode q;
+ while (surplus > 0 && (q = spareStack) != null) {
+ if (casSpareStack(q, null)) {
+ do {
+ updateRunningCount(1);
+ ForkJoinWorkerThread w = q.thread;
+ if (w != null && surplus > 0 &&
+ runningCountOf(workerCounts) > 0 && w.shutdown())
+ --surplus;
+ q.signal();
+ } while ((q = q.next) != null);
+ }
+ }
+ }
+
+ /**
+ * Interface for extending managed parallelism for tasks running
+ * in {@link ForkJoinPool}s.
+ *
+ * <p>A {@code ManagedBlocker} provides two methods.
+ * Method {@code isReleasable} must return {@code true} if
+ * blocking is not necessary. Method {@code block} blocks the
+ * current thread if necessary (perhaps internally invoking
+ * {@code isReleasable} before actually blocking).
+ *
+ * <p>For example, here is a ManagedBlocker based on a
+ * ReentrantLock:
+ * <pre> {@code
+ * class ManagedLocker implements ManagedBlocker {
+ * final ReentrantLock lock;
+ * boolean hasLock = false;
+ * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
+ * public boolean block() {
+ * if (!hasLock)
+ * lock.lock();
+ * return true;
+ * }
+ * public boolean isReleasable() {
+ * return hasLock || (hasLock = lock.tryLock());
+ * }
+ * }}</pre>
+ */
+ public static interface ManagedBlocker {
+ /**
+ * Possibly blocks the current thread, for example waiting for
+ * a lock or condition.
+ *
+ * @return {@code true} if no additional blocking is necessary
+ * (i.e., if isReleasable would return true)
+ * @throws InterruptedException if interrupted while waiting
+ * (the method is not required to do so, but is allowed to)
+ */
+ boolean block() throws InterruptedException;
+
+ /**
+ * Returns {@code true} if blocking is unnecessary.
+ */
+ boolean isReleasable();
+ }
+
+ /**
+ * Blocks in accord with the given blocker. If the current thread
+ * is a {@link ForkJoinWorkerThread}, this method possibly
+ * arranges for a spare thread to be activated if necessary to
+ * ensure parallelism while the current thread is blocked.
+ *
+ * <p>If {@code maintainParallelism} is {@code true} and the pool
+ * supports it ({@link #getMaintainsParallelism}), this method
+ * attempts to maintain the pool's nominal parallelism. Otherwise
+ * it activates a thread only if necessary to avoid complete
+ * starvation. This option may be preferable when blockages use
+ * timeouts, or are almost always brief.
+ *
+ * <p>If the caller is not a {@link ForkJoinTask}, this method is
+ * behaviorally equivalent to
+ * <pre> {@code
+ * while (!blocker.isReleasable())
+ * if (blocker.block())
+ * return;
+ * }</pre>
+ *
+ * If the caller is a {@code ForkJoinTask}, then the pool may
+ * first be expanded to ensure parallelism, and later adjusted.
+ *
+ * @param blocker the blocker
+ * @param maintainParallelism if {@code true} and supported by
+ * this pool, attempt to maintain the pool's nominal parallelism;
+ * otherwise activate a thread only if necessary to avoid
+ * complete starvation.
+ * @throws InterruptedException if blocker.block did so
+ */
+ public static void managedBlock(ManagedBlocker blocker,
+ boolean maintainParallelism)
+ throws InterruptedException {
+ Thread t = Thread.currentThread();
+ ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread) t).pool : null);
+ if (!blocker.isReleasable()) {
+ try {
+ if (pool == null ||
+ !pool.preBlock(blocker, maintainParallelism))
+ awaitBlocker(blocker);
+ } finally {
+ if (pool != null)
+ pool.updateRunningCount(1);
+ }
+ }
+ }
+
+ private static void awaitBlocker(ManagedBlocker blocker)
+ throws InterruptedException {
+ do {} while (!blocker.isReleasable() && !blocker.block());
+ }
+
+ // AbstractExecutorService overrides. These rely on undocumented
+ // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
+ // implement RunnableFuture.
+
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
+ }
+
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long eventCountOffset =
+ objectFieldOffset("eventCount", ForkJoinPool.class);
+ private static final long workerCountsOffset =
+ objectFieldOffset("workerCounts", ForkJoinPool.class);
+ private static final long runControlOffset =
+ objectFieldOffset("runControl", ForkJoinPool.class);
+ private static final long syncStackOffset =
+ objectFieldOffset("syncStack",ForkJoinPool.class);
+ private static final long spareStackOffset =
+ objectFieldOffset("spareStack", ForkJoinPool.class);
+
+ private boolean casEventCount(long cmp, long val) {
+ return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
+ }
+ private boolean casWorkerCounts(int cmp, int val) {
+ return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
+ }
+ private boolean casRunControl(int cmp, int val) {
+ return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
+ }
+ private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
+ return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
+ }
+ private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
+ return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
+ }
+
+ private static long objectFieldOffset(String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,1292 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.RandomAccess;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ * Abstract base class for tasks that run within a {@link ForkJoinPool}.
+ * A {@code ForkJoinTask} is a thread-like entity that is much
+ * lighter weight than a normal thread. Huge numbers of tasks and
+ * subtasks may be hosted by a small number of actual threads in a
+ * ForkJoinPool, at the price of some usage limitations.
+ *
+ * <p>A "main" {@code ForkJoinTask} begins execution when submitted
+ * to a {@link ForkJoinPool}. Once started, it will usually in turn
+ * start other subtasks. As indicated by the name of this class,
+ * many programs using {@code ForkJoinTask} employ only methods
+ * {@link #fork} and {@link #join}, or derivatives such as {@link
+ * #invokeAll}. However, this class also provides a number of other
+ * methods that can come into play in advanced usages, as well as
+ * extension mechanics that allow support of new forms of fork/join
+ * processing.
+ *
+ * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
+ * The efficiency of {@code ForkJoinTask}s stems from a set of
+ * restrictions (that are only partially statically enforceable)
+ * reflecting their intended use as computational tasks calculating
+ * pure functions or operating on purely isolated objects. The
+ * primary coordination mechanisms are {@link #fork}, that arranges
+ * asynchronous execution, and {@link #join}, that doesn't proceed
+ * until the task's result has been computed. Computations should
+ * avoid {@code synchronized} methods or blocks, and should minimize
+ * other blocking synchronization apart from joining other tasks or
+ * using synchronizers such as Phasers that are advertised to
+ * cooperate with fork/join scheduling. Tasks should also not perform
+ * blocking IO, and should ideally access variables that are
+ * completely independent of those accessed by other running
+ * tasks. Minor breaches of these restrictions, for example using
+ * shared output streams, may be tolerable in practice, but frequent
+ * use may result in poor performance, and the potential to
+ * indefinitely stall if the number of threads not waiting for IO or
+ * other external synchronization becomes exhausted. This usage
+ * restriction is in part enforced by not permitting checked
+ * exceptions such as {@code IOExceptions} to be thrown. However,
+ * computations may still encounter unchecked exceptions, that are
+ * rethrown to callers attempting to join them. These exceptions may
+ * additionally include {@link RejectedExecutionException} stemming
+ * from internal resource exhaustion, such as failure to allocate
+ * internal task queues.
+ *
+ * <p>The primary method for awaiting completion and extracting
+ * results of a task is {@link #join}, but there are several variants:
+ * The {@link Future#get} methods support interruptible and/or timed
+ * waits for completion and report results using {@code Future}
+ * conventions. Method {@link #helpJoin} enables callers to actively
+ * execute other tasks while awaiting joins, which is sometimes more
+ * efficient but only applies when all subtasks are known to be
+ * strictly tree-structured. Method {@link #invoke} is semantically
+ * equivalent to {@code fork(); join()} but always attempts to begin
+ * execution in the current thread. The "<em>quiet</em>" forms of
+ * these methods do not extract results or report exceptions. These
+ * may be useful when a set of tasks are being executed, and you need
+ * to delay processing of results or exceptions until all complete.
+ * Method {@code invokeAll} (available in multiple versions)
+ * performs the most common form of parallel invocation: forking a set
+ * of tasks and joining them all.
+ *
+ * <p>The execution status of tasks may be queried at several levels
+ * of detail: {@link #isDone} is true if a task completed in any way
+ * (including the case where a task was cancelled without executing);
+ * {@link #isCompletedNormally} is true if a task completed without
+ * cancellation or encountering an exception; {@link #isCancelled} is
+ * true if the task was cancelled (in which case {@link #getException}
+ * returns a {@link java.util.concurrent.CancellationException}); and
+ * {@link #isCompletedAbnormally} is true if a task was either
+ * cancelled or encountered an exception, in which case {@link
+ * #getException} will return either the encountered exception or
+ * {@link java.util.concurrent.CancellationException}.
+ *
+ * <p>The ForkJoinTask class is not usually directly subclassed.
+ * Instead, you subclass one of the abstract classes that support a
+ * particular style of fork/join processing, typically {@link
+ * RecursiveAction} for computations that do not return results, or
+ * {@link RecursiveTask} for those that do. Normally, a concrete
+ * ForkJoinTask subclass declares fields comprising its parameters,
+ * established in a constructor, and then defines a {@code compute}
+ * method that somehow uses the control methods supplied by this base
+ * class. While these methods have {@code public} access (to allow
+ * instances of different task subclasses to call each other's
+ * methods), some of them may only be called from within other
+ * ForkJoinTasks (as may be determined using method {@link
+ * #inForkJoinPool}). Attempts to invoke them in other contexts
+ * result in exceptions or errors, possibly including
+ * ClassCastException.
+ *
+ * <p>Most base support methods are {@code final}, to prevent
+ * overriding of implementations that are intrinsically tied to the
+ * underlying lightweight task scheduling framework. Developers
+ * creating new basic styles of fork/join processing should minimally
+ * implement {@code protected} methods {@link #exec}, {@link
+ * #setRawResult}, and {@link #getRawResult}, while also introducing
+ * an abstract computational method that can be implemented in its
+ * subclasses, possibly relying on other {@code protected} methods
+ * provided by this class.
+ *
+ * <p>ForkJoinTasks should perform relatively small amounts of
+ * computation. Large tasks should be split into smaller subtasks,
+ * usually via recursive decomposition. As a very rough rule of thumb,
+ * a task should perform more than 100 and less than 10000 basic
+ * computational steps. If tasks are too big, then parallelism cannot
+ * improve throughput. If too small, then memory and internal task
+ * maintenance overhead may overwhelm processing.
+ *
+ * <p>This class provides {@code adapt} methods for {@link Runnable}
+ * and {@link Callable}, that may be of use when mixing execution of
+ * {@code ForkJoinTasks} with other kinds of tasks. When all tasks
+ * are of this form, consider using a pool in
+ * {@linkplain ForkJoinPool#setAsyncMode async mode}.
+ *
+ * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
+ * used in extensions such as remote execution frameworks. It is
+ * sensible to serialize tasks only before or after, but not during,
+ * execution. Serialization is not relied on during execution itself.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ */
+public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
+
+ /**
+ * Run control status bits packed into a single int to minimize
+ * footprint and to ensure atomicity (via CAS). Status is
+ * initially zero, and takes on nonnegative values until
+ * completed, upon which status holds COMPLETED. CANCELLED, or
+ * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
+ * blocking waits by other threads have SIGNAL_MASK bits set --
+ * bit 15 for external (nonFJ) waits, and the rest a count of
+ * waiting FJ threads. (This representation relies on
+ * ForkJoinPool max thread limits). Completion of a stolen task
+ * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
+ * though suboptimal for some purposes, we use basic builtin
+ * wait/notify to take advantage of "monitor inflation" in JVMs
+ * that we would otherwise need to emulate to avoid adding further
+ * per-task bookkeeping overhead. Note that bits 16-28 are
+ * currently unused. Also value 0x80000000 is available as spare
+ * completion value.
+ */
+ volatile int status; // accessed directly by pool and workers
+
+ static final int COMPLETION_MASK = 0xe0000000;
+ static final int NORMAL = 0xe0000000; // == mask
+ static final int CANCELLED = 0xc0000000;
+ static final int EXCEPTIONAL = 0xa0000000;
+ static final int SIGNAL_MASK = 0x0000ffff;
+ static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
+ static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
+
+ /**
+ * Table of exceptions thrown by tasks, to enable reporting by
+ * callers. Because exceptions are rare, we don't directly keep
+ * them with task objects, but instead use a weak ref table. Note
+ * that cancellation exceptions don't appear in the table, but are
+ * instead recorded as status values.
+ * TODO: Use ConcurrentReferenceHashMap
+ */
+ static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
+ Collections.synchronizedMap
+ (new WeakHashMap<ForkJoinTask<?>, Throwable>());
+
+ // within-package utilities
+
+ /**
+ * Gets current worker thread, or null if not a worker thread.
+ */
+ static ForkJoinWorkerThread getWorker() {
+ Thread t = Thread.currentThread();
+ return ((t instanceof ForkJoinWorkerThread) ?
+ (ForkJoinWorkerThread) t : null);
+ }
+
+ final boolean casStatus(int cmp, int val) {
+ return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
+ }
+
+ /**
+ * Workaround for not being able to rethrow unchecked exceptions.
+ */
+ static void rethrowException(Throwable ex) {
+ if (ex != null)
+ UNSAFE.throwException(ex);
+ }
+
+ // Setting completion status
+
+ /**
+ * Marks completion and wakes up threads waiting to join this task.
+ *
+ * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
+ */
+ final void setCompletion(int completion) {
+ ForkJoinPool pool = getPool();
+ if (pool != null) {
+ int s; // Clear signal bits while setting completion status
+ do {} while ((s = status) >= 0 && !casStatus(s, completion));
+
+ if ((s & SIGNAL_MASK) != 0) {
+ if ((s &= INTERNAL_SIGNAL_MASK) != 0)
+ pool.updateRunningCount(s);
+ synchronized (this) { notifyAll(); }
+ }
+ }
+ else
+ externallySetCompletion(completion);
+ }
+
+ /**
+ * Version of setCompletion for non-FJ threads. Leaves signal
+ * bits for unblocked threads to adjust, and always notifies.
+ */
+ private void externallySetCompletion(int completion) {
+ int s;
+ do {} while ((s = status) >= 0 &&
+ !casStatus(s, (s & SIGNAL_MASK) | completion));
+ synchronized (this) { notifyAll(); }
+ }
+
+ /**
+ * Sets status to indicate normal completion.
+ */
+ final void setNormalCompletion() {
+ // Try typical fast case -- single CAS, no signal, not already done.
+ // Manually expand casStatus to improve chances of inlining it
+ if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
+ setCompletion(NORMAL);
+ }
+
+ // internal waiting and notification
+
+ /**
+ * Performs the actual monitor wait for awaitDone.
+ */
+ private void doAwaitDone() {
+ // Minimize lock bias and in/de-flation effects by maximizing
+ // chances of waiting inside sync
+ try {
+ while (status >= 0)
+ synchronized (this) { if (status >= 0) wait(); }
+ } catch (InterruptedException ie) {
+ onInterruptedWait();
+ }
+ }
+
+ /**
+ * Performs the actual timed monitor wait for awaitDone.
+ */
+ private void doAwaitDone(long startTime, long nanos) {
+ synchronized (this) {
+ try {
+ while (status >= 0) {
+ long nt = nanos - (System.nanoTime() - startTime);
+ if (nt <= 0)
+ break;
+ wait(nt / 1000000, (int) (nt % 1000000));
+ }
+ } catch (InterruptedException ie) {
+ onInterruptedWait();
+ }
+ }
+ }
+
+ // Awaiting completion
+
+ /**
+ * Sets status to indicate there is joiner, then waits for join,
+ * surrounded with pool notifications.
+ *
+ * @return status upon exit
+ */
+ private int awaitDone(ForkJoinWorkerThread w,
+ boolean maintainParallelism) {
+ ForkJoinPool pool = (w == null) ? null : w.pool;
+ int s;
+ while ((s = status) >= 0) {
+ if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
+ if (pool == null || !pool.preJoin(this, maintainParallelism))
+ doAwaitDone();
+ if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
+ adjustPoolCountsOnUnblock(pool);
+ break;
+ }
+ }
+ return s;
+ }
+
+ /**
+ * Timed version of awaitDone
+ *
+ * @return status upon exit
+ */
+ private int awaitDone(ForkJoinWorkerThread w, long nanos) {
+ ForkJoinPool pool = (w == null) ? null : w.pool;
+ int s;
+ while ((s = status) >= 0) {
+ if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
+ long startTime = System.nanoTime();
+ if (pool == null || !pool.preJoin(this, false))
+ doAwaitDone(startTime, nanos);
+ if ((s = status) >= 0) {
+ adjustPoolCountsOnCancelledWait(pool);
+ s = status;
+ }
+ if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
+ adjustPoolCountsOnUnblock(pool);
+ break;
+ }
+ }
+ return s;
+ }
+
+ /**
+ * Notifies pool that thread is unblocked. Called by signalled
+ * threads when woken by non-FJ threads (which is atypical).
+ */
+ private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
+ int s;
+ do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
+ if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
+ pool.updateRunningCount(s);
+ }
+
+ /**
+ * Notifies pool to adjust counts on cancelled or timed out wait.
+ */
+ private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
+ if (pool != null) {
+ int s;
+ while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
+ if (casStatus(s, s - 1)) {
+ pool.updateRunningCount(1);
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Handles interruptions during waits.
+ */
+ private void onInterruptedWait() {
+ ForkJoinWorkerThread w = getWorker();
+ if (w == null)
+ Thread.currentThread().interrupt(); // re-interrupt
+ else if (w.isTerminating())
+ cancelIgnoringExceptions();
+ // else if FJworker, ignore interrupt
+ }
+
+ // Recording and reporting exceptions
+
+ private void setDoneExceptionally(Throwable rex) {
+ exceptionMap.put(this, rex);
+ setCompletion(EXCEPTIONAL);
+ }
+
+ /**
+ * Throws the exception associated with status s.
+ *
+ * @throws the exception
+ */
+ private void reportException(int s) {
+ if ((s &= COMPLETION_MASK) < NORMAL) {
+ if (s == CANCELLED)
+ throw new CancellationException();
+ else
+ rethrowException(exceptionMap.get(this));
+ }
+ }
+
+ /**
+ * Returns result or throws exception using j.u.c.Future conventions.
+ * Only call when {@code isDone} known to be true or thread known
+ * to be interrupted.
+ */
+ private V reportFutureResult()
+ throws InterruptedException, ExecutionException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ int s = status & COMPLETION_MASK;
+ if (s < NORMAL) {
+ Throwable ex;
+ if (s == CANCELLED)
+ throw new CancellationException();
+ if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ throw new ExecutionException(ex);
+ }
+ return getRawResult();
+ }
+
+ /**
+ * Returns result or throws exception using j.u.c.Future conventions
+ * with timeouts.
+ */
+ private V reportTimedFutureResult()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ Throwable ex;
+ int s = status & COMPLETION_MASK;
+ if (s == NORMAL)
+ return getRawResult();
+ else if (s == CANCELLED)
+ throw new CancellationException();
+ else if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ throw new ExecutionException(ex);
+ else
+ throw new TimeoutException();
+ }
+
+ // internal execution methods
+
+ /**
+ * Calls exec, recording completion, and rethrowing exception if
+ * encountered. Caller should normally check status before calling.
+ *
+ * @return true if completed normally
+ */
+ private boolean tryExec() {
+ try { // try block must contain only call to exec
+ if (!exec())
+ return false;
+ } catch (Throwable rex) {
+ setDoneExceptionally(rex);
+ rethrowException(rex);
+ return false; // not reached
+ }
+ setNormalCompletion();
+ return true;
+ }
+
+ /**
+ * Main execution method used by worker threads. Invokes
+ * base computation unless already complete.
+ */
+ final void quietlyExec() {
+ if (status >= 0) {
+ try {
+ if (!exec())
+ return;
+ } catch (Throwable rex) {
+ setDoneExceptionally(rex);
+ return;
+ }
+ setNormalCompletion();
+ }
+ }
+
+ /**
+ * Calls exec(), recording but not rethrowing exception.
+ * Caller should normally check status before calling.
+ *
+ * @return true if completed normally
+ */
+ private boolean tryQuietlyInvoke() {
+ try {
+ if (!exec())
+ return false;
+ } catch (Throwable rex) {
+ setDoneExceptionally(rex);
+ return false;
+ }
+ setNormalCompletion();
+ return true;
+ }
+
+ /**
+ * Cancels, ignoring any exceptions it throws.
+ */
+ final void cancelIgnoringExceptions() {
+ try {
+ cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+
+ /**
+ * Main implementation of helpJoin
+ */
+ private int busyJoin(ForkJoinWorkerThread w) {
+ int s;
+ ForkJoinTask<?> t;
+ while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
+ t.quietlyExec();
+ return (s >= 0) ? awaitDone(w, false) : s; // block if no work
+ }
+
+ // public methods
+
+ /**
+ * Arranges to asynchronously execute this task. While it is not
+ * necessarily enforced, it is a usage error to fork a task more
+ * than once unless it has completed and been reinitialized.
+ * Subsequent modifications to the state of this task or any data
+ * it operates on are not necessarily consistently observable by
+ * any thread other than the one executing it unless preceded by a
+ * call to {@link #join} or related methods, or a call to {@link
+ * #isDone} returning {@code true}.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return {@code this}, to simplify usage
+ */
+ public final ForkJoinTask<V> fork() {
+ ((ForkJoinWorkerThread) Thread.currentThread())
+ .pushTask(this);
+ return this;
+ }
+
+ /**
+ * Returns the result of the computation when it {@link #isDone is done}.
+ * This method differs from {@link #get()} in that
+ * abnormal completion results in {@code RuntimeException} or
+ * {@code Error}, not {@code ExecutionException}.
+ *
+ * @return the computed result
+ */
+ public final V join() {
+ ForkJoinWorkerThread w = getWorker();
+ if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
+ reportException(awaitDone(w, true));
+ return getRawResult();
+ }
+
+ /**
+ * Commences performing this task, awaits its completion if
+ * necessary, and return its result, or throws an (unchecked)
+ * exception if the underlying computation did so.
+ *
+ * @return the computed result
+ */
+ public final V invoke() {
+ if (status >= 0 && tryExec())
+ return getRawResult();
+ else
+ return join();
+ }
+
+ /**
+ * Forks the given tasks, returning when {@code isDone} holds for
+ * each task or an (unchecked) exception is encountered, in which
+ * case the exception is rethrown. If either task encounters an
+ * exception, the other one may be, but is not guaranteed to be,
+ * cancelled. If both tasks throw an exception, then this method
+ * throws one of them. The individual status of each task may be
+ * checked using {@link #getException()} and related methods.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @param t1 the first task
+ * @param t2 the second task
+ * @throws NullPointerException if any task is null
+ */
+ public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
+ t2.fork();
+ t1.invoke();
+ t2.join();
+ }
+
+ /**
+ * Forks the given tasks, returning when {@code isDone} holds for
+ * each task or an (unchecked) exception is encountered, in which
+ * case the exception is rethrown. If any task encounters an
+ * exception, others may be, but are not guaranteed to be,
+ * cancelled. If more than one task encounters an exception, then
+ * this method throws any one of these exceptions. The individual
+ * status of each task may be checked using {@link #getException()}
+ * and related methods.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @param tasks the tasks
+ * @throws NullPointerException if any task is null
+ */
+ public static void invokeAll(ForkJoinTask<?>... tasks) {
+ Throwable ex = null;
+ int last = tasks.length - 1;
+ for (int i = last; i >= 0; --i) {
+ ForkJoinTask<?> t = tasks[i];
+ if (t == null) {
+ if (ex == null)
+ ex = new NullPointerException();
+ }
+ else if (i != 0)
+ t.fork();
+ else {
+ t.quietlyInvoke();
+ if (ex == null)
+ ex = t.getException();
+ }
+ }
+ for (int i = 1; i <= last; ++i) {
+ ForkJoinTask<?> t = tasks[i];
+ if (t != null) {
+ if (ex != null)
+ t.cancel(false);
+ else {
+ t.quietlyJoin();
+ if (ex == null)
+ ex = t.getException();
+ }
+ }
+ }
+ if (ex != null)
+ rethrowException(ex);
+ }
+
+ /**
+ * Forks all tasks in the specified collection, returning when
+ * {@code isDone} holds for each task or an (unchecked) exception
+ * is encountered. If any task encounters an exception, others
+ * may be, but are not guaranteed to be, cancelled. If more than
+ * one task encounters an exception, then this method throws any
+ * one of these exceptions. The individual status of each task
+ * may be checked using {@link #getException()} and related
+ * methods. The behavior of this operation is undefined if the
+ * specified collection is modified while the operation is in
+ * progress.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @param tasks the collection of tasks
+ * @return the tasks argument, to simplify usage
+ * @throws NullPointerException if tasks or any element are null
+ */
+ public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
+ if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
+ invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
+ return tasks;
+ }
+ @SuppressWarnings("unchecked")
+ List<? extends ForkJoinTask<?>> ts =
+ (List<? extends ForkJoinTask<?>>) tasks;
+ Throwable ex = null;
+ int last = ts.size() - 1;
+ for (int i = last; i >= 0; --i) {
+ ForkJoinTask<?> t = ts.get(i);
+ if (t == null) {
+ if (ex == null)
+ ex = new NullPointerException();
+ }
+ else if (i != 0)
+ t.fork();
+ else {
+ t.quietlyInvoke();
+ if (ex == null)
+ ex = t.getException();
+ }
+ }
+ for (int i = 1; i <= last; ++i) {
+ ForkJoinTask<?> t = ts.get(i);
+ if (t != null) {
+ if (ex != null)
+ t.cancel(false);
+ else {
+ t.quietlyJoin();
+ if (ex == null)
+ ex = t.getException();
+ }
+ }
+ }
+ if (ex != null)
+ rethrowException(ex);
+ return tasks;
+ }
+
+ /**
+ * Attempts to cancel execution of this task. This attempt will
+ * fail if the task has already completed, has already been
+ * cancelled, or could not be cancelled for some other reason. If
+ * successful, and this task has not started when cancel is
+ * called, execution of this task is suppressed, {@link
+ * #isCancelled} will report true, and {@link #join} will result
+ * in a {@code CancellationException} being thrown.
+ *
+ * <p>This method may be overridden in subclasses, but if so, must
+ * still ensure that these minimal properties hold. In particular,
+ * the {@code cancel} method itself must not throw exceptions.
+ *
+ * <p>This method is designed to be invoked by <em>other</em>
+ * tasks. To terminate the current task, you can just return or
+ * throw an unchecked exception from its computation method, or
+ * invoke {@link #completeExceptionally}.
+ *
+ * @param mayInterruptIfRunning this value is ignored in the
+ * default implementation because tasks are not
+ * cancelled via interruption
+ *
+ * @return {@code true} if this task is now cancelled
+ */
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ setCompletion(CANCELLED);
+ return (status & COMPLETION_MASK) == CANCELLED;
+ }
+
+ public final boolean isDone() {
+ return status < 0;
+ }
+
+ public final boolean isCancelled() {
+ return (status & COMPLETION_MASK) == CANCELLED;
+ }
+
+ /**
+ * Returns {@code true} if this task threw an exception or was cancelled.
+ *
+ * @return {@code true} if this task threw an exception or was cancelled
+ */
+ public final boolean isCompletedAbnormally() {
+ return (status & COMPLETION_MASK) < NORMAL;
+ }
+
+ /**
+ * Returns {@code true} if this task completed without throwing an
+ * exception and was not cancelled.
+ *
+ * @return {@code true} if this task completed without throwing an
+ * exception and was not cancelled
+ */
+ public final boolean isCompletedNormally() {
+ return (status & COMPLETION_MASK) == NORMAL;
+ }
+
+ /**
+ * Returns the exception thrown by the base computation, or a
+ * {@code CancellationException} if cancelled, or {@code null} if
+ * none or if the method has not yet completed.
+ *
+ * @return the exception, or {@code null} if none
+ */
+ public final Throwable getException() {
+ int s = status & COMPLETION_MASK;
+ return ((s >= NORMAL) ? null :
+ (s == CANCELLED) ? new CancellationException() :
+ exceptionMap.get(this));
+ }
+
+ /**
+ * Completes this task abnormally, and if not already aborted or
+ * cancelled, causes it to throw the given exception upon
+ * {@code join} and related operations. This method may be used
+ * to induce exceptions in asynchronous tasks, or to force
+ * completion of tasks that would not otherwise complete. Its use
+ * in other situations is discouraged. This method is
+ * overridable, but overridden versions must invoke {@code super}
+ * implementation to maintain guarantees.
+ *
+ * @param ex the exception to throw. If this exception is not a
+ * {@code RuntimeException} or {@code Error}, the actual exception
+ * thrown will be a {@code RuntimeException} with cause {@code ex}.
+ */
+ public void completeExceptionally(Throwable ex) {
+ setDoneExceptionally((ex instanceof RuntimeException) ||
+ (ex instanceof Error) ? ex :
+ new RuntimeException(ex));
+ }
+
+ /**
+ * Completes this task, and if not already aborted or cancelled,
+ * returning a {@code null} result upon {@code join} and related
+ * operations. This method may be used to provide results for
+ * asynchronous tasks, or to provide alternative handling for
+ * tasks that would not otherwise complete normally. Its use in
+ * other situations is discouraged. This method is
+ * overridable, but overridden versions must invoke {@code super}
+ * implementation to maintain guarantees.
+ *
+ * @param value the result value for this task
+ */
+ public void complete(V value) {
+ try {
+ setRawResult(value);
+ } catch (Throwable rex) {
+ setDoneExceptionally(rex);
+ return;
+ }
+ setNormalCompletion();
+ }
+
+ public final V get() throws InterruptedException, ExecutionException {
+ ForkJoinWorkerThread w = getWorker();
+ if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
+ awaitDone(w, true);
+ return reportFutureResult();
+ }
+
+ public final V get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ long nanos = unit.toNanos(timeout);
+ ForkJoinWorkerThread w = getWorker();
+ if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
+ awaitDone(w, nanos);
+ return reportTimedFutureResult();
+ }
+
+ /**
+ * Possibly executes other tasks until this task {@link #isDone is
+ * done}, then returns the result of the computation. This method
+ * may be more efficient than {@code join}, but is only applicable
+ * when there are no potential dependencies between continuation
+ * of the current task and that of any other task that might be
+ * executed while helping. (This usually holds for pure
+ * divide-and-conquer tasks).
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return the computed result
+ */
+ public final V helpJoin() {
+ ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
+ if (status < 0 || !w.unpushTask(this) || !tryExec())
+ reportException(busyJoin(w));
+ return getRawResult();
+ }
+
+ /**
+ * Possibly executes other tasks until this task {@link #isDone is
+ * done}. This method may be useful when processing collections
+ * of tasks when some have been cancelled or otherwise known to
+ * have aborted.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ */
+ public final void quietlyHelpJoin() {
+ if (status >= 0) {
+ ForkJoinWorkerThread w =
+ (ForkJoinWorkerThread) Thread.currentThread();
+ if (!w.unpushTask(this) || !tryQuietlyInvoke())
+ busyJoin(w);
+ }
+ }
+
+ /**
+ * Joins this task, without returning its result or throwing an
+ * exception. This method may be useful when processing
+ * collections of tasks when some have been cancelled or otherwise
+ * known to have aborted.
+ */
+ public final void quietlyJoin() {
+ if (status >= 0) {
+ ForkJoinWorkerThread w = getWorker();
+ if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
+ awaitDone(w, true);
+ }
+ }
+
+ /**
+ * Commences performing this task and awaits its completion if
+ * necessary, without returning its result or throwing an
+ * exception. This method may be useful when processing
+ * collections of tasks when some have been cancelled or otherwise
+ * known to have aborted.
+ */
+ public final void quietlyInvoke() {
+ if (status >= 0 && !tryQuietlyInvoke())
+ quietlyJoin();
+ }
+
+ /**
+ * Possibly executes tasks until the pool hosting the current task
+ * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
+ * be of use in designs in which many tasks are forked, but none
+ * are explicitly joined, instead executing them until all are
+ * processed.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ */
+ public static void helpQuiesce() {
+ ((ForkJoinWorkerThread) Thread.currentThread())
+ .helpQuiescePool();
+ }
+
+ /**
+ * Resets the internal bookkeeping state of this task, allowing a
+ * subsequent {@code fork}. This method allows repeated reuse of
+ * this task, but only if reuse occurs when this task has either
+ * never been forked, or has been forked, then completed and all
+ * outstanding joins of this task have also completed. Effects
+ * under any other usage conditions are not guaranteed.
+ * This method may be useful when executing
+ * pre-constructed trees of subtasks in loops.
+ */
+ public void reinitialize() {
+ if ((status & COMPLETION_MASK) == EXCEPTIONAL)
+ exceptionMap.remove(this);
+ status = 0;
+ }
+
+ /**
+ * Returns the pool hosting the current task execution, or null
+ * if this task is executing outside of any ForkJoinPool.
+ *
+ * @see #inForkJoinPool
+ * @return the pool, or {@code null} if none
+ */
+ public static ForkJoinPool getPool() {
+ Thread t = Thread.currentThread();
+ return (t instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread) t).pool : null;
+ }
+
+ /**
+ * Returns {@code true} if the current thread is executing as a
+ * ForkJoinPool computation.
+ *
+ * @return {@code true} if the current thread is executing as a
+ * ForkJoinPool computation, or false otherwise
+ */
+ public static boolean inForkJoinPool() {
+ return Thread.currentThread() instanceof ForkJoinWorkerThread;
+ }
+
+ /**
+ * Tries to unschedule this task for execution. This method will
+ * typically succeed if this task is the most recently forked task
+ * by the current thread, and has not commenced executing in
+ * another thread. This method may be useful when arranging
+ * alternative local processing of tasks that could have been, but
+ * were not, stolen.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return {@code true} if unforked
+ */
+ public boolean tryUnfork() {
+ return ((ForkJoinWorkerThread) Thread.currentThread())
+ .unpushTask(this);
+ }
+
+ /**
+ * Returns an estimate of the number of tasks that have been
+ * forked by the current worker thread but not yet executed. This
+ * value may be useful for heuristic decisions about whether to
+ * fork other tasks.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return the number of tasks
+ */
+ public static int getQueuedTaskCount() {
+ return ((ForkJoinWorkerThread) Thread.currentThread())
+ .getQueueSize();
+ }
+
+ /**
+ * Returns an estimate of how many more locally queued tasks are
+ * held by the current worker thread than there are other worker
+ * threads that might steal them. This value may be useful for
+ * heuristic decisions about whether to fork other tasks. In many
+ * usages of ForkJoinTasks, at steady state, each worker should
+ * aim to maintain a small constant surplus (for example, 3) of
+ * tasks, and to process computations locally if this threshold is
+ * exceeded.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return the surplus number of tasks, which may be negative
+ */
+ public static int getSurplusQueuedTaskCount() {
+ return ((ForkJoinWorkerThread) Thread.currentThread())
+ .getEstimatedSurplusTaskCount();
+ }
+
+ // Extension methods
+
+ /**
+ * Returns the result that would be returned by {@link #join}, even
+ * if this task completed abnormally, or {@code null} if this task
+ * is not known to have been completed. This method is designed
+ * to aid debugging, as well as to support extensions. Its use in
+ * any other context is discouraged.
+ *
+ * @return the result, or {@code null} if not completed
+ */
+ public abstract V getRawResult();
+
+ /**
+ * Forces the given value to be returned as a result. This method
+ * is designed to support extensions, and should not in general be
+ * called otherwise.
+ *
+ * @param value the value
+ */
+ protected abstract void setRawResult(V value);
+
+ /**
+ * Immediately performs the base action of this task. This method
+ * is designed to support extensions, and should not in general be
+ * called otherwise. The return value controls whether this task
+ * is considered to be done normally. It may return false in
+ * asynchronous actions that require explicit invocations of
+ * {@link #complete} to become joinable. It may also throw an
+ * (unchecked) exception to indicate abnormal exit.
+ *
+ * @return {@code true} if completed normally
+ */
+ protected abstract boolean exec();
+
+ /**
+ * Returns, but does not unschedule or execute, a task queued by
+ * the current thread but not yet executed, if one is immediately
+ * available. There is no guarantee that this task will actually
+ * be polled or executed next. Conversely, this method may return
+ * null even if a task exists but cannot be accessed without
+ * contention with other threads. This method is designed
+ * primarily to support extensions, and is unlikely to be useful
+ * otherwise.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return the next task, or {@code null} if none are available
+ */
+ protected static ForkJoinTask<?> peekNextLocalTask() {
+ return ((ForkJoinWorkerThread) Thread.currentThread())
+ .peekTask();
+ }
+
+ /**
+ * Unschedules and returns, without executing, the next task
+ * queued by the current thread but not yet executed. This method
+ * is designed primarily to support extensions, and is unlikely to
+ * be useful otherwise.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return the next task, or {@code null} if none are available
+ */
+ protected static ForkJoinTask<?> pollNextLocalTask() {
+ return ((ForkJoinWorkerThread) Thread.currentThread())
+ .pollLocalTask();
+ }
+
+ /**
+ * Unschedules and returns, without executing, the next task
+ * queued by the current thread but not yet executed, if one is
+ * available, or if not available, a task that was forked by some
+ * other thread, if available. Availability may be transient, so a
+ * {@code null} result does not necessarily imply quiescence
+ * of the pool this task is operating in. This method is designed
+ * primarily to support extensions, and is unlikely to be useful
+ * otherwise.
+ *
+ * <p>This method may be invoked only from within {@code
+ * ForkJoinTask} computations (as may be determined using method
+ * {@link #inForkJoinPool}). Attempts to invoke in other contexts
+ * result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
+ *
+ * @return a task, or {@code null} if none are available
+ */
+ protected static ForkJoinTask<?> pollTask() {
+ return ((ForkJoinWorkerThread) Thread.currentThread())
+ .pollTask();
+ }
+
+ /**
+ * Adaptor for Runnables. This implements RunnableFuture
+ * to be compliant with AbstractExecutorService constraints
+ * when used in ForkJoinPool.
+ */
+ static final class AdaptedRunnable<T> extends ForkJoinTask<T>
+ implements RunnableFuture<T> {
+ final Runnable runnable;
+ final T resultOnCompletion;
+ T result;
+ AdaptedRunnable(Runnable runnable, T result) {
+ if (runnable == null) throw new NullPointerException();
+ this.runnable = runnable;
+ this.resultOnCompletion = result;
+ }
+ public T getRawResult() { return result; }
+ public void setRawResult(T v) { result = v; }
+ public boolean exec() {
+ runnable.run();
+ result = resultOnCompletion;
+ return true;
+ }
+ public void run() { invoke(); }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
+ /**
+ * Adaptor for Callables
+ */
+ static final class AdaptedCallable<T> extends ForkJoinTask<T>
+ implements RunnableFuture<T> {
+ final Callable<? extends T> callable;
+ T result;
+ AdaptedCallable(Callable<? extends T> callable) {
+ if (callable == null) throw new NullPointerException();
+ this.callable = callable;
+ }
+ public T getRawResult() { return result; }
+ public void setRawResult(T v) { result = v; }
+ public boolean exec() {
+ try {
+ result = callable.call();
+ return true;
+ } catch (Error err) {
+ throw err;
+ } catch (RuntimeException rex) {
+ throw rex;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ public void run() { invoke(); }
+ private static final long serialVersionUID = 2838392045355241008L;
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code run}
+ * method of the given {@code Runnable} as its action, and returns
+ * a null result upon {@link #join}.
+ *
+ * @param runnable the runnable action
+ * @return the task
+ */
+ public static ForkJoinTask<?> adapt(Runnable runnable) {
+ return new AdaptedRunnable<Void>(runnable, null);
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code run}
+ * method of the given {@code Runnable} as its action, and returns
+ * the given result upon {@link #join}.
+ *
+ * @param runnable the runnable action
+ * @param result the result upon completion
+ * @return the task
+ */
+ public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
+ return new AdaptedRunnable<T>(runnable, result);
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code call}
+ * method of the given {@code Callable} as its action, and returns
+ * its result upon {@link #join}, translating any checked exceptions
+ * encountered into {@code RuntimeException}.
+ *
+ * @param callable the callable action
+ * @return the task
+ */
+ public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
+ return new AdaptedCallable<T>(callable);
+ }
+
+ // Serialization support
+
+ private static final long serialVersionUID = -7721805057305804111L;
+
+ /**
+ * Saves the state to a stream.
+ *
+ * @serialData the current run status and the exception thrown
+ * during execution, or {@code null} if none
+ * @param s the stream
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ s.defaultWriteObject();
+ s.writeObject(getException());
+ }
+
+ /**
+ * Reconstitutes the instance from a stream.
+ *
+ * @param s the stream
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
+ status |= EXTERNAL_SIGNAL; // conservatively set external signal
+ Object ex = s.readObject();
+ if (ex != null)
+ setDoneExceptionally((Throwable) ex);
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long statusOffset =
+ objectFieldOffset("status", ForkJoinTask.class);
+
+ private static long objectFieldOffset(String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,827 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.Collection;
+
+/**
+ * A thread managed by a {@link ForkJoinPool}. This class is
+ * subclassable solely for the sake of adding functionality -- there
+ * are no overridable methods dealing with scheduling or execution.
+ * However, you can override initialization and termination methods
+ * surrounding the main task processing loop. If you do create such a
+ * subclass, you will also need to supply a custom {@link
+ * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
+ * ForkJoinPool}.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ */
+public class ForkJoinWorkerThread extends Thread {
+ /*
+ * Algorithm overview:
+ *
+ * 1. Work-Stealing: Work-stealing queues are special forms of
+ * Deques that support only three of the four possible
+ * end-operations -- push, pop, and deq (aka steal), and only do
+ * so under the constraints that push and pop are called only from
+ * the owning thread, while deq may be called from other threads.
+ * (If you are unfamiliar with them, you probably want to read
+ * Herlihy and Shavit's book "The Art of Multiprocessor
+ * programming", chapter 16 describing these in more detail before
+ * proceeding.) The main work-stealing queue design is roughly
+ * similar to "Dynamic Circular Work-Stealing Deque" by David
+ * Chase and Yossi Lev, SPAA 2005
+ * (http://research.sun.com/scalable/pubs/index.html). The main
+ * difference ultimately stems from gc requirements that we null
+ * out taken slots as soon as we can, to maintain as small a
+ * footprint as possible even in programs generating huge numbers
+ * of tasks. To accomplish this, we shift the CAS arbitrating pop
+ * vs deq (steal) from being on the indices ("base" and "sp") to
+ * the slots themselves (mainly via method "casSlotNull()"). So,
+ * both a successful pop and deq mainly entail CAS'ing a non-null
+ * slot to null. Because we rely on CASes of references, we do
+ * not need tag bits on base or sp. They are simple ints as used
+ * in any circular array-based queue (see for example ArrayDeque).
+ * Updates to the indices must still be ordered in a way that
+ * guarantees that (sp - base) > 0 means the queue is empty, but
+ * otherwise may err on the side of possibly making the queue
+ * appear nonempty when a push, pop, or deq have not fully
+ * committed. Note that this means that the deq operation,
+ * considered individually, is not wait-free. One thief cannot
+ * successfully continue until another in-progress one (or, if
+ * previously empty, a push) completes. However, in the
+ * aggregate, we ensure at least probabilistic
+ * non-blockingness. If an attempted steal fails, a thief always
+ * chooses a different random victim target to try next. So, in
+ * order for one thief to progress, it suffices for any
+ * in-progress deq or new push on any empty queue to complete. One
+ * reason this works well here is that apparently-nonempty often
+ * means soon-to-be-stealable, which gives threads a chance to
+ * activate if necessary before stealing (see below).
+ *
+ * This approach also enables support for "async mode" where local
+ * task processing is in FIFO, not LIFO order; simply by using a
+ * version of deq rather than pop when locallyFifo is true (as set
+ * by the ForkJoinPool). This allows use in message-passing
+ * frameworks in which tasks are never joined.
+ *
+ * Efficient implementation of this approach currently relies on
+ * an uncomfortable amount of "Unsafe" mechanics. To maintain
+ * correct orderings, reads and writes of variable base require
+ * volatile ordering. Variable sp does not require volatile write
+ * but needs cheaper store-ordering on writes. Because they are
+ * protected by volatile base reads, reads of the queue array and
+ * its slots do not need volatile load semantics, but writes (in
+ * push) require store order and CASes (in pop and deq) require
+ * (volatile) CAS semantics. (See "Idempotent work stealing" by
+ * Michael, Saraswat, and Vechev, PPoPP 2009
+ * http://portal.acm.org/citation.cfm?id=1504186 for an algorithm
+ * with similar properties, but without support for nulling
+ * slots.) Since these combinations aren't supported using
+ * ordinary volatiles, the only way to accomplish these
+ * efficiently is to use direct Unsafe calls. (Using external
+ * AtomicIntegers and AtomicReferenceArrays for the indices and
+ * array is significantly slower because of memory locality and
+ * indirection effects.)
+ *
+ * Further, performance on most platforms is very sensitive to
+ * placement and sizing of the (resizable) queue array. Even
+ * though these queues don't usually become all that big, the
+ * initial size must be large enough to counteract cache
+ * contention effects across multiple queues (especially in the
+ * presence of GC cardmarking). Also, to improve thread-locality,
+ * queues are currently initialized immediately after the thread
+ * gets the initial signal to start processing tasks. However,
+ * all queue-related methods except pushTask are written in a way
+ * that allows them to instead be lazily allocated and/or disposed
+ * of when empty. All together, these low-level implementation
+ * choices produce as much as a factor of 4 performance
+ * improvement compared to naive implementations, and enable the
+ * processing of billions of tasks per second, sometimes at the
+ * expense of ugliness.
+ *
+ * 2. Run control: The primary run control is based on a global
+ * counter (activeCount) held by the pool. It uses an algorithm
+ * similar to that in Herlihy and Shavit section 17.6 to cause
+ * threads to eventually block when all threads declare they are
+ * inactive. For this to work, threads must be declared active
+ * when executing tasks, and before stealing a task. They must be
+ * inactive before blocking on the Pool Barrier (awaiting a new
+ * submission or other Pool event). In between, there is some free
+ * play which we take advantage of to avoid contention and rapid
+ * flickering of the global activeCount: If inactive, we activate
+ * only if a victim queue appears to be nonempty (see above).
+ * Similarly, a thread tries to inactivate only after a full scan
+ * of other threads. The net effect is that contention on
+ * activeCount is rarely a measurable performance issue. (There
+ * are also a few other cases where we scan for work rather than
+ * retry/block upon contention.)
+ *
+ * 3. Selection control. We maintain policy of always choosing to
+ * run local tasks rather than stealing, and always trying to
+ * steal tasks before trying to run a new submission. All steals
+ * are currently performed in randomly-chosen deq-order. It may be
+ * worthwhile to bias these with locality / anti-locality
+ * information, but doing this well probably requires more
+ * lower-level information from JVMs than currently provided.
+ */
+
+ /**
+ * Capacity of work-stealing queue array upon initialization.
+ * Must be a power of two. Initial size must be at least 2, but is
+ * padded to minimize cache effects.
+ */
+ private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
+
+ /**
+ * Maximum work-stealing queue array size. Must be less than or
+ * equal to 1 << 28 to ensure lack of index wraparound. (This
+ * is less than usual bounds, because we need leftshift by 3
+ * to be in int range).
+ */
+ private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
+
+ /**
+ * The pool this thread works in. Accessed directly by ForkJoinTask.
+ */
+ final ForkJoinPool pool;
+
+ /**
+ * The work-stealing queue array. Size must be a power of two.
+ * Initialized when thread starts, to improve memory locality.
+ */
+ private ForkJoinTask<?>[] queue;
+
+ /**
+ * Index (mod queue.length) of next queue slot to push to or pop
+ * from. It is written only by owner thread, via ordered store.
+ * Both sp and base are allowed to wrap around on overflow, but
+ * (sp - base) still estimates size.
+ */
+ private volatile int sp;
+
+ /**
+ * Index (mod queue.length) of least valid queue slot, which is
+ * always the next position to steal from if nonempty.
+ */
+ private volatile int base;
+
+ /**
+ * Activity status. When true, this worker is considered active.
+ * Must be false upon construction. It must be true when executing
+ * tasks, and BEFORE stealing a task. It must be false before
+ * calling pool.sync.
+ */
+ private boolean active;
+
+ /**
+ * Run state of this worker. Supports simple versions of the usual
+ * shutdown/shutdownNow control.
+ */
+ private volatile int runState;
+
+ /**
+ * Seed for random number generator for choosing steal victims.
+ * Uses Marsaglia xorshift. Must be nonzero upon initialization.
+ */
+ private int seed;
+
+ /**
+ * Number of steals, transferred to pool when idle
+ */
+ private int stealCount;
+
+ /**
+ * Index of this worker in pool array. Set once by pool before
+ * running, and accessed directly by pool during cleanup etc.
+ */
+ int poolIndex;
+
+ /**
+ * The last barrier event waited for. Accessed in pool callback
+ * methods, but only by current thread.
+ */
+ long lastEventCount;
+
+ /**
+ * True if use local fifo, not default lifo, for local polling
+ */
+ private boolean locallyFifo;
+
+ /**
+ * Creates a ForkJoinWorkerThread operating in the given pool.
+ *
+ * @param pool the pool this thread works in
+ * @throws NullPointerException if pool is null
+ */
+ protected ForkJoinWorkerThread(ForkJoinPool pool) {
+ if (pool == null) throw new NullPointerException();
+ this.pool = pool;
+ // Note: poolIndex is set by pool during construction
+ // Remaining initialization is deferred to onStart
+ }
+
+ // Public access methods
+
+ /**
+ * Returns the pool hosting this thread.
+ *
+ * @return the pool
+ */
+ public ForkJoinPool getPool() {
+ return pool;
+ }
+
+ /**
+ * Returns the index number of this thread in its pool. The
+ * returned value ranges from zero to the maximum number of
+ * threads (minus one) that have ever been created in the pool.
+ * This method may be useful for applications that track status or
+ * collect results per-worker rather than per-task.
+ *
+ * @return the index number
+ */
+ public int getPoolIndex() {
+ return poolIndex;
+ }
+
+ /**
+ * Establishes local first-in-first-out scheduling mode for forked
+ * tasks that are never joined.
+ *
+ * @param async if true, use locally FIFO scheduling
+ */
+ void setAsyncMode(boolean async) {
+ locallyFifo = async;
+ }
+
+ // Runstate management
+
+ // Runstate values. Order matters
+ private static final int RUNNING = 0;
+ private static final int SHUTDOWN = 1;
+ private static final int TERMINATING = 2;
+ private static final int TERMINATED = 3;
+
+ final boolean isShutdown() { return runState >= SHUTDOWN; }
+ final boolean isTerminating() { return runState >= TERMINATING; }
+ final boolean isTerminated() { return runState == TERMINATED; }
+ final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
+ final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
+
+ /**
+ * Transitions to at least the given state.
+ *
+ * @return {@code true} if not already at least at given state
+ */
+ private boolean transitionRunStateTo(int state) {
+ for (;;) {
+ int s = runState;
+ if (s >= state)
+ return false;
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
+ return true;
+ }
+ }
+
+ /**
+ * Tries to set status to active; fails on contention.
+ */
+ private boolean tryActivate() {
+ if (!active) {
+ if (!pool.tryIncrementActiveCount())
+ return false;
+ active = true;
+ }
+ return true;
+ }
+
+ /**
+ * Tries to set status to inactive; fails on contention.
+ */
+ private boolean tryInactivate() {
+ if (active) {
+ if (!pool.tryDecrementActiveCount())
+ return false;
+ active = false;
+ }
+ return true;
+ }
+
+ /**
+ * Computes next value for random victim probe. Scans don't
+ * require a very high quality generator, but also not a crummy
+ * one. Marsaglia xor-shift is cheap and works well.
+ */
+ private static int xorShift(int r) {
+ r ^= (r << 13);
+ r ^= (r >>> 17);
+ return r ^ (r << 5);
+ }
+
+ // Lifecycle methods
+
+ /**
+ * This method is required to be public, but should never be
+ * called explicitly. It performs the main run loop to execute
+ * ForkJoinTasks.
+ */
+ public void run() {
+ Throwable exception = null;
+ try {
+ onStart();
+ pool.sync(this); // await first pool event
+ mainLoop();
+ } catch (Throwable ex) {
+ exception = ex;
+ } finally {
+ onTermination(exception);
+ }
+ }
+
+ /**
+ * Executes tasks until shut down.
+ */
+ private void mainLoop() {
+ while (!isShutdown()) {
+ ForkJoinTask<?> t = pollTask();
+ if (t != null || (t = pollSubmission()) != null)
+ t.quietlyExec();
+ else if (tryInactivate())
+ pool.sync(this);
+ }
+ }
+
+ /**
+ * Initializes internal state after construction but before
+ * processing any tasks. If you override this method, you must
+ * invoke super.onStart() at the beginning of the method.
+ * Initialization requires care: Most fields must have legal
+ * default values, to ensure that attempted accesses from other
+ * threads work correctly even before this thread starts
+ * processing tasks.
+ */
+ protected void onStart() {
+ // Allocate while starting to improve chances of thread-local
+ // isolation
+ queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
+ // Initial value of seed need not be especially random but
+ // should differ across workers and must be nonzero
+ int p = poolIndex + 1;
+ seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
+ }
+
+ /**
+ * Performs cleanup associated with termination of this worker
+ * thread. If you override this method, you must invoke
+ * {@code super.onTermination} at the end of the overridden method.
+ *
+ * @param exception the exception causing this thread to abort due
+ * to an unrecoverable error, or {@code null} if completed normally
+ */
+ protected void onTermination(Throwable exception) {
+ // Execute remaining local tasks unless aborting or terminating
+ while (exception == null && pool.isProcessingTasks() && base != sp) {
+ try {
+ ForkJoinTask<?> t = popTask();
+ if (t != null)
+ t.quietlyExec();
+ } catch (Throwable ex) {
+ exception = ex;
+ }
+ }
+ // Cancel other tasks, transition status, notify pool, and
+ // propagate exception to uncaught exception handler
+ try {
+ do {} while (!tryInactivate()); // ensure inactive
+ cancelTasks();
+ runState = TERMINATED;
+ pool.workerTerminated(this);
+ } catch (Throwable ex) { // Shouldn't ever happen
+ if (exception == null) // but if so, at least rethrown
+ exception = ex;
+ } finally {
+ if (exception != null)
+ ForkJoinTask.rethrowException(exception);
+ }
+ }
+
+ // Intrinsics-based support for queue operations.
+
+ private static long slotOffset(int i) {
+ return ((long) i << qShift) + qBase;
+ }
+
+ /**
+ * Adds in store-order the given task at given slot of q to null.
+ * Caller must ensure q is non-null and index is in range.
+ */
+ private static void setSlot(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t) {
+ UNSAFE.putOrderedObject(q, slotOffset(i), t);
+ }
+
+ /**
+ * CAS given slot of q to null. Caller must ensure q is non-null
+ * and index is in range.
+ */
+ private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t) {
+ return UNSAFE.compareAndSwapObject(q, slotOffset(i), t, null);
+ }
+
+ /**
+ * Sets sp in store-order.
+ */
+ private void storeSp(int s) {
+ UNSAFE.putOrderedInt(this, spOffset, s);
+ }
+
+ // Main queue methods
+
+ /**
+ * Pushes a task. Called only by current thread.
+ *
+ * @param t the task. Caller must ensure non-null.
+ */
+ final void pushTask(ForkJoinTask<?> t) {
+ ForkJoinTask<?>[] q = queue;
+ int mask = q.length - 1;
+ int s = sp;
+ setSlot(q, s & mask, t);
+ storeSp(++s);
+ if ((s -= base) == 1)
+ pool.signalWork();
+ else if (s >= mask)
+ growQueue();
+ }
+
+ /**
+ * Tries to take a task from the base of the queue, failing if
+ * either empty or contended.
+ *
+ * @return a task, or null if none or contended
+ */
+ final ForkJoinTask<?> deqTask() {
+ ForkJoinTask<?> t;
+ ForkJoinTask<?>[] q;
+ int i;
+ int b;
+ if (sp != (b = base) &&
+ (q = queue) != null && // must read q after b
+ (t = q[i = (q.length - 1) & b]) != null &&
+ casSlotNull(q, i, t)) {
+ base = b + 1;
+ return t;
+ }
+ return null;
+ }
+
+ /**
+ * Tries to take a task from the base of own queue, activating if
+ * necessary, failing only if empty. Called only by current thread.
+ *
+ * @return a task, or null if none
+ */
+ final ForkJoinTask<?> locallyDeqTask() {
+ int b;
+ while (sp != (b = base)) {
+ if (tryActivate()) {
+ ForkJoinTask<?>[] q = queue;
+ int i = (q.length - 1) & b;
+ ForkJoinTask<?> t = q[i];
+ if (t != null && casSlotNull(q, i, t)) {
+ base = b + 1;
+ return t;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns a popped task, or null if empty. Ensures active status
+ * if non-null. Called only by current thread.
+ */
+ final ForkJoinTask<?> popTask() {
+ int s = sp;
+ while (s != base) {
+ if (tryActivate()) {
+ ForkJoinTask<?>[] q = queue;
+ int mask = q.length - 1;
+ int i = (s - 1) & mask;
+ ForkJoinTask<?> t = q[i];
+ if (t == null || !casSlotNull(q, i, t))
+ break;
+ storeSp(s - 1);
+ return t;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Specialized version of popTask to pop only if
+ * topmost element is the given task. Called only
+ * by current thread while active.
+ *
+ * @param t the task. Caller must ensure non-null.
+ */
+ final boolean unpushTask(ForkJoinTask<?> t) {
+ ForkJoinTask<?>[] q = queue;
+ int mask = q.length - 1;
+ int s = sp - 1;
+ if (casSlotNull(q, s & mask, t)) {
+ storeSp(s);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Returns next task or null if empty or contended
+ */
+ final ForkJoinTask<?> peekTask() {
+ ForkJoinTask<?>[] q = queue;
+ if (q == null)
+ return null;
+ int mask = q.length - 1;
+ int i = locallyFifo ? base : (sp - 1);
+ return q[i & mask];
+ }
+
+ /**
+ * Doubles queue array size. Transfers elements by emulating
+ * steals (deqs) from old array and placing, oldest first, into
+ * new array.
+ */
+ private void growQueue() {
+ ForkJoinTask<?>[] oldQ = queue;
+ int oldSize = oldQ.length;
+ int newSize = oldSize << 1;
+ if (newSize > MAXIMUM_QUEUE_CAPACITY)
+ throw new RejectedExecutionException("Queue capacity exceeded");
+ ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
+
+ int b = base;
+ int bf = b + oldSize;
+ int oldMask = oldSize - 1;
+ int newMask = newSize - 1;
+ do {
+ int oldIndex = b & oldMask;
+ ForkJoinTask<?> t = oldQ[oldIndex];
+ if (t != null && !casSlotNull(oldQ, oldIndex, t))
+ t = null;
+ setSlot(newQ, b & newMask, t);
+ } while (++b != bf);
+ pool.signalWork();
+ }
+
+ /**
+ * Tries to steal a task from another worker. Starts at a random
+ * index of workers array, and probes workers until finding one
+ * with non-empty queue or finding that all are empty. It
+ * randomly selects the first n probes. If these are empty, it
+ * resorts to a full circular traversal, which is necessary to
+ * accurately set active status by caller. Also restarts if pool
+ * events occurred since last scan, which forces refresh of
+ * workers array, in case barrier was associated with resize.
+ *
+ * This method must be both fast and quiet -- usually avoiding
+ * memory accesses that could disrupt cache sharing etc other than
+ * those needed to check for and take tasks. This accounts for,
+ * among other things, updating random seed in place without
+ * storing it until exit.
+ *
+ * @return a task, or null if none found
+ */
+ private ForkJoinTask<?> scan() {
+ ForkJoinTask<?> t = null;
+ int r = seed; // extract once to keep scan quiet
+ ForkJoinWorkerThread[] ws; // refreshed on outer loop
+ int mask; // must be power 2 minus 1 and > 0
+ outer:do {
+ if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
+ int idx = r;
+ int probes = ~mask; // use random index while negative
+ for (;;) {
+ r = xorShift(r); // update random seed
+ ForkJoinWorkerThread v = ws[mask & idx];
+ if (v == null || v.sp == v.base) {
+ if (probes <= mask)
+ idx = (probes++ < 0) ? r : (idx + 1);
+ else
+ break;
+ }
+ else if (!tryActivate() || (t = v.deqTask()) == null)
+ continue outer; // restart on contention
+ else
+ break outer;
+ }
+ }
+ } while (pool.hasNewSyncEvent(this)); // retry on pool events
+ seed = r;
+ return t;
+ }
+
+ /**
+ * Gets and removes a local or stolen task.
+ *
+ * @return a task, if available
+ */
+ final ForkJoinTask<?> pollTask() {
+ ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
+ if (t == null && (t = scan()) != null)
+ ++stealCount;
+ return t;
+ }
+
+ /**
+ * Gets a local task.
+ *
+ * @return a task, if available
+ */
+ final ForkJoinTask<?> pollLocalTask() {
+ return locallyFifo ? locallyDeqTask() : popTask();
+ }
+
+ /**
+ * Returns a pool submission, if one exists, activating first.
+ *
+ * @return a submission, if available
+ */
+ private ForkJoinTask<?> pollSubmission() {
+ ForkJoinPool p = pool;
+ while (p.hasQueuedSubmissions()) {
+ ForkJoinTask<?> t;
+ if (tryActivate() && (t = p.pollSubmission()) != null)
+ return t;
+ }
+ return null;
+ }
+
+ // Methods accessed only by Pool
+
+ /**
+ * Removes and cancels all tasks in queue. Can be called from any
+ * thread.
+ */
+ final void cancelTasks() {
+ ForkJoinTask<?> t;
+ while (base != sp && (t = deqTask()) != null)
+ t.cancelIgnoringExceptions();
+ }
+
+ /**
+ * Drains tasks to given collection c.
+ *
+ * @return the number of tasks drained
+ */
+ final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
+ int n = 0;
+ ForkJoinTask<?> t;
+ while (base != sp && (t = deqTask()) != null) {
+ c.add(t);
+ ++n;
+ }
+ return n;
+ }
+
+ /**
+ * Gets and clears steal count for accumulation by pool. Called
+ * only when known to be idle (in pool.sync and termination).
+ */
+ final int getAndClearStealCount() {
+ int sc = stealCount;
+ stealCount = 0;
+ return sc;
+ }
+
+ /**
+ * Returns {@code true} if at least one worker in the given array
+ * appears to have at least one queued task.
+ *
+ * @param ws array of workers
+ */
+ static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
+ if (ws != null) {
+ int len = ws.length;
+ for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
+ for (int i = 0; i < len; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null && w.sp != w.base)
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ // Support methods for ForkJoinTask
+
+ /**
+ * Returns an estimate of the number of tasks in the queue.
+ */
+ final int getQueueSize() {
+ // suppress momentarily negative values
+ return Math.max(0, sp - base);
+ }
+
+ /**
+ * Returns an estimate of the number of tasks, offset by a
+ * function of number of idle workers.
+ */
+ final int getEstimatedSurplusTaskCount() {
+ // The halving approximates weighting idle vs non-idle workers
+ return (sp - base) - (pool.getIdleThreadCount() >>> 1);
+ }
+
+ /**
+ * Scans, returning early if joinMe done.
+ */
+ final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
+ ForkJoinTask<?> t = pollTask();
+ if (t != null && joinMe.status < 0 && sp == base) {
+ pushTask(t); // unsteal if done and this task would be stealable
+ t = null;
+ }
+ return t;
+ }
+
+ /**
+ * Runs tasks until {@code pool.isQuiescent()}.
+ */
+ final void helpQuiescePool() {
+ for (;;) {
+ ForkJoinTask<?> t = pollTask();
+ if (t != null)
+ t.quietlyExec();
+ else if (tryInactivate() && pool.isQuiescent())
+ break;
+ }
+ do {} while (!tryActivate()); // re-activate on exit
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long spOffset =
+ objectFieldOffset("sp", ForkJoinWorkerThread.class);
+ private static final long runStateOffset =
+ objectFieldOffset("runState", ForkJoinWorkerThread.class);
+ private static final long qBase;
+ private static final int qShift;
+
+ static {
+ qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
+ int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
+ if ((s & (s-1)) != 0)
+ throw new Error("data type scale not a power of two");
+ qShift = 31 - Integer.numberOfLeadingZeros(s);
+ }
+
+ private static long objectFieldOffset(String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/LinkedTransferQueue.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,1270 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.locks.LockSupport;
+/**
+ * An unbounded {@link TransferQueue} based on linked nodes.
+ * This queue orders elements FIFO (first-in-first-out) with respect
+ * to any given producer. The <em>head</em> of the queue is that
+ * element that has been on the queue the longest time for some
+ * producer. The <em>tail</em> of the queue is that element that has
+ * been on the queue the shortest time for some producer.
+ *
+ * <p>Beware that, unlike in most collections, the {@code size}
+ * method is <em>NOT</em> a constant-time operation. Because of the
+ * asynchronous nature of these queues, determining the current number
+ * of elements requires a traversal of the elements.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces.
+ *
+ * <p>Memory consistency effects: As with other concurrent
+ * collections, actions in a thread prior to placing an object into a
+ * {@code LinkedTransferQueue}
+ * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
+ * actions subsequent to the access or removal of that element from
+ * the {@code LinkedTransferQueue} in another thread.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class LinkedTransferQueue<E> extends AbstractQueue<E>
+ implements TransferQueue<E>, java.io.Serializable {
+ private static final long serialVersionUID = -3223113410248163686L;
+
+ /*
+ * *** Overview of Dual Queues with Slack ***
+ *
+ * Dual Queues, introduced by Scherer and Scott
+ * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
+ * (linked) queues in which nodes may represent either data or
+ * requests. When a thread tries to enqueue a data node, but
+ * encounters a request node, it instead "matches" and removes it;
+ * and vice versa for enqueuing requests. Blocking Dual Queues
+ * arrange that threads enqueuing unmatched requests block until
+ * other threads provide the match. Dual Synchronous Queues (see
+ * Scherer, Lea, & Scott
+ * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
+ * additionally arrange that threads enqueuing unmatched data also
+ * block. Dual Transfer Queues support all of these modes, as
+ * dictated by callers.
+ *
+ * A FIFO dual queue may be implemented using a variation of the
+ * Michael & Scott (M&S) lock-free queue algorithm
+ * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
+ * It maintains two pointer fields, "head", pointing to a
+ * (matched) node that in turn points to the first actual
+ * (unmatched) queue node (or null if empty); and "tail" that
+ * points to the last node on the queue (or again null if
+ * empty). For example, here is a possible queue with four data
+ * elements:
+ *
+ * head tail
+ * | |
+ * v v
+ * M -> U -> U -> U -> U
+ *
+ * The M&S queue algorithm is known to be prone to scalability and
+ * overhead limitations when maintaining (via CAS) these head and
+ * tail pointers. This has led to the development of
+ * contention-reducing variants such as elimination arrays (see
+ * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
+ * optimistic back pointers (see Ladan-Mozes & Shavit
+ * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
+ * However, the nature of dual queues enables a simpler tactic for
+ * improving M&S-style implementations when dual-ness is needed.
+ *
+ * In a dual queue, each node must atomically maintain its match
+ * status. While there are other possible variants, we implement
+ * this here as: for a data-mode node, matching entails CASing an
+ * "item" field from a non-null data value to null upon match, and
+ * vice-versa for request nodes, CASing from null to a data
+ * value. (Note that the linearization properties of this style of
+ * queue are easy to verify -- elements are made available by
+ * linking, and unavailable by matching.) Compared to plain M&S
+ * queues, this property of dual queues requires one additional
+ * successful atomic operation per enq/deq pair. But it also
+ * enables lower cost variants of queue maintenance mechanics. (A
+ * variation of this idea applies even for non-dual queues that
+ * support deletion of interior elements, such as
+ * j.u.c.ConcurrentLinkedQueue.)
+ *
+ * Once a node is matched, its match status can never again
+ * change. We may thus arrange that the linked list of them
+ * contain a prefix of zero or more matched nodes, followed by a
+ * suffix of zero or more unmatched nodes. (Note that we allow
+ * both the prefix and suffix to be zero length, which in turn
+ * means that we do not use a dummy header.) If we were not
+ * concerned with either time or space efficiency, we could
+ * correctly perform enqueue and dequeue operations by traversing
+ * from a pointer to the initial node; CASing the item of the
+ * first unmatched node on match and CASing the next field of the
+ * trailing node on appends. (Plus some special-casing when
+ * initially empty). While this would be a terrible idea in
+ * itself, it does have the benefit of not requiring ANY atomic
+ * updates on head/tail fields.
+ *
+ * We introduce here an approach that lies between the extremes of
+ * never versus always updating queue (head and tail) pointers.
+ * This offers a tradeoff between sometimes requiring extra
+ * traversal steps to locate the first and/or last unmatched
+ * nodes, versus the reduced overhead and contention of fewer
+ * updates to queue pointers. For example, a possible snapshot of
+ * a queue is:
+ *
+ * head tail
+ * | |
+ * v v
+ * M -> M -> U -> U -> U -> U
+ *
+ * The best value for this "slack" (the targeted maximum distance
+ * between the value of "head" and the first unmatched node, and
+ * similarly for "tail") is an empirical matter. We have found
+ * that using very small constants in the range of 1-3 work best
+ * over a range of platforms. Larger values introduce increasing
+ * costs of cache misses and risks of long traversal chains, while
+ * smaller values increase CAS contention and overhead.
+ *
+ * Dual queues with slack differ from plain M&S dual queues by
+ * virtue of only sometimes updating head or tail pointers when
+ * matching, appending, or even traversing nodes; in order to
+ * maintain a targeted slack. The idea of "sometimes" may be
+ * operationalized in several ways. The simplest is to use a
+ * per-operation counter incremented on each traversal step, and
+ * to try (via CAS) to update the associated queue pointer
+ * whenever the count exceeds a threshold. Another, that requires
+ * more overhead, is to use random number generators to update
+ * with a given probability per traversal step.
+ *
+ * In any strategy along these lines, because CASes updating
+ * fields may fail, the actual slack may exceed targeted
+ * slack. However, they may be retried at any time to maintain
+ * targets. Even when using very small slack values, this
+ * approach works well for dual queues because it allows all
+ * operations up to the point of matching or appending an item
+ * (hence potentially allowing progress by another thread) to be
+ * read-only, thus not introducing any further contention. As
+ * described below, we implement this by performing slack
+ * maintenance retries only after these points.
+ *
+ * As an accompaniment to such techniques, traversal overhead can
+ * be further reduced without increasing contention of head
+ * pointer updates: Threads may sometimes shortcut the "next" link
+ * path from the current "head" node to be closer to the currently
+ * known first unmatched node, and similarly for tail. Again, this
+ * may be triggered with using thresholds or randomization.
+ *
+ * These ideas must be further extended to avoid unbounded amounts
+ * of costly-to-reclaim garbage caused by the sequential "next"
+ * links of nodes starting at old forgotten head nodes: As first
+ * described in detail by Boehm
+ * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
+ * delays noticing that any arbitrarily old node has become
+ * garbage, all newer dead nodes will also be unreclaimed.
+ * (Similar issues arise in non-GC environments.) To cope with
+ * this in our implementation, upon CASing to advance the head
+ * pointer, we set the "next" link of the previous head to point
+ * only to itself; thus limiting the length of connected dead lists.
+ * (We also take similar care to wipe out possibly garbage
+ * retaining values held in other Node fields.) However, doing so
+ * adds some further complexity to traversal: If any "next"
+ * pointer links to itself, it indicates that the current thread
+ * has lagged behind a head-update, and so the traversal must
+ * continue from the "head". Traversals trying to find the
+ * current tail starting from "tail" may also encounter
+ * self-links, in which case they also continue at "head".
+ *
+ * It is tempting in slack-based scheme to not even use CAS for
+ * updates (similarly to Ladan-Mozes & Shavit). However, this
+ * cannot be done for head updates under the above link-forgetting
+ * mechanics because an update may leave head at a detached node.
+ * And while direct writes are possible for tail updates, they
+ * increase the risk of long retraversals, and hence long garbage
+ * chains, which can be much more costly than is worthwhile
+ * considering that the cost difference of performing a CAS vs
+ * write is smaller when they are not triggered on each operation
+ * (especially considering that writes and CASes equally require
+ * additional GC bookkeeping ("write barriers") that are sometimes
+ * more costly than the writes themselves because of contention).
+ *
+ * Removal of interior nodes (due to timed out or interrupted
+ * waits, or calls to remove(x) or Iterator.remove) can use a
+ * scheme roughly similar to that described in Scherer, Lea, and
+ * Scott's SynchronousQueue. Given a predecessor, we can unsplice
+ * any node except the (actual) tail of the queue. To avoid
+ * build-up of cancelled trailing nodes, upon a request to remove
+ * a trailing node, it is placed in field "cleanMe" to be
+ * unspliced upon the next call to unsplice any other node.
+ * Situations needing such mechanics are not common but do occur
+ * in practice; for example when an unbounded series of short
+ * timed calls to poll repeatedly time out but never otherwise
+ * fall off the list because of an untimed call to take at the
+ * front of the queue. Note that maintaining field cleanMe does
+ * not otherwise much impact garbage retention even if never
+ * cleared by some other call because the held node will
+ * eventually either directly or indirectly lead to a self-link
+ * once off the list.
+ *
+ * *** Overview of implementation ***
+ *
+ * We use a threshold-based approach to updates, with a slack
+ * threshold of two -- that is, we update head/tail when the
+ * current pointer appears to be two or more steps away from the
+ * first/last node. The slack value is hard-wired: a path greater
+ * than one is naturally implemented by checking equality of
+ * traversal pointers except when the list has only one element,
+ * in which case we keep slack threshold at one. Avoiding tracking
+ * explicit counts across method calls slightly simplifies an
+ * already-messy implementation. Using randomization would
+ * probably work better if there were a low-quality dirt-cheap
+ * per-thread one available, but even ThreadLocalRandom is too
+ * heavy for these purposes.
+ *
+ * With such a small slack threshold value, it is rarely
+ * worthwhile to augment this with path short-circuiting; i.e.,
+ * unsplicing nodes between head and the first unmatched node, or
+ * similarly for tail, rather than advancing head or tail
+ * proper. However, it is used (in awaitMatch) immediately before
+ * a waiting thread starts to block, as a final bit of helping at
+ * a point when contention with others is extremely unlikely
+ * (since if other threads that could release it are operating,
+ * then the current thread wouldn't be blocking).
+ *
+ * We allow both the head and tail fields to be null before any
+ * nodes are enqueued; initializing upon first append. This
+ * simplifies some other logic, as well as providing more
+ * efficient explicit control paths instead of letting JVMs insert
+ * implicit NullPointerExceptions when they are null. While not
+ * currently fully implemented, we also leave open the possibility
+ * of re-nulling these fields when empty (which is complicated to
+ * arrange, for little benefit.)
+ *
+ * All enqueue/dequeue operations are handled by the single method
+ * "xfer" with parameters indicating whether to act as some form
+ * of offer, put, poll, take, or transfer (each possibly with
+ * timeout). The relative complexity of using one monolithic
+ * method outweighs the code bulk and maintenance problems of
+ * using separate methods for each case.
+ *
+ * Operation consists of up to three phases. The first is
+ * implemented within method xfer, the second in tryAppend, and
+ * the third in method awaitMatch.
+ *
+ * 1. Try to match an existing node
+ *
+ * Starting at head, skip already-matched nodes until finding
+ * an unmatched node of opposite mode, if one exists, in which
+ * case matching it and returning, also if necessary updating
+ * head to one past the matched node (or the node itself if the
+ * list has no other unmatched nodes). If the CAS misses, then
+ * a loop retries advancing head by two steps until either
+ * success or the slack is at most two. By requiring that each
+ * attempt advances head by two (if applicable), we ensure that
+ * the slack does not grow without bound. Traversals also check
+ * if the initial head is now off-list, in which case they
+ * start at the new head.
+ *
+ * If no candidates are found and the call was untimed
+ * poll/offer, (argument "how" is NOW) return.
+ *
+ * 2. Try to append a new node (method tryAppend)
+ *
+ * Starting at current tail pointer, find the actual last node
+ * and try to append a new node (or if head was null, establish
+ * the first node). Nodes can be appended only if their
+ * predecessors are either already matched or are of the same
+ * mode. If we detect otherwise, then a new node with opposite
+ * mode must have been appended during traversal, so we must
+ * restart at phase 1. The traversal and update steps are
+ * otherwise similar to phase 1: Retrying upon CAS misses and
+ * checking for staleness. In particular, if a self-link is
+ * encountered, then we can safely jump to a node on the list
+ * by continuing the traversal at current head.
+ *
+ * On successful append, if the call was ASYNC, return.
+ *
+ * 3. Await match or cancellation (method awaitMatch)
+ *
+ * Wait for another thread to match node; instead cancelling if
+ * the current thread was interrupted or the wait timed out. On
+ * multiprocessors, we use front-of-queue spinning: If a node
+ * appears to be the first unmatched node in the queue, it
+ * spins a bit before blocking. In either case, before blocking
+ * it tries to unsplice any nodes between the current "head"
+ * and the first unmatched node.
+ *
+ * Front-of-queue spinning vastly improves performance of
+ * heavily contended queues. And so long as it is relatively
+ * brief and "quiet", spinning does not much impact performance
+ * of less-contended queues. During spins threads check their
+ * interrupt status and generate a thread-local random number
+ * to decide to occasionally perform a Thread.yield. While
+ * yield has underdefined specs, we assume that might it help,
+ * and will not hurt in limiting impact of spinning on busy
+ * systems. We also use smaller (1/2) spins for nodes that are
+ * not known to be front but whose predecessors have not
+ * blocked -- these "chained" spins avoid artifacts of
+ * front-of-queue rules which otherwise lead to alternating
+ * nodes spinning vs blocking. Further, front threads that
+ * represent phase changes (from data to request node or vice
+ * versa) compared to their predecessors receive additional
+ * chained spins, reflecting longer paths typically required to
+ * unblock threads during phase changes.
+ */
+
+ /** True if on multiprocessor */
+ private static final boolean MP =
+ Runtime.getRuntime().availableProcessors() > 1;
+
+ /**
+ * The number of times to spin (with randomly interspersed calls
+ * to Thread.yield) on multiprocessor before blocking when a node
+ * is apparently the first waiter in the queue. See above for
+ * explanation. Must be a power of two. The value is empirically
+ * derived -- it works pretty well across a variety of processors,
+ * numbers of CPUs, and OSes.
+ */
+ private static final int FRONT_SPINS = 1 << 7;
+
+ /**
+ * The number of times to spin before blocking when a node is
+ * preceded by another node that is apparently spinning. Also
+ * serves as an increment to FRONT_SPINS on phase changes, and as
+ * base average frequency for yielding during spins. Must be a
+ * power of two.
+ */
+ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
+
+ /**
+ * Queue nodes. Uses Object, not E, for items to allow forgetting
+ * them after use. Relies heavily on Unsafe mechanics to minimize
+ * unnecessary ordering constraints: Writes that intrinsically
+ * precede or follow CASes use simple relaxed forms. Other
+ * cleanups use releasing/lazy writes.
+ */
+ static final class Node {
+ final boolean isData; // false if this is a request node
+ volatile Object item; // initially non-null if isData; CASed to match
+ volatile Node next;
+ volatile Thread waiter; // null until waiting
+
+ // CAS methods for fields
+ final boolean casNext(Node cmp, Node val) {
+ return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
+ }
+
+ final boolean casItem(Object cmp, Object val) {
+ // assert cmp == null || cmp.getClass() != Node.class;
+ return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
+ }
+
+ /**
+ * Creates a new node. Uses relaxed write because item can only
+ * be seen if followed by CAS.
+ */
+ Node(Object item, boolean isData) {
+ UNSAFE.putObject(this, itemOffset, item); // relaxed write
+ this.isData = isData;
+ }
+
+ /**
+ * Links node to itself to avoid garbage retention. Called
+ * only after CASing head field, so uses relaxed write.
+ */
+ final void forgetNext() {
+ UNSAFE.putObject(this, nextOffset, this);
+ }
+
+ /**
+ * Sets item to self (using a releasing/lazy write) and waiter
+ * to null, to avoid garbage retention after extracting or
+ * cancelling.
+ */
+ final void forgetContents() {
+ UNSAFE.putOrderedObject(this, itemOffset, this);
+ UNSAFE.putOrderedObject(this, waiterOffset, null);
+ }
+
+ /**
+ * Returns true if this node has been matched, including the
+ * case of artificial matches due to cancellation.
+ */
+ final boolean isMatched() {
+ Object x = item;
+ return (x == this) || ((x == null) == isData);
+ }
+
+ /**
+ * Returns true if this is an unmatched request node.
+ */
+ final boolean isUnmatchedRequest() {
+ return !isData && item == null;
+ }
+
+ /**
+ * Returns true if a node with the given mode cannot be
+ * appended to this node because this node is unmatched and
+ * has opposite data mode.
+ */
+ final boolean cannotPrecede(boolean haveData) {
+ boolean d = isData;
+ Object x;
+ return d != haveData && (x = item) != this && (x != null) == d;
+ }
+
+ /**
+ * Tries to artificially match a data node -- used by remove.
+ */
+ final boolean tryMatchData() {
+ // assert isData;
+ Object x = item;
+ if (x != null && x != this && casItem(x, null)) {
+ LockSupport.unpark(waiter);
+ return true;
+ }
+ return false;
+ }
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", Node.class);
+ private static final long itemOffset =
+ objectFieldOffset(UNSAFE, "item", Node.class);
+ private static final long waiterOffset =
+ objectFieldOffset(UNSAFE, "waiter", Node.class);
+
+ private static final long serialVersionUID = -3375979862319811754L;
+ }
+
+ /** head of the queue; null until first enqueue */
+ transient volatile Node head;
+
+ /** predecessor of dangling unspliceable node */
+ private transient volatile Node cleanMe; // decl here reduces contention
+
+ /** tail of the queue; null until first append */
+ private transient volatile Node tail;
+
+ // CAS methods for fields
+ private boolean casTail(Node cmp, Node val) {
+ return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
+ }
+
+ private boolean casHead(Node cmp, Node val) {
+ return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
+ }
+
+ private boolean casCleanMe(Node cmp, Node val) {
+ return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
+ }
+
+ /*
+ * Possible values for "how" argument in xfer method.
+ */
+ private static final int NOW = 0; // for untimed poll, tryTransfer
+ private static final int ASYNC = 1; // for offer, put, add
+ private static final int SYNC = 2; // for transfer, take
+ private static final int TIMED = 3; // for timed poll, tryTransfer
+
+ @SuppressWarnings("unchecked")
+ static <E> E cast(Object item) {
+ // assert item == null || item.getClass() != Node.class;
+ return (E) item;
+ }
+
+ /**
+ * Implements all queuing methods. See above for explanation.
+ *
+ * @param e the item or null for take
+ * @param haveData true if this is a put, else a take
+ * @param how NOW, ASYNC, SYNC, or TIMED
+ * @param nanos timeout in nanosecs, used only if mode is TIMED
+ * @return an item if matched, else e
+ * @throws NullPointerException if haveData mode but e is null
+ */
+ private E xfer(E e, boolean haveData, int how, long nanos) {
+ if (haveData && (e == null))
+ throw new NullPointerException();
+ Node s = null; // the node to append, if needed
+
+ retry: for (;;) { // restart on append race
+
+ for (Node h = head, p = h; p != null;) { // find & match first node
+ boolean isData = p.isData;
+ Object item = p.item;
+ if (item != p && (item != null) == isData) { // unmatched
+ if (isData == haveData) // can't match
+ break;
+ if (p.casItem(item, e)) { // match
+ for (Node q = p; q != h;) {
+ Node n = q.next; // update head by 2
+ if (n != null) // unless singleton
+ q = n;
+ if (head == h && casHead(h, q)) {
+ h.forgetNext();
+ break;
+ } // advance and retry
+ if ((h = head) == null ||
+ (q = h.next) == null || !q.isMatched())
+ break; // unless slack < 2
+ }
+ LockSupport.unpark(p.waiter);
+ return this.<E>cast(item);
+ }
+ }
+ Node n = p.next;
+ p = (p != n) ? n : (h = head); // Use head if p offlist
+ }
+
+ if (how != NOW) { // No matches available
+ if (s == null)
+ s = new Node(e, haveData);
+ Node pred = tryAppend(s, haveData);
+ if (pred == null)
+ continue retry; // lost race vs opposite mode
+ if (how != ASYNC)
+ return awaitMatch(s, pred, e, (how == TIMED), nanos);
+ }
+ return e; // not waiting
+ }
+ }
+
+ /**
+ * Tries to append node s as tail.
+ *
+ * @param s the node to append
+ * @param haveData true if appending in data mode
+ * @return null on failure due to losing race with append in
+ * different mode, else s's predecessor, or s itself if no
+ * predecessor
+ */
+ private Node tryAppend(Node s, boolean haveData) {
+ for (Node t = tail, p = t;;) { // move p to last node and append
+ Node n, u; // temps for reads of next & tail
+ if (p == null && (p = head) == null) {
+ if (casHead(null, s))
+ return s; // initialize
+ }
+ else if (p.cannotPrecede(haveData))
+ return null; // lost race vs opposite mode
+ else if ((n = p.next) != null) // not last; keep traversing
+ p = p != t && t != (u = tail) ? (t = u) : // stale tail
+ (p != n) ? n : null; // restart if off list
+ else if (!p.casNext(null, s))
+ p = p.next; // re-read on CAS failure
+ else {
+ if (p != t) { // update if slack now >= 2
+ while ((tail != t || !casTail(t, s)) &&
+ (t = tail) != null &&
+ (s = t.next) != null && // advance and retry
+ (s = s.next) != null && s != t);
+ }
+ return p;
+ }
+ }
+ }
+
+ /**
+ * Spins/yields/blocks until node s is matched or caller gives up.
+ *
+ * @param s the waiting node
+ * @param pred the predecessor of s, or s itself if it has no
+ * predecessor, or null if unknown (the null case does not occur
+ * in any current calls but may in possible future extensions)
+ * @param e the comparison value for checking match
+ * @param timed if true, wait only until timeout elapses
+ * @param nanos timeout in nanosecs, used only if timed is true
+ * @return matched item, or e if unmatched on interrupt or timeout
+ */
+ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
+ long lastTime = timed ? System.nanoTime() : 0L;
+ Thread w = Thread.currentThread();
+ int spins = -1; // initialized after first item and cancel checks
+ ThreadLocalRandom randomYields = null; // bound if needed
+
+ for (;;) {
+ Object item = s.item;
+ if (item != e) { // matched
+ // assert item != s;
+ s.forgetContents(); // avoid garbage
+ return this.<E>cast(item);
+ }
+ if ((w.isInterrupted() || (timed && nanos <= 0)) &&
+ s.casItem(e, s)) { // cancel
+ unsplice(pred, s);
+ return e;
+ }
+
+ if (spins < 0) { // establish spins at/near front
+ if ((spins = spinsFor(pred, s.isData)) > 0)
+ randomYields = ThreadLocalRandom.current();
+ }
+ else if (spins > 0) { // spin
+ if (--spins == 0)
+ shortenHeadPath(); // reduce slack before blocking
+ else if (randomYields.nextInt(CHAINED_SPINS) == 0)
+ Thread.yield(); // occasionally yield
+ }
+ else if (s.waiter == null) {
+ s.waiter = w; // request unpark then recheck
+ }
+ else if (timed) {
+ long now = System.nanoTime();
+ if ((nanos -= now - lastTime) > 0)
+ LockSupport.parkNanos(this, nanos);
+ lastTime = now;
+ }
+ else {
+ LockSupport.park(this);
+ s.waiter = null;
+ spins = -1; // spin if front upon wakeup
+ }
+ }
+ }
+
+ /**
+ * Returns spin/yield value for a node with given predecessor and
+ * data mode. See above for explanation.
+ */
+ private static int spinsFor(Node pred, boolean haveData) {
+ if (MP && pred != null) {
+ if (pred.isData != haveData) // phase change
+ return FRONT_SPINS + CHAINED_SPINS;
+ if (pred.isMatched()) // probably at front
+ return FRONT_SPINS;
+ if (pred.waiter == null) // pred apparently spinning
+ return CHAINED_SPINS;
+ }
+ return 0;
+ }
+
+ /**
+ * Tries (once) to unsplice nodes between head and first unmatched
+ * or trailing node; failing on contention.
+ */
+ private void shortenHeadPath() {
+ Node h, hn, p, q;
+ if ((p = h = head) != null && h.isMatched() &&
+ (q = hn = h.next) != null) {
+ Node n;
+ while ((n = q.next) != q) {
+ if (n == null || !q.isMatched()) {
+ if (hn != q && h.next == hn)
+ h.casNext(hn, q);
+ break;
+ }
+ p = q;
+ q = n;
+ }
+ }
+ }
+
+ /* -------------- Traversal methods -------------- */
+
+ /**
+ * Returns the successor of p, or the head node if p.next has been
+ * linked to self, which will only be true if traversing with a
+ * stale pointer that is now off the list.
+ */
+ final Node succ(Node p) {
+ Node next = p.next;
+ return (p == next) ? head : next;
+ }
+
+ /**
+ * Returns the first unmatched node of the given mode, or null if
+ * none. Used by methods isEmpty, hasWaitingConsumer.
+ */
+ private Node firstOfMode(boolean isData) {
+ for (Node p = head; p != null; p = succ(p)) {
+ if (!p.isMatched())
+ return (p.isData == isData) ? p : null;
+ }
+ return null;
+ }
+
+ /**
+ * Returns the item in the first unmatched node with isData; or
+ * null if none. Used by peek.
+ */
+ private E firstDataItem() {
+ for (Node p = head; p != null; p = succ(p)) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p)
+ return this.<E>cast(item);
+ }
+ else if (item == null)
+ return null;
+ }
+ return null;
+ }
+
+ /**
+ * Traverses and counts unmatched nodes of the given mode.
+ * Used by methods size and getWaitingConsumerCount.
+ */
+ private int countOfMode(boolean data) {
+ int count = 0;
+ for (Node p = head; p != null; ) {
+ if (!p.isMatched()) {
+ if (p.isData != data)
+ return 0;
+ if (++count == Integer.MAX_VALUE) // saturated
+ break;
+ }
+ Node n = p.next;
+ if (n != p)
+ p = n;
+ else {
+ count = 0;
+ p = head;
+ }
+ }
+ return count;
+ }
+
+ final class Itr implements Iterator<E> {
+ private Node nextNode; // next node to return item for
+ private E nextItem; // the corresponding item
+ private Node lastRet; // last returned node, to support remove
+ private Node lastPred; // predecessor to unlink lastRet
+
+ /**
+ * Moves to next node after prev, or first node if prev null.
+ */
+ private void advance(Node prev) {
+ lastPred = lastRet;
+ lastRet = prev;
+ for (Node p = (prev == null) ? head : succ(prev);
+ p != null; p = succ(p)) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p) {
+ nextItem = LinkedTransferQueue.this.<E>cast(item);
+ nextNode = p;
+ return;
+ }
+ }
+ else if (item == null)
+ break;
+ }
+ nextNode = null;
+ }
+
+ Itr() {
+ advance(null);
+ }
+
+ public final boolean hasNext() {
+ return nextNode != null;
+ }
+
+ public final E next() {
+ Node p = nextNode;
+ if (p == null) throw new NoSuchElementException();
+ E e = nextItem;
+ advance(p);
+ return e;
+ }
+
+ public final void remove() {
+ Node p = lastRet;
+ if (p == null) throw new IllegalStateException();
+ findAndRemoveDataNode(lastPred, p);
+ }
+ }
+
+ /* -------------- Removal methods -------------- */
+
+ /**
+ * Unsplices (now or later) the given deleted/cancelled node with
+ * the given predecessor.
+ *
+ * @param pred predecessor of node to be unspliced
+ * @param s the node to be unspliced
+ */
+ private void unsplice(Node pred, Node s) {
+ s.forgetContents(); // clear unneeded fields
+ /*
+ * At any given time, exactly one node on list cannot be
+ * unlinked -- the last inserted node. To accommodate this, if
+ * we cannot unlink s, we save its predecessor as "cleanMe",
+ * processing the previously saved version first. Because only
+ * one node in the list can have a null next, at least one of
+ * node s or the node previously saved can always be
+ * processed, so this always terminates.
+ */
+ if (pred != null && pred != s) {
+ while (pred.next == s) {
+ Node oldpred = (cleanMe == null) ? null : reclean();
+ Node n = s.next;
+ if (n != null) {
+ if (n != s)
+ pred.casNext(s, n);
+ break;
+ }
+ if (oldpred == pred || // Already saved
+ ((oldpred == null || oldpred.next == s) &&
+ casCleanMe(oldpred, pred))) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Tries to unsplice the deleted/cancelled node held in cleanMe
+ * that was previously uncleanable because it was at tail.
+ *
+ * @return current cleanMe node (or null)
+ */
+ private Node reclean() {
+ /*
+ * cleanMe is, or at one time was, predecessor of a cancelled
+ * node s that was the tail so could not be unspliced. If it
+ * is no longer the tail, try to unsplice if necessary and
+ * make cleanMe slot available. This differs from similar
+ * code in unsplice() because we must check that pred still
+ * points to a matched node that can be unspliced -- if not,
+ * we can (must) clear cleanMe without unsplicing. This can
+ * loop only due to contention.
+ */
+ Node pred;
+ while ((pred = cleanMe) != null) {
+ Node s = pred.next;
+ Node n;
+ if (s == null || s == pred || !s.isMatched())
+ casCleanMe(pred, null); // already gone
+ else if ((n = s.next) != null) {
+ if (n != s)
+ pred.casNext(s, n);
+ casCleanMe(pred, null);
+ }
+ else
+ break;
+ }
+ return pred;
+ }
+
+ /**
+ * Main implementation of Iterator.remove(). Finds
+ * and unsplices the given data node.
+ *
+ * @param possiblePred possible predecessor of s
+ * @param s the node to remove
+ */
+ final void findAndRemoveDataNode(Node possiblePred, Node s) {
+ // assert s.isData;
+ if (s.tryMatchData()) {
+ if (possiblePred != null && possiblePred.next == s)
+ unsplice(possiblePred, s); // was actual predecessor
+ else {
+ for (Node pred = null, p = head; p != null; ) {
+ if (p == s) {
+ unsplice(pred, p);
+ break;
+ }
+ if (p.isUnmatchedRequest())
+ break;
+ pred = p;
+ if ((p = p.next) == pred) { // stale
+ pred = null;
+ p = head;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Main implementation of remove(Object)
+ */
+ private boolean findAndRemove(Object e) {
+ if (e != null) {
+ for (Node pred = null, p = head; p != null; ) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p && e.equals(item) &&
+ p.tryMatchData()) {
+ unsplice(pred, p);
+ return true;
+ }
+ }
+ else if (item == null)
+ break;
+ pred = p;
+ if ((p = p.next) == pred) { // stale
+ pred = null;
+ p = head;
+ }
+ }
+ }
+ return false;
+ }
+
+
+ /**
+ * Creates an initially empty {@code LinkedTransferQueue}.
+ */
+ public LinkedTransferQueue() {
+ }
+
+ /**
+ * Creates a {@code LinkedTransferQueue}
+ * initially containing the elements of the given collection,
+ * added in traversal order of the collection's iterator.
+ *
+ * @param c the collection of elements to initially contain
+ * @throws NullPointerException if the specified collection or any
+ * of its elements are null
+ */
+ public LinkedTransferQueue(Collection<? extends E> c) {
+ this();
+ addAll(c);
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public void put(E e) {
+ xfer(e, true, ASYNC, 0);
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block or
+ * return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit) {
+ xfer(e, true, ASYNC, 0);
+ return true;
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e) {
+ xfer(e, true, ASYNC, 0);
+ return true;
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
+ *
+ * @return {@code true} (as specified by {@link Collection#add})
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean add(E e) {
+ xfer(e, true, ASYNC, 0);
+ return true;
+ }
+
+ /**
+ * Transfers the element to a waiting consumer immediately, if possible.
+ *
+ * <p>More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * otherwise returning {@code false} without enqueuing the element.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean tryTransfer(E e) {
+ return xfer(e, true, NOW, 0) == null;
+ }
+
+ /**
+ * Transfers the element to a consumer, waiting if necessary to do so.
+ *
+ * <p>More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public void transfer(E e) throws InterruptedException {
+ if (xfer(e, true, SYNC, 0) != null) {
+ Thread.interrupted(); // failure possible only due to interrupt
+ throw new InterruptedException();
+ }
+ }
+
+ /**
+ * Transfers the element to a consumer if it is possible to do so
+ * before the timeout elapses.
+ *
+ * <p>More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer,
+ * returning {@code false} if the specified wait time elapses
+ * before the element can be transferred.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean tryTransfer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
+ return true;
+ if (!Thread.interrupted())
+ return false;
+ throw new InterruptedException();
+ }
+
+ public E take() throws InterruptedException {
+ E e = xfer(null, false, SYNC, 0);
+ if (e != null)
+ return e;
+ Thread.interrupted();
+ throw new InterruptedException();
+ }
+
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ E e = xfer(null, false, TIMED, unit.toNanos(timeout));
+ if (e != null || !Thread.interrupted())
+ return e;
+ throw new InterruptedException();
+ }
+
+ public E poll() {
+ return xfer(null, false, NOW, 0);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection<? super E> c) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ int n = 0;
+ E e;
+ while ( (e = poll()) != null) {
+ c.add(e);
+ ++n;
+ }
+ return n;
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ int n = 0;
+ E e;
+ while (n < maxElements && (e = poll()) != null) {
+ c.add(e);
+ ++n;
+ }
+ return n;
+ }
+
+ /**
+ * Returns an iterator over the elements in this queue in proper
+ * sequence, from head to tail.
+ *
+ * <p>The returned iterator is a "weakly consistent" iterator that
+ * will never throw
+ * {@link ConcurrentModificationException ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed
+ * to) reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this queue in proper sequence
+ */
+ public Iterator<E> iterator() {
+ return new Itr();
+ }
+
+ public E peek() {
+ return firstDataItem();
+ }
+
+ /**
+ * Returns {@code true} if this queue contains no elements.
+ *
+ * @return {@code true} if this queue contains no elements
+ */
+ public boolean isEmpty() {
+ return firstOfMode(true) == null;
+ }
+
+ public boolean hasWaitingConsumer() {
+ return firstOfMode(false) != null;
+ }
+
+ /**
+ * Returns the number of elements in this queue. If this queue
+ * contains more than {@code Integer.MAX_VALUE} elements, returns
+ * {@code Integer.MAX_VALUE}.
+ *
+ * <p>Beware that, unlike in most collections, this method is
+ * <em>NOT</em> a constant-time operation. Because of the
+ * asynchronous nature of these queues, determining the current
+ * number of elements requires an O(n) traversal.
+ *
+ * @return the number of elements in this queue
+ */
+ public int size() {
+ return countOfMode(true);
+ }
+
+ public int getWaitingConsumerCount() {
+ return countOfMode(false);
+ }
+
+ /**
+ * Removes a single instance of the specified element from this queue,
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
+ * elements.
+ * Returns {@code true} if this queue contained the specified element
+ * (or equivalently, if this queue changed as a result of the call).
+ *
+ * @param o element to be removed from this queue, if present
+ * @return {@code true} if this queue changed as a result of the call
+ */
+ public boolean remove(Object o) {
+ return findAndRemove(o);
+ }
+
+ /**
+ * Always returns {@code Integer.MAX_VALUE} because a
+ * {@code LinkedTransferQueue} is not capacity constrained.
+ *
+ * @return {@code Integer.MAX_VALUE} (as specified by
+ * {@link BlockingQueue#remainingCapacity()})
+ */
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
+ }
+
+ /**
+ * Saves the state to a stream (that is, serializes it).
+ *
+ * @serialData All of the elements (each an {@code E}) in
+ * the proper order, followed by a null
+ * @param s the stream
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ s.defaultWriteObject();
+ for (E e : this)
+ s.writeObject(e);
+ // Use trailing null as sentinel
+ s.writeObject(null);
+ }
+
+ /**
+ * Reconstitutes the Queue instance from a stream (that is,
+ * deserializes it).
+ *
+ * @param s the stream
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ for (;;) {
+ @SuppressWarnings("unchecked") E item = (E) s.readObject();
+ if (item == null)
+ break;
+ else
+ offer(item);
+ }
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
+ private static final long tailOffset =
+ objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
+ private static final long cleanMeOffset =
+ objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
+
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/Phaser.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,1042 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A reusable synchronization barrier, similar in functionality to
+ * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
+ * {@link java.util.concurrent.CountDownLatch CountDownLatch}
+ * but supporting more flexible usage.
+ *
+ * <p> <b>Registration.</b> Unlike the case for other barriers, the
+ * number of parties <em>registered</em> to synchronize on a phaser
+ * may vary over time. Tasks may be registered at any time (using
+ * methods {@link #register}, {@link #bulkRegister}, or forms of
+ * constructors establishing initial numbers of parties), and
+ * optionally deregistered upon any arrival (using {@link
+ * #arriveAndDeregister}). As is the case with most basic
+ * synchronization constructs, registration and deregistration affect
+ * only internal counts; they do not establish any further internal
+ * bookkeeping, so tasks cannot query whether they are registered.
+ * (However, you can introduce such bookkeeping by subclassing this
+ * class.)
+ *
+ * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
+ * Phaser} may be repeatedly awaited. Method {@link
+ * #arriveAndAwaitAdvance} has effect analogous to {@link
+ * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
+ * generation of a {@code Phaser} has an associated phase number. The
+ * phase number starts at zero, and advances when all parties arrive
+ * at the barrier, wrapping around to zero after reaching {@code
+ * Integer.MAX_VALUE}. The use of phase numbers enables independent
+ * control of actions upon arrival at a barrier and upon awaiting
+ * others, via two kinds of methods that may be invoked by any
+ * registered party:
+ *
+ * <ul>
+ *
+ * <li> <b>Arrival.</b> Methods {@link #arrive} and
+ * {@link #arriveAndDeregister} record arrival at a
+ * barrier. These methods do not block, but return an associated
+ * <em>arrival phase number</em>; that is, the phase number of
+ * the barrier to which the arrival applied. When the final
+ * party for a given phase arrives, an optional barrier action
+ * is performed and the phase advances. Barrier actions,
+ * performed by the party triggering a phase advance, are
+ * arranged by overriding method {@link #onAdvance(int, int)},
+ * which also controls termination. Overriding this method is
+ * similar to, but more flexible than, providing a barrier
+ * action to a {@code CyclicBarrier}.
+ *
+ * <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
+ * argument indicating an arrival phase number, and returns when
+ * the barrier advances to (or is already at) a different phase.
+ * Unlike similar constructions using {@code CyclicBarrier},
+ * method {@code awaitAdvance} continues to wait even if the
+ * waiting thread is interrupted. Interruptible and timeout
+ * versions are also available, but exceptions encountered while
+ * tasks wait interruptibly or with timeout do not change the
+ * state of the barrier. If necessary, you can perform any
+ * associated recovery within handlers of those exceptions,
+ * often after invoking {@code forceTermination}. Phasers may
+ * also be used by tasks executing in a {@link ForkJoinPool},
+ * which will ensure sufficient parallelism to execute tasks
+ * when others are blocked waiting for a phase to advance.
+ *
+ * </ul>
+ *
+ * <p> <b>Termination.</b> A {@code Phaser} may enter a
+ * <em>termination</em> state in which all synchronization methods
+ * immediately return without updating phaser state or waiting for
+ * advance, and indicating (via a negative phase value) that execution
+ * is complete. Termination is triggered when an invocation of {@code
+ * onAdvance} returns {@code true}. As illustrated below, when
+ * phasers control actions with a fixed number of iterations, it is
+ * often convenient to override this method to cause termination when
+ * the current phase number reaches a threshold. Method {@link
+ * #forceTermination} is also available to abruptly release waiting
+ * threads and allow them to terminate.
+ *
+ * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
+ * in tree structures) to reduce contention. Phasers with large
+ * numbers of parties that would otherwise experience heavy
+ * synchronization contention costs may instead be set up so that
+ * groups of sub-phasers share a common parent. This may greatly
+ * increase throughput even though it incurs greater per-operation
+ * overhead.
+ *
+ * <p><b>Monitoring.</b> While synchronization methods may be invoked
+ * only by registered parties, the current state of a phaser may be
+ * monitored by any caller. At any given moment there are {@link
+ * #getRegisteredParties} parties in total, of which {@link
+ * #getArrivedParties} have arrived at the current phase ({@link
+ * #getPhase}). When the remaining ({@link #getUnarrivedParties})
+ * parties arrive, the phase advances. The values returned by these
+ * methods may reflect transient states and so are not in general
+ * useful for synchronization control. Method {@link #toString}
+ * returns snapshots of these state queries in a form convenient for
+ * informal monitoring.
+ *
+ * <p><b>Sample usages:</b>
+ *
+ * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
+ * to control a one-shot action serving a variable number of
+ * parties. The typical idiom is for the method setting this up to
+ * first register, then start the actions, then deregister, as in:
+ *
+ * <pre> {@code
+ * void runTasks(List<Runnable> tasks) {
+ * final Phaser phaser = new Phaser(1); // "1" to register self
+ * // create and start threads
+ * for (Runnable task : tasks) {
+ * phaser.register();
+ * new Thread() {
+ * public void run() {
+ * phaser.arriveAndAwaitAdvance(); // await all creation
+ * task.run();
+ * }
+ * }.start();
+ * }
+ *
+ * // allow threads to start and deregister self
+ * phaser.arriveAndDeregister();
+ * }}</pre>
+ *
+ * <p>One way to cause a set of threads to repeatedly perform actions
+ * for a given number of iterations is to override {@code onAdvance}:
+ *
+ * <pre> {@code
+ * void startTasks(List<Runnable> tasks, final int iterations) {
+ * final Phaser phaser = new Phaser() {
+ * protected boolean onAdvance(int phase, int registeredParties) {
+ * return phase >= iterations || registeredParties == 0;
+ * }
+ * };
+ * phaser.register();
+ * for (final Runnable task : tasks) {
+ * phaser.register();
+ * new Thread() {
+ * public void run() {
+ * do {
+ * task.run();
+ * phaser.arriveAndAwaitAdvance();
+ * } while (!phaser.isTerminated());
+ * }
+ * }.start();
+ * }
+ * phaser.arriveAndDeregister(); // deregister self, don't wait
+ * }}</pre>
+ *
+ * If the main task must later await termination, it
+ * may re-register and then execute a similar loop:
+ * <pre> {@code
+ * // ...
+ * phaser.register();
+ * while (!phaser.isTerminated())
+ * phaser.arriveAndAwaitAdvance();}</pre>
+ *
+ * <p>Related constructions may be used to await particular phase numbers
+ * in contexts where you are sure that the phase will never wrap around
+ * {@code Integer.MAX_VALUE}. For example:
+ *
+ * <pre> {@code
+ * void awaitPhase(Phaser phaser, int phase) {
+ * int p = phaser.register(); // assumes caller not already registered
+ * while (p < phase) {
+ * if (phaser.isTerminated())
+ * // ... deal with unexpected termination
+ * else
+ * p = phaser.arriveAndAwaitAdvance();
+ * }
+ * phaser.arriveAndDeregister();
+ * }}</pre>
+ *
+ *
+ * <p>To create a set of tasks using a tree of phasers,
+ * you could use code of the following form, assuming a
+ * Task class with a constructor accepting a phaser that
+ * it registers for upon construction:
+ *
+ * <pre> {@code
+ * void build(Task[] actions, int lo, int hi, Phaser ph) {
+ * if (hi - lo > TASKS_PER_PHASER) {
+ * for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
+ * int j = Math.min(i + TASKS_PER_PHASER, hi);
+ * build(actions, i, j, new Phaser(ph));
+ * }
+ * } else {
+ * for (int i = lo; i < hi; ++i)
+ * actions[i] = new Task(ph);
+ * // assumes new Task(ph) performs ph.register()
+ * }
+ * }
+ * // .. initially called, for n tasks via
+ * build(new Task[n], 0, n, new Phaser());}</pre>
+ *
+ * The best value of {@code TASKS_PER_PHASER} depends mainly on
+ * expected barrier synchronization rates. A value as low as four may
+ * be appropriate for extremely small per-barrier task bodies (thus
+ * high rates), or up to hundreds for extremely large ones.
+ *
+ * </pre>
+ *
+ * <p><b>Implementation notes</b>: This implementation restricts the
+ * maximum number of parties to 65535. Attempts to register additional
+ * parties result in {@code IllegalStateException}. However, you can and
+ * should create tiered phasers to accommodate arbitrarily large sets
+ * of participants.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ */
+public class Phaser {
+ /*
+ * This class implements an extension of X10 "clocks". Thanks to
+ * Vijay Saraswat for the idea, and to Vivek Sarkar for
+ * enhancements to extend functionality.
+ */
+
+ /**
+ * Barrier state representation. Conceptually, a barrier contains
+ * four values:
+ *
+ * * parties -- the number of parties to wait (16 bits)
+ * * unarrived -- the number of parties yet to hit barrier (16 bits)
+ * * phase -- the generation of the barrier (31 bits)
+ * * terminated -- set if barrier is terminated (1 bit)
+ *
+ * However, to efficiently maintain atomicity, these values are
+ * packed into a single (atomic) long. Termination uses the sign
+ * bit of 32 bit representation of phase, so phase is set to -1 on
+ * termination. Good performance relies on keeping state decoding
+ * and encoding simple, and keeping race windows short.
+ *
+ * Note: there are some cheats in arrive() that rely on unarrived
+ * count being lowest 16 bits.
+ */
+ private volatile long state;
+
+ private static final int ushortMask = 0xffff;
+ private static final int phaseMask = 0x7fffffff;
+
+ private static int unarrivedOf(long s) {
+ return (int) (s & ushortMask);
+ }
+
+ private static int partiesOf(long s) {
+ return ((int) s) >>> 16;
+ }
+
+ private static int phaseOf(long s) {
+ return (int) (s >>> 32);
+ }
+
+ private static int arrivedOf(long s) {
+ return partiesOf(s) - unarrivedOf(s);
+ }
+
+ private static long stateFor(int phase, int parties, int unarrived) {
+ return ((((long) phase) << 32) | (((long) parties) << 16) |
+ (long) unarrived);
+ }
+
+ private static long trippedStateFor(int phase, int parties) {
+ long lp = (long) parties;
+ return (((long) phase) << 32) | (lp << 16) | lp;
+ }
+
+ /**
+ * Returns message string for bad bounds exceptions.
+ */
+ private static String badBounds(int parties, int unarrived) {
+ return ("Attempt to set " + unarrived +
+ " unarrived of " + parties + " parties");
+ }
+
+ /**
+ * The parent of this phaser, or null if none
+ */
+ private final Phaser parent;
+
+ /**
+ * The root of phaser tree. Equals this if not in a tree. Used to
+ * support faster state push-down.
+ */
+ private final Phaser root;
+
+ // Wait queues
+
+ /**
+ * Heads of Treiber stacks for waiting threads. To eliminate
+ * contention while releasing some threads while adding others, we
+ * use two of them, alternating across even and odd phases.
+ */
+ private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
+ private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
+
+ private AtomicReference<QNode> queueFor(int phase) {
+ return ((phase & 1) == 0) ? evenQ : oddQ;
+ }
+
+ /**
+ * Returns current state, first resolving lagged propagation from
+ * root if necessary.
+ */
+ private long getReconciledState() {
+ return (parent == null) ? state : reconcileState();
+ }
+
+ /**
+ * Recursively resolves state.
+ */
+ private long reconcileState() {
+ Phaser p = parent;
+ long s = state;
+ if (p != null) {
+ while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
+ long parentState = p.getReconciledState();
+ int parentPhase = phaseOf(parentState);
+ int phase = phaseOf(s = state);
+ if (phase != parentPhase) {
+ long next = trippedStateFor(parentPhase, partiesOf(s));
+ if (casState(s, next)) {
+ releaseWaiters(phase);
+ s = next;
+ }
+ }
+ }
+ }
+ return s;
+ }
+
+ /**
+ * Creates a new phaser without any initially registered parties,
+ * initial phase number 0, and no parent. Any thread using this
+ * phaser will need to first register for it.
+ */
+ public Phaser() {
+ this(null);
+ }
+
+ /**
+ * Creates a new phaser with the given numbers of registered
+ * unarrived parties, initial phase number 0, and no parent.
+ *
+ * @param parties the number of parties required to trip barrier
+ * @throws IllegalArgumentException if parties less than zero
+ * or greater than the maximum number of parties supported
+ */
+ public Phaser(int parties) {
+ this(null, parties);
+ }
+
+ /**
+ * Creates a new phaser with the given parent, without any
+ * initially registered parties. If parent is non-null this phaser
+ * is registered with the parent and its initial phase number is
+ * the same as that of parent phaser.
+ *
+ * @param parent the parent phaser
+ */
+ public Phaser(Phaser parent) {
+ int phase = 0;
+ this.parent = parent;
+ if (parent != null) {
+ this.root = parent.root;
+ phase = parent.register();
+ }
+ else
+ this.root = this;
+ this.state = trippedStateFor(phase, 0);
+ }
+
+ /**
+ * Creates a new phaser with the given parent and numbers of
+ * registered unarrived parties. If parent is non-null, this phaser
+ * is registered with the parent and its initial phase number is
+ * the same as that of parent phaser.
+ *
+ * @param parent the parent phaser
+ * @param parties the number of parties required to trip barrier
+ * @throws IllegalArgumentException if parties less than zero
+ * or greater than the maximum number of parties supported
+ */
+ public Phaser(Phaser parent, int parties) {
+ if (parties < 0 || parties > ushortMask)
+ throw new IllegalArgumentException("Illegal number of parties");
+ int phase = 0;
+ this.parent = parent;
+ if (parent != null) {
+ this.root = parent.root;
+ phase = parent.register();
+ }
+ else
+ this.root = this;
+ this.state = trippedStateFor(phase, parties);
+ }
+
+ /**
+ * Adds a new unarrived party to this phaser.
+ *
+ * @return the arrival phase number to which this registration applied
+ * @throws IllegalStateException if attempting to register more
+ * than the maximum supported number of parties
+ */
+ public int register() {
+ return doRegister(1);
+ }
+
+ /**
+ * Adds the given number of new unarrived parties to this phaser.
+ *
+ * @param parties the number of parties required to trip barrier
+ * @return the arrival phase number to which this registration applied
+ * @throws IllegalStateException if attempting to register more
+ * than the maximum supported number of parties
+ */
+ public int bulkRegister(int parties) {
+ if (parties < 0)
+ throw new IllegalArgumentException();
+ if (parties == 0)
+ return getPhase();
+ return doRegister(parties);
+ }
+
+ /**
+ * Shared code for register, bulkRegister
+ */
+ private int doRegister(int registrations) {
+ int phase;
+ for (;;) {
+ long s = getReconciledState();
+ phase = phaseOf(s);
+ int unarrived = unarrivedOf(s) + registrations;
+ int parties = partiesOf(s) + registrations;
+ if (phase < 0)
+ break;
+ if (parties > ushortMask || unarrived > ushortMask)
+ throw new IllegalStateException(badBounds(parties, unarrived));
+ if (phase == phaseOf(root.state) &&
+ casState(s, stateFor(phase, parties, unarrived)))
+ break;
+ }
+ return phase;
+ }
+
+ /**
+ * Arrives at the barrier, but does not wait for others. (You can
+ * in turn wait for others via {@link #awaitAdvance}). It is an
+ * unenforced usage error for an unregistered party to invoke this
+ * method.
+ *
+ * @return the arrival phase number, or a negative value if terminated
+ * @throws IllegalStateException if not terminated and the number
+ * of unarrived parties would become negative
+ */
+ public int arrive() {
+ int phase;
+ for (;;) {
+ long s = state;
+ phase = phaseOf(s);
+ if (phase < 0)
+ break;
+ int parties = partiesOf(s);
+ int unarrived = unarrivedOf(s) - 1;
+ if (unarrived > 0) { // Not the last arrival
+ if (casState(s, s - 1)) // s-1 adds one arrival
+ break;
+ }
+ else if (unarrived == 0) { // the last arrival
+ Phaser par = parent;
+ if (par == null) { // directly trip
+ if (casState
+ (s,
+ trippedStateFor(onAdvance(phase, parties) ? -1 :
+ ((phase + 1) & phaseMask), parties))) {
+ releaseWaiters(phase);
+ break;
+ }
+ }
+ else { // cascade to parent
+ if (casState(s, s - 1)) { // zeroes unarrived
+ par.arrive();
+ reconcileState();
+ break;
+ }
+ }
+ }
+ else if (phase != phaseOf(root.state)) // or if unreconciled
+ reconcileState();
+ else
+ throw new IllegalStateException(badBounds(parties, unarrived));
+ }
+ return phase;
+ }
+
+ /**
+ * Arrives at the barrier and deregisters from it without waiting
+ * for others. Deregistration reduces the number of parties
+ * required to trip the barrier in future phases. If this phaser
+ * has a parent, and deregistration causes this phaser to have
+ * zero parties, this phaser also arrives at and is deregistered
+ * from its parent. It is an unenforced usage error for an
+ * unregistered party to invoke this method.
+ *
+ * @return the arrival phase number, or a negative value if terminated
+ * @throws IllegalStateException if not terminated and the number
+ * of registered or unarrived parties would become negative
+ */
+ public int arriveAndDeregister() {
+ // similar code to arrive, but too different to merge
+ Phaser par = parent;
+ int phase;
+ for (;;) {
+ long s = state;
+ phase = phaseOf(s);
+ if (phase < 0)
+ break;
+ int parties = partiesOf(s) - 1;
+ int unarrived = unarrivedOf(s) - 1;
+ if (parties >= 0) {
+ if (unarrived > 0 || (unarrived == 0 && par != null)) {
+ if (casState
+ (s,
+ stateFor(phase, parties, unarrived))) {
+ if (unarrived == 0) {
+ par.arriveAndDeregister();
+ reconcileState();
+ }
+ break;
+ }
+ continue;
+ }
+ if (unarrived == 0) {
+ if (casState
+ (s,
+ trippedStateFor(onAdvance(phase, parties) ? -1 :
+ ((phase + 1) & phaseMask), parties))) {
+ releaseWaiters(phase);
+ break;
+ }
+ continue;
+ }
+ if (par != null && phase != phaseOf(root.state)) {
+ reconcileState();
+ continue;
+ }
+ }
+ throw new IllegalStateException(badBounds(parties, unarrived));
+ }
+ return phase;
+ }
+
+ /**
+ * Arrives at the barrier and awaits others. Equivalent in effect
+ * to {@code awaitAdvance(arrive())}. If you need to await with
+ * interruption or timeout, you can arrange this with an analogous
+ * construction using one of the other forms of the awaitAdvance
+ * method. If instead you need to deregister upon arrival use
+ * {@code arriveAndDeregister}. It is an unenforced usage error
+ * for an unregistered party to invoke this method.
+ *
+ * @return the arrival phase number, or a negative number if terminated
+ * @throws IllegalStateException if not terminated and the number
+ * of unarrived parties would become negative
+ */
+ public int arriveAndAwaitAdvance() {
+ return awaitAdvance(arrive());
+ }
+
+ /**
+ * Awaits the phase of the barrier to advance from the given phase
+ * value, returning immediately if the current phase of the
+ * barrier is not equal to the given phase value or this barrier
+ * is terminated. It is an unenforced usage error for an
+ * unregistered party to invoke this method.
+ *
+ * @param phase an arrival phase number, or negative value if
+ * terminated; this argument is normally the value returned by a
+ * previous call to {@code arrive} or its variants
+ * @return the next arrival phase number, or a negative value
+ * if terminated or argument is negative
+ */
+ public int awaitAdvance(int phase) {
+ if (phase < 0)
+ return phase;
+ long s = getReconciledState();
+ int p = phaseOf(s);
+ if (p != phase)
+ return p;
+ if (unarrivedOf(s) == 0 && parent != null)
+ parent.awaitAdvance(phase);
+ // Fall here even if parent waited, to reconcile and help release
+ return untimedWait(phase);
+ }
+
+ /**
+ * Awaits the phase of the barrier to advance from the given phase
+ * value, throwing {@code InterruptedException} if interrupted
+ * while waiting, or returning immediately if the current phase of
+ * the barrier is not equal to the given phase value or this
+ * barrier is terminated. It is an unenforced usage error for an
+ * unregistered party to invoke this method.
+ *
+ * @param phase an arrival phase number, or negative value if
+ * terminated; this argument is normally the value returned by a
+ * previous call to {@code arrive} or its variants
+ * @return the next arrival phase number, or a negative value
+ * if terminated or argument is negative
+ * @throws InterruptedException if thread interrupted while waiting
+ */
+ public int awaitAdvanceInterruptibly(int phase)
+ throws InterruptedException {
+ if (phase < 0)
+ return phase;
+ long s = getReconciledState();
+ int p = phaseOf(s);
+ if (p != phase)
+ return p;
+ if (unarrivedOf(s) == 0 && parent != null)
+ parent.awaitAdvanceInterruptibly(phase);
+ return interruptibleWait(phase);
+ }
+
+ /**
+ * Awaits the phase of the barrier to advance from the given phase
+ * value or the given timeout to elapse, throwing {@code
+ * InterruptedException} if interrupted while waiting, or
+ * returning immediately if the current phase of the barrier is
+ * not equal to the given phase value or this barrier is
+ * terminated. It is an unenforced usage error for an
+ * unregistered party to invoke this method.
+ *
+ * @param phase an arrival phase number, or negative value if
+ * terminated; this argument is normally the value returned by a
+ * previous call to {@code arrive} or its variants
+ * @param timeout how long to wait before giving up, in units of
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return the next arrival phase number, or a negative value
+ * if terminated or argument is negative
+ * @throws InterruptedException if thread interrupted while waiting
+ * @throws TimeoutException if timed out while waiting
+ */
+ public int awaitAdvanceInterruptibly(int phase,
+ long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException {
+ if (phase < 0)
+ return phase;
+ long s = getReconciledState();
+ int p = phaseOf(s);
+ if (p != phase)
+ return p;
+ if (unarrivedOf(s) == 0 && parent != null)
+ parent.awaitAdvanceInterruptibly(phase, timeout, unit);
+ return timedWait(phase, unit.toNanos(timeout));
+ }
+
+ /**
+ * Forces this barrier to enter termination state. Counts of
+ * arrived and registered parties are unaffected. If this phaser
+ * has a parent, it too is terminated. This method may be useful
+ * for coordinating recovery after one or more tasks encounter
+ * unexpected exceptions.
+ */
+ public void forceTermination() {
+ for (;;) {
+ long s = getReconciledState();
+ int phase = phaseOf(s);
+ int parties = partiesOf(s);
+ int unarrived = unarrivedOf(s);
+ if (phase < 0 ||
+ casState(s, stateFor(-1, parties, unarrived))) {
+ releaseWaiters(0);
+ releaseWaiters(1);
+ if (parent != null)
+ parent.forceTermination();
+ return;
+ }
+ }
+ }
+
+ /**
+ * Returns the current phase number. The maximum phase number is
+ * {@code Integer.MAX_VALUE}, after which it restarts at
+ * zero. Upon termination, the phase number is negative.
+ *
+ * @return the phase number, or a negative value if terminated
+ */
+ public final int getPhase() {
+ return phaseOf(getReconciledState());
+ }
+
+ /**
+ * Returns the number of parties registered at this barrier.
+ *
+ * @return the number of parties
+ */
+ public int getRegisteredParties() {
+ return partiesOf(state);
+ }
+
+ /**
+ * Returns the number of registered parties that have arrived at
+ * the current phase of this barrier.
+ *
+ * @return the number of arrived parties
+ */
+ public int getArrivedParties() {
+ return arrivedOf(state);
+ }
+
+ /**
+ * Returns the number of registered parties that have not yet
+ * arrived at the current phase of this barrier.
+ *
+ * @return the number of unarrived parties
+ */
+ public int getUnarrivedParties() {
+ return unarrivedOf(state);
+ }
+
+ /**
+ * Returns the parent of this phaser, or {@code null} if none.
+ *
+ * @return the parent of this phaser, or {@code null} if none
+ */
+ public Phaser getParent() {
+ return parent;
+ }
+
+ /**
+ * Returns the root ancestor of this phaser, which is the same as
+ * this phaser if it has no parent.
+ *
+ * @return the root ancestor of this phaser
+ */
+ public Phaser getRoot() {
+ return root;
+ }
+
+ /**
+ * Returns {@code true} if this barrier has been terminated.
+ *
+ * @return {@code true} if this barrier has been terminated
+ */
+ public boolean isTerminated() {
+ return getPhase() < 0;
+ }
+
+ /**
+ * Overridable method to perform an action upon impending phase
+ * advance, and to control termination. This method is invoked
+ * upon arrival of the party tripping the barrier (when all other
+ * waiting parties are dormant). If this method returns {@code
+ * true}, then, rather than advance the phase number, this barrier
+ * will be set to a final termination state, and subsequent calls
+ * to {@link #isTerminated} will return true. Any (unchecked)
+ * Exception or Error thrown by an invocation of this method is
+ * propagated to the party attempting to trip the barrier, in
+ * which case no advance occurs.
+ *
+ * <p>The arguments to this method provide the state of the phaser
+ * prevailing for the current transition. (When called from within
+ * an implementation of {@code onAdvance} the values returned by
+ * methods such as {@code getPhase} may or may not reliably
+ * indicate the state to which this transition applies.)
+ *
+ * <p>The default version returns {@code true} when the number of
+ * registered parties is zero. Normally, overrides that arrange
+ * termination for other reasons should also preserve this
+ * property.
+ *
+ * <p>You may override this method to perform an action with side
+ * effects visible to participating tasks, but it is only sensible
+ * to do so in designs where all parties register before any
+ * arrive, and all {@link #awaitAdvance} at each phase.
+ * Otherwise, you cannot ensure lack of interference from other
+ * parties during the invocation of this method. Additionally,
+ * method {@code onAdvance} may be invoked more than once per
+ * transition if registrations are intermixed with arrivals.
+ *
+ * @param phase the phase number on entering the barrier
+ * @param registeredParties the current number of registered parties
+ * @return {@code true} if this barrier should terminate
+ */
+ protected boolean onAdvance(int phase, int registeredParties) {
+ return registeredParties <= 0;
+ }
+
+ /**
+ * Returns a string identifying this phaser, as well as its
+ * state. The state, in brackets, includes the String {@code
+ * "phase = "} followed by the phase number, {@code "parties = "}
+ * followed by the number of registered parties, and {@code
+ * "arrived = "} followed by the number of arrived parties.
+ *
+ * @return a string identifying this barrier, as well as its state
+ */
+ public String toString() {
+ long s = getReconciledState();
+ return super.toString() +
+ "[phase = " + phaseOf(s) +
+ " parties = " + partiesOf(s) +
+ " arrived = " + arrivedOf(s) + "]";
+ }
+
+ // methods for waiting
+
+ /**
+ * Wait nodes for Treiber stack representing wait queue
+ */
+ static final class QNode implements ForkJoinPool.ManagedBlocker {
+ final Phaser phaser;
+ final int phase;
+ final long startTime;
+ final long nanos;
+ final boolean timed;
+ final boolean interruptible;
+ volatile boolean wasInterrupted = false;
+ volatile Thread thread; // nulled to cancel wait
+ QNode next;
+ QNode(Phaser phaser, int phase, boolean interruptible,
+ boolean timed, long startTime, long nanos) {
+ this.phaser = phaser;
+ this.phase = phase;
+ this.timed = timed;
+ this.interruptible = interruptible;
+ this.startTime = startTime;
+ this.nanos = nanos;
+ thread = Thread.currentThread();
+ }
+ public boolean isReleasable() {
+ return (thread == null ||
+ phaser.getPhase() != phase ||
+ (interruptible && wasInterrupted) ||
+ (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
+ }
+ public boolean block() {
+ if (Thread.interrupted()) {
+ wasInterrupted = true;
+ if (interruptible)
+ return true;
+ }
+ if (!timed)
+ LockSupport.park(this);
+ else {
+ long waitTime = nanos - (System.nanoTime() - startTime);
+ if (waitTime <= 0)
+ return true;
+ LockSupport.parkNanos(this, waitTime);
+ }
+ return isReleasable();
+ }
+ void signal() {
+ Thread t = thread;
+ if (t != null) {
+ thread = null;
+ LockSupport.unpark(t);
+ }
+ }
+ boolean doWait() {
+ if (thread != null) {
+ try {
+ ForkJoinPool.managedBlock(this, false);
+ } catch (InterruptedException ie) {
+ }
+ }
+ return wasInterrupted;
+ }
+
+ }
+
+ /**
+ * Removes and signals waiting threads from wait queue.
+ */
+ private void releaseWaiters(int phase) {
+ AtomicReference<QNode> head = queueFor(phase);
+ QNode q;
+ while ((q = head.get()) != null) {
+ if (head.compareAndSet(q, q.next))
+ q.signal();
+ }
+ }
+
+ /**
+ * Tries to enqueue given node in the appropriate wait queue.
+ *
+ * @return true if successful
+ */
+ private boolean tryEnqueue(QNode node) {
+ AtomicReference<QNode> head = queueFor(node.phase);
+ return head.compareAndSet(node.next = head.get(), node);
+ }
+
+ /**
+ * Enqueues node and waits unless aborted or signalled.
+ *
+ * @return current phase
+ */
+ private int untimedWait(int phase) {
+ QNode node = null;
+ boolean queued = false;
+ boolean interrupted = false;
+ int p;
+ while ((p = getPhase()) == phase) {
+ if (Thread.interrupted())
+ interrupted = true;
+ else if (node == null)
+ node = new QNode(this, phase, false, false, 0, 0);
+ else if (!queued)
+ queued = tryEnqueue(node);
+ else
+ interrupted = node.doWait();
+ }
+ if (node != null)
+ node.thread = null;
+ releaseWaiters(phase);
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ return p;
+ }
+
+ /**
+ * Interruptible version
+ * @return current phase
+ */
+ private int interruptibleWait(int phase) throws InterruptedException {
+ QNode node = null;
+ boolean queued = false;
+ boolean interrupted = false;
+ int p;
+ while ((p = getPhase()) == phase && !interrupted) {
+ if (Thread.interrupted())
+ interrupted = true;
+ else if (node == null)
+ node = new QNode(this, phase, true, false, 0, 0);
+ else if (!queued)
+ queued = tryEnqueue(node);
+ else
+ interrupted = node.doWait();
+ }
+ if (node != null)
+ node.thread = null;
+ if (p != phase || (p = getPhase()) != phase)
+ releaseWaiters(phase);
+ if (interrupted)
+ throw new InterruptedException();
+ return p;
+ }
+
+ /**
+ * Timeout version.
+ * @return current phase
+ */
+ private int timedWait(int phase, long nanos)
+ throws InterruptedException, TimeoutException {
+ long startTime = System.nanoTime();
+ QNode node = null;
+ boolean queued = false;
+ boolean interrupted = false;
+ int p;
+ while ((p = getPhase()) == phase && !interrupted) {
+ if (Thread.interrupted())
+ interrupted = true;
+ else if (nanos - (System.nanoTime() - startTime) <= 0)
+ break;
+ else if (node == null)
+ node = new QNode(this, phase, true, true, startTime, nanos);
+ else if (!queued)
+ queued = tryEnqueue(node);
+ else
+ interrupted = node.doWait();
+ }
+ if (node != null)
+ node.thread = null;
+ if (p != phase || (p = getPhase()) != phase)
+ releaseWaiters(phase);
+ if (interrupted)
+ throw new InterruptedException();
+ if (p == phase)
+ throw new TimeoutException();
+ return p;
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long stateOffset =
+ objectFieldOffset("state", Phaser.class);
+
+ private final boolean casState(long cmp, long val) {
+ return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
+ }
+
+ private static long objectFieldOffset(String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/RecursiveAction.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,179 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * A recursive resultless {@link ForkJoinTask}. This class
+ * establishes conventions to parameterize resultless actions as
+ * {@code Void} {@code ForkJoinTask}s. Because {@code null} is the
+ * only valid value of type {@code Void}, methods such as join always
+ * return {@code null} upon completion.
+ *
+ * <p><b>Sample Usages.</b> Here is a sketch of a ForkJoin sort that
+ * sorts a given {@code long[]} array:
+ *
+ * <pre> {@code
+ * class SortTask extends RecursiveAction {
+ * final long[] array; final int lo; final int hi;
+ * SortTask(long[] array, int lo, int hi) {
+ * this.array = array; this.lo = lo; this.hi = hi;
+ * }
+ * protected void compute() {
+ * if (hi - lo < THRESHOLD)
+ * sequentiallySort(array, lo, hi);
+ * else {
+ * int mid = (lo + hi) >>> 1;
+ * invokeAll(new SortTask(array, lo, mid),
+ * new SortTask(array, mid, hi));
+ * merge(array, lo, hi);
+ * }
+ * }
+ * }}</pre>
+ *
+ * You could then sort {@code anArray} by creating {@code new
+ * SortTask(anArray, 0, anArray.length-1) } and invoking it in a
+ * ForkJoinPool. As a more concrete simple example, the following
+ * task increments each element of an array:
+ * <pre> {@code
+ * class IncrementTask extends RecursiveAction {
+ * final long[] array; final int lo; final int hi;
+ * IncrementTask(long[] array, int lo, int hi) {
+ * this.array = array; this.lo = lo; this.hi = hi;
+ * }
+ * protected void compute() {
+ * if (hi - lo < THRESHOLD) {
+ * for (int i = lo; i < hi; ++i)
+ * array[i]++;
+ * }
+ * else {
+ * int mid = (lo + hi) >>> 1;
+ * invokeAll(new IncrementTask(array, lo, mid),
+ * new IncrementTask(array, mid, hi));
+ * }
+ * }
+ * }}</pre>
+ *
+ * <p>The following example illustrates some refinements and idioms
+ * that may lead to better performance: RecursiveActions need not be
+ * fully recursive, so long as they maintain the basic
+ * divide-and-conquer approach. Here is a class that sums the squares
+ * of each element of a double array, by subdividing out only the
+ * right-hand-sides of repeated divisions by two, and keeping track of
+ * them with a chain of {@code next} references. It uses a dynamic
+ * threshold based on method {@code getSurplusQueuedTaskCount}, but
+ * counterbalances potential excess partitioning by directly
+ * performing leaf actions on unstolen tasks rather than further
+ * subdividing.
+ *
+ * <pre> {@code
+ * double sumOfSquares(ForkJoinPool pool, double[] array) {
+ * int n = array.length;
+ * Applyer a = new Applyer(array, 0, n, null);
+ * pool.invoke(a);
+ * return a.result;
+ * }
+ *
+ * class Applyer extends RecursiveAction {
+ * final double[] array;
+ * final int lo, hi;
+ * double result;
+ * Applyer next; // keeps track of right-hand-side tasks
+ * Applyer(double[] array, int lo, int hi, Applyer next) {
+ * this.array = array; this.lo = lo; this.hi = hi;
+ * this.next = next;
+ * }
+ *
+ * double atLeaf(int l, int h) {
+ * double sum = 0;
+ * for (int i = l; i < h; ++i) // perform leftmost base step
+ * sum += array[i] * array[i];
+ * return sum;
+ * }
+ *
+ * protected void compute() {
+ * int l = lo;
+ * int h = hi;
+ * Applyer right = null;
+ * while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
+ * int mid = (l + h) >>> 1;
+ * right = new Applyer(array, mid, h, right);
+ * right.fork();
+ * h = mid;
+ * }
+ * double sum = atLeaf(l, h);
+ * while (right != null) {
+ * if (right.tryUnfork()) // directly calculate if not stolen
+ * sum += right.atLeaf(right.lo, right.hi);
+ * else {
+ * right.helpJoin();
+ * sum += right.result;
+ * }
+ * right = right.next;
+ * }
+ * result = sum;
+ * }
+ * }}</pre>
+ *
+ * @since 1.7
+ * @author Doug Lea
+ */
+public abstract class RecursiveAction extends ForkJoinTask<Void> {
+ private static final long serialVersionUID = 5232453952276485070L;
+
+ /**
+ * The main computation performed by this task.
+ */
+ protected abstract void compute();
+
+ /**
+ * Always returns null.
+ */
+ public final Void getRawResult() { return null; }
+
+ /**
+ * Requires null completion value.
+ */
+ protected final void setRawResult(Void mustBeNull) { }
+
+ /**
+ * Implements execution conventions for RecursiveActions.
+ */
+ protected final boolean exec() {
+ compute();
+ return true;
+ }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/RecursiveTask.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,97 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * A recursive result-bearing {@link ForkJoinTask}.
+ *
+ * <p>For a classic example, here is a task computing Fibonacci numbers:
+ *
+ * <pre> {@code
+ * class Fibonacci extends RecursiveTask<Integer> {
+ * final int n;
+ * Fibonacci(int n) { this.n = n; }
+ * Integer compute() {
+ * if (n <= 1)
+ * return n;
+ * Fibonacci f1 = new Fibonacci(n - 1);
+ * f1.fork();
+ * Fibonacci f2 = new Fibonacci(n - 2);
+ * return f2.compute() + f1.join();
+ * }
+ * }}</pre>
+ *
+ * However, besides being a dumb way to compute Fibonacci functions
+ * (there is a simple fast linear algorithm that you'd use in
+ * practice), this is likely to perform poorly because the smallest
+ * subtasks are too small to be worthwhile splitting up. Instead, as
+ * is the case for nearly all fork/join applications, you'd pick some
+ * minimum granularity size (for example 10 here) for which you always
+ * sequentially solve rather than subdividing.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ */
+public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
+ private static final long serialVersionUID = 5232453952276485270L;
+
+ /**
+ * The result of the computation.
+ */
+ V result;
+
+ /**
+ * The main computation performed by this task.
+ */
+ protected abstract V compute();
+
+ public final V getRawResult() {
+ return result;
+ }
+
+ protected final void setRawResult(V value) {
+ result = value;
+ }
+
+ /**
+ * Implements execution conventions for RecursiveTask.
+ */
+ protected final boolean exec() {
+ result = compute();
+ return true;
+ }
+
+}
--- a/jdk/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java Mon Nov 02 22:23:50 2009 -0800
@@ -61,6 +61,14 @@
* causes tasks to be immediately removed from the work queue at
* time of cancellation.
*
+ * <p>Successive executions of a task scheduled via
+ * <code>scheduleAtFixedRate</code> or
+ * <code>scheduleWithFixedDelay</code> do not overlap. While different
+ * executions may be performed by different threads, the effects of
+ * prior executions <a
+ * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
+ * those of subsequent ones.
+ *
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few
* of the inherited tuning methods are not useful for it. In
* particular, because it acts as a fixed-sized pool using
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/ThreadLocalRandom.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,228 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.Random;
+
+/**
+ * A random number generator isolated to the current thread. Like the
+ * global {@link java.util.Random} generator used by the {@link
+ * java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
+ * with an internally generated seed that may not otherwise be
+ * modified. When applicable, use of {@code ThreadLocalRandom} rather
+ * than shared {@code Random} objects in concurrent programs will
+ * typically encounter much less overhead and contention. Use of
+ * {@code ThreadLocalRandom} is particularly appropriate when multiple
+ * tasks (for example, each a {@link ForkJoinTask}) use random numbers
+ * in parallel in thread pools.
+ *
+ * <p>Usages of this class should typically be of the form:
+ * {@code ThreadLocalRandom.current().nextX(...)} (where
+ * {@code X} is {@code Int}, {@code Long}, etc).
+ * When all usages are of this form, it is never possible to
+ * accidently share a {@code ThreadLocalRandom} across multiple threads.
+ *
+ * <p>This class also provides additional commonly used bounded random
+ * generation methods.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ */
+public class ThreadLocalRandom extends Random {
+ // same constants as Random, but must be redeclared because private
+ private final static long multiplier = 0x5DEECE66DL;
+ private final static long addend = 0xBL;
+ private final static long mask = (1L << 48) - 1;
+
+ /**
+ * The random seed. We can't use super.seed.
+ */
+ private long rnd;
+
+ /**
+ * Initialization flag to permit the first and only allowed call
+ * to setSeed (inside Random constructor) to succeed. We can't
+ * allow others since it would cause setting seed in one part of a
+ * program to unintentionally impact other usages by the thread.
+ */
+ boolean initialized;
+
+ // Padding to help avoid memory contention among seed updates in
+ // different TLRs in the common case that they are located near
+ // each other.
+ private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
+
+ /**
+ * The actual ThreadLocal
+ */
+ private static final ThreadLocal<ThreadLocalRandom> localRandom =
+ new ThreadLocal<ThreadLocalRandom>() {
+ protected ThreadLocalRandom initialValue() {
+ return new ThreadLocalRandom();
+ }
+ };
+
+
+ /**
+ * Constructor called only by localRandom.initialValue.
+ * We rely on the fact that the superclass no-arg constructor
+ * invokes setSeed exactly once to initialize.
+ */
+ ThreadLocalRandom() {
+ super();
+ }
+
+ /**
+ * Returns the current thread's {@code ThreadLocalRandom}.
+ *
+ * @return the current thread's {@code ThreadLocalRandom}
+ */
+ public static ThreadLocalRandom current() {
+ return localRandom.get();
+ }
+
+ /**
+ * Throws {@code UnsupportedOperationException}. Setting seeds in
+ * this generator is not supported.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ public void setSeed(long seed) {
+ if (initialized)
+ throw new UnsupportedOperationException();
+ initialized = true;
+ rnd = (seed ^ multiplier) & mask;
+ }
+
+ protected int next(int bits) {
+ rnd = (rnd * multiplier + addend) & mask;
+ return (int) (rnd >>> (48-bits));
+ }
+
+ /**
+ * Returns a pseudorandom, uniformly distributed value between the
+ * given least value (inclusive) and bound (exclusive).
+ *
+ * @param least the least value returned
+ * @param bound the upper bound (exclusive)
+ * @throws IllegalArgumentException if least greater than or equal
+ * to bound
+ * @return the next value
+ */
+ public int nextInt(int least, int bound) {
+ if (least >= bound)
+ throw new IllegalArgumentException();
+ return nextInt(bound - least) + least;
+ }
+
+ /**
+ * Returns a pseudorandom, uniformly distributed value
+ * between 0 (inclusive) and the specified value (exclusive).
+ *
+ * @param n the bound on the random number to be returned. Must be
+ * positive.
+ * @return the next value
+ * @throws IllegalArgumentException if n is not positive
+ */
+ public long nextLong(long n) {
+ if (n <= 0)
+ throw new IllegalArgumentException("n must be positive");
+ // Divide n by two until small enough for nextInt. On each
+ // iteration (at most 31 of them but usually much less),
+ // randomly choose both whether to include high bit in result
+ // (offset) and whether to continue with the lower vs upper
+ // half (which makes a difference only if odd).
+ long offset = 0;
+ while (n >= Integer.MAX_VALUE) {
+ int bits = next(2);
+ long half = n >>> 1;
+ long nextn = ((bits & 2) == 0) ? half : n - half;
+ if ((bits & 1) == 0)
+ offset += n - nextn;
+ n = nextn;
+ }
+ return offset + nextInt((int) n);
+ }
+
+ /**
+ * Returns a pseudorandom, uniformly distributed value between the
+ * given least value (inclusive) and bound (exclusive).
+ *
+ * @param least the least value returned
+ * @param bound the upper bound (exclusive)
+ * @return the next value
+ * @throws IllegalArgumentException if least greater than or equal
+ * to bound
+ */
+ public long nextLong(long least, long bound) {
+ if (least >= bound)
+ throw new IllegalArgumentException();
+ return nextLong(bound - least) + least;
+ }
+
+ /**
+ * Returns a pseudorandom, uniformly distributed {@code double} value
+ * between 0 (inclusive) and the specified value (exclusive).
+ *
+ * @param n the bound on the random number to be returned. Must be
+ * positive.
+ * @return the next value
+ * @throws IllegalArgumentException if n is not positive
+ */
+ public double nextDouble(double n) {
+ if (n <= 0)
+ throw new IllegalArgumentException("n must be positive");
+ return nextDouble() * n;
+ }
+
+ /**
+ * Returns a pseudorandom, uniformly distributed value between the
+ * given least value (inclusive) and bound (exclusive).
+ *
+ * @param least the least value returned
+ * @param bound the upper bound (exclusive)
+ * @return the next value
+ * @throws IllegalArgumentException if least greater than or equal
+ * to bound
+ */
+ public double nextDouble(double least, double bound) {
+ if (least >= bound)
+ throw new IllegalArgumentException();
+ return nextDouble() * (bound - least) + least;
+ }
+
+ private static final long serialVersionUID = -5851777807851030925L;
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/concurrent/TransferQueue.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,161 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * A {@link BlockingQueue} in which producers may wait for consumers
+ * to receive elements. A {@code TransferQueue} may be useful for
+ * example in message passing applications in which producers
+ * sometimes (using method {@link #transfer}) await receipt of
+ * elements by consumers invoking {@code take} or {@code poll}, while
+ * at other times enqueue elements (via method {@code put}) without
+ * waiting for receipt.
+ * {@linkplain #tryTransfer(Object) Non-blocking} and
+ * {@linkplain #tryTransfer(Object,long,TimeUnit) time-out} versions of
+ * {@code tryTransfer} are also available.
+ * A {@code TransferQueue} may also be queried, via {@link
+ * #hasWaitingConsumer}, whether there are any threads waiting for
+ * items, which is a converse analogy to a {@code peek} operation.
+ *
+ * <p>Like other blocking queues, a {@code TransferQueue} may be
+ * capacity bounded. If so, an attempted transfer operation may
+ * initially block waiting for available space, and/or subsequently
+ * block waiting for reception by a consumer. Note that in a queue
+ * with zero capacity, such as {@link SynchronousQueue}, {@code put}
+ * and {@code transfer} are effectively synonymous.
+ *
+ * <p>This interface is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public interface TransferQueue<E> extends BlockingQueue<E> {
+ /**
+ * Transfers the element to a waiting consumer immediately, if possible.
+ *
+ * <p>More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * otherwise returning {@code false} without enqueuing the element.
+ *
+ * @param e the element to transfer
+ * @return {@code true} if the element was transferred, else
+ * {@code false}
+ * @throws ClassCastException if the class of the specified element
+ * prevents it from being added to this queue
+ * @throws NullPointerException if the specified element is null
+ * @throws IllegalArgumentException if some property of the specified
+ * element prevents it from being added to this queue
+ */
+ boolean tryTransfer(E e);
+
+ /**
+ * Transfers the element to a consumer, waiting if necessary to do so.
+ *
+ * <p>More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else waits until the element is received by a consumer.
+ *
+ * @param e the element to transfer
+ * @throws InterruptedException if interrupted while waiting,
+ * in which case the element is not left enqueued
+ * @throws ClassCastException if the class of the specified element
+ * prevents it from being added to this queue
+ * @throws NullPointerException if the specified element is null
+ * @throws IllegalArgumentException if some property of the specified
+ * element prevents it from being added to this queue
+ */
+ void transfer(E e) throws InterruptedException;
+
+ /**
+ * Transfers the element to a consumer if it is possible to do so
+ * before the timeout elapses.
+ *
+ * <p>More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else waits until the element is received by a consumer,
+ * returning {@code false} if the specified wait time elapses
+ * before the element can be transferred.
+ *
+ * @param e the element to transfer
+ * @param timeout how long to wait before giving up, in units of
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return {@code true} if successful, or {@code false} if
+ * the specified waiting time elapses before completion,
+ * in which case the element is not left enqueued
+ * @throws InterruptedException if interrupted while waiting,
+ * in which case the element is not left enqueued
+ * @throws ClassCastException if the class of the specified element
+ * prevents it from being added to this queue
+ * @throws NullPointerException if the specified element is null
+ * @throws IllegalArgumentException if some property of the specified
+ * element prevents it from being added to this queue
+ */
+ boolean tryTransfer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Returns {@code true} if there is at least one consumer waiting
+ * to receive an element via {@link #take} or
+ * timed {@link #poll(long,TimeUnit) poll}.
+ * The return value represents a momentary state of affairs.
+ *
+ * @return {@code true} if there is at least one waiting consumer
+ */
+ boolean hasWaitingConsumer();
+
+ /**
+ * Returns an estimate of the number of consumers waiting to
+ * receive elements via {@link #take} or timed
+ * {@link #poll(long,TimeUnit) poll}. The return value is an
+ * approximation of a momentary state of affairs, that may be
+ * inaccurate if consumers have completed or given up waiting.
+ * The value may be useful for monitoring and heuristics, but
+ * not for synchronization control. Implementations of this
+ * method are likely to be noticeably slower than those for
+ * {@link #hasWaitingConsumer}.
+ *
+ * @return the number of consumers waiting to receive elements
+ */
+ int getWaitingConsumerCount();
+}
--- a/jdk/src/share/classes/java/util/concurrent/locks/Condition.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/locks/Condition.java Mon Nov 02 22:23:50 2009 -0800
@@ -170,8 +170,8 @@
* <p>As interruption generally implies cancellation, and checks for
* interruption are often infrequent, an implementation can favor responding
* to an interrupt over normal method return. This is true even if it can be
- * shown that the interrupt occurred after another action may have unblocked
- * the thread. An implementation should document this behavior.
+ * shown that the interrupt occurred after another action that may have
+ * unblocked the thread. An implementation should document this behavior.
*
* @since 1.5
* @author Doug Lea
--- a/jdk/src/share/classes/java/util/concurrent/package-info.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/package-info.java Mon Nov 02 22:23:50 2009 -0800
@@ -92,6 +92,13 @@
* assists in coordinating the processing of groups of
* asynchronous tasks.
*
+ * <p>Class {@link java.util.concurrent.ForkJoinPool} provides an
+ * Executor primarily designed for processing instances of {@link
+ * java.util.concurrent.ForkJoinTask} and its subclasses. These
+ * classes employ a work-stealing scheduler that attains high
+ * throughput for tasks conforming to restrictions that often hold in
+ * computation-intensive parallel processing.
+ *
* <h2>Queues</h2>
*
* The {@link java.util.concurrent.ConcurrentLinkedQueue} class
@@ -110,6 +117,12 @@
* for producer-consumer, messaging, parallel tasking, and
* related concurrent designs.
*
+ * <p> Extended interface {@link java.util.concurrent.TransferQueue},
+ * and implementation {@link java.util.concurrent.LinkedTransferQueue}
+ * introduce a synchronous {@code transfer} method (along with related
+ * features) in which a producer may optionally block awaiting its
+ * consumer.
+ *
* <p>The {@link java.util.concurrent.BlockingDeque} interface
* extends {@code BlockingQueue} to support both FIFO and LIFO
* (stack-based) operations.
@@ -136,15 +149,28 @@
*
* <h2>Synchronizers</h2>
*
- * Four classes aid common special-purpose synchronization idioms.
- * {@link java.util.concurrent.Semaphore} is a classic concurrency tool.
- * {@link java.util.concurrent.CountDownLatch} is a very simple yet very
- * common utility for blocking until a given number of signals, events,
- * or conditions hold. A {@link java.util.concurrent.CyclicBarrier} is a
- * resettable multiway synchronization point useful in some styles of
- * parallel programming. An {@link java.util.concurrent.Exchanger} allows
- * two threads to exchange objects at a rendezvous point, and is useful
- * in several pipeline designs.
+ * Five classes aid common special-purpose synchronization idioms.
+ * <ul>
+ *
+ * <li>{@link java.util.concurrent.Semaphore} is a classic concurrency tool.
+ *
+ * <li>{@link java.util.concurrent.CountDownLatch} is a very simple yet
+ * very common utility for blocking until a given number of signals,
+ * events, or conditions hold.
+ *
+ * <li>A {@link java.util.concurrent.CyclicBarrier} is a resettable
+ * multiway synchronization point useful in some styles of parallel
+ * programming.
+ *
+ * <li>A {@link java.util.concurrent.Phaser} provides
+ * a more flexible form of barrier that may be used to control phased
+ * computation among multiple threads.
+ *
+ * <li>An {@link java.util.concurrent.Exchanger} allows two threads to
+ * exchange objects at a rendezvous point, and is useful in several
+ * pipeline designs.
+ *
+ * </ul>
*
* <h2>Concurrent Collections</h2>
*
@@ -259,7 +285,8 @@
* in each thread <i>happen-before</i> those subsequent to the
* corresponding {@code exchange()} in another thread.
*
- * <li>Actions prior to calling {@code CyclicBarrier.await}
+ * <li>Actions prior to calling {@code CyclicBarrier.await} and
+ * {@code Phaser.awaitAdvance} (as well as its variants)
* <i>happen-before</i> actions performed by the barrier action, and
* actions performed by the barrier action <i>happen-before</i> actions
* subsequent to a successful return from the corresponding {@code await}
--- a/jdk/test/java/util/Collection/BiggernYours.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/Collection/BiggernYours.java Mon Nov 02 22:23:50 2009 -0800
@@ -178,10 +178,10 @@
new ConcurrentLinkedQueue() {
public int size() {return randomize(super.size());}});
-// testCollections(
-// new LinkedTransferQueue(),
-// new LinkedTransferQueue() {
-// public int size() {return randomize(super.size());}});
+ testCollections(
+ new LinkedTransferQueue(),
+ new LinkedTransferQueue() {
+ public int size() {return randomize(super.size());}});
testCollections(
new LinkedBlockingQueue(),
--- a/jdk/test/java/util/Collection/IteratorAtEnd.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/Collection/IteratorAtEnd.java Mon Nov 02 22:23:50 2009 -0800
@@ -49,7 +49,7 @@
testCollection(new LinkedBlockingQueue());
testCollection(new ArrayBlockingQueue(100));
testCollection(new ConcurrentLinkedQueue());
-// testCollection(new LinkedTransferQueue());
+ testCollection(new LinkedTransferQueue());
testMap(new HashMap());
testMap(new Hashtable());
--- a/jdk/test/java/util/Collection/MOAT.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/Collection/MOAT.java Mon Nov 02 22:23:50 2009 -0800
@@ -76,7 +76,7 @@
testCollection(new LinkedBlockingQueue<Integer>(20));
testCollection(new LinkedBlockingDeque<Integer>(20));
testCollection(new ConcurrentLinkedQueue<Integer>());
-// testCollection(new LinkedTransferQueue<Integer>());
+ testCollection(new LinkedTransferQueue<Integer>());
testCollection(new ConcurrentSkipListSet<Integer>());
testCollection(Arrays.asList(new Integer(42)));
testCollection(Arrays.asList(1,2,3));
--- a/jdk/test/java/util/Collections/CheckedNull.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/Collections/CheckedNull.java Mon Nov 02 22:23:50 2009 -0800
@@ -52,7 +52,7 @@
testMap(Collections.checkedMap(
new HashMap<String, String>(),
- String.class, String.class));;
+ String.class, String.class));
}
ClassCastException cce(F f) {
--- a/jdk/test/java/util/Collections/RacingCollections.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/Collections/RacingCollections.java Mon Nov 02 22:23:50 2009 -0800
@@ -234,7 +234,7 @@
List<Queue<Integer>> list =
new ArrayList<Queue<Integer>>(newConcurrentDeques());
list.add(new LinkedBlockingQueue<Integer>(10));
-// list.add(new LinkedTransferQueue<Integer>());
+ list.add(new LinkedTransferQueue<Integer>());
return list;
}
--- a/jdk/test/java/util/PriorityQueue/RemoveContains.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/PriorityQueue/RemoveContains.java Mon Nov 02 22:23:50 2009 -0800
@@ -69,7 +69,7 @@
test(new ArrayBlockingQueue<String>(10));
test(new LinkedBlockingQueue<String>(10));
test(new LinkedBlockingDeque<String>(10));
-// test(new LinkedTransferQueue<String>());
+ test(new LinkedTransferQueue<String>());
test(new ArrayDeque<String>(10));
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
--- a/jdk/test/java/util/concurrent/BlockingQueue/CancelledProducerConsumerLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/CancelledProducerConsumerLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -119,12 +119,36 @@
}
}
+ static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
+ LTQasSQ() { super(); }
+ public void put(T x) {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
+ static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
+ HalfSyncLTQ() { super(); }
+ public void put(T x) {
+ if (ThreadLocalRandom.current().nextBoolean())
+ super.put(x);
+ else {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
static void oneTest(int pairs, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
-// oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
+ oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
+ oneRun(new LTQasSQ<Integer>(), pairs, iters);
+ oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
/* PriorityBlockingQueue is unbounded
--- a/jdk/test/java/util/concurrent/BlockingQueue/LastElement.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/LastElement.java Mon Nov 02 22:23:50 2009 -0800
@@ -37,7 +37,7 @@
testQueue(new LinkedBlockingDeque<Integer>());
testQueue(new ArrayBlockingQueue<Integer>(10, true));
testQueue(new ArrayBlockingQueue<Integer>(10, false));
-// testQueue(new LinkedTransferQueue<Integer>());
+ testQueue(new LinkedTransferQueue<Integer>());
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new Exception("Some tests failed");
--- a/jdk/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -87,11 +87,35 @@
throw new Error();
}
+ static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
+ LTQasSQ() { super(); }
+ public void put(T x) {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
+ static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
+ HalfSyncLTQ() { super(); }
+ public void put(T x) {
+ if (ThreadLocalRandom.current().nextBoolean())
+ super.put(x);
+ else {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
static void oneTest(int producers, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
-// oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
+ oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
+ oneRun(new LTQasSQ<Integer>(), producers, iters);
+ oneRun(new HalfSyncLTQ<Integer>(), producers, iters);
// Don't run PBQ since can legitimately run out of memory
// if (print)
--- a/jdk/test/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -63,12 +63,11 @@
test(new LinkedBlockingDeque());
test(new LinkedBlockingDeque(2000));
test(new ArrayBlockingQueue(2000));
-// test(new LinkedTransferQueue());
+ test(new LinkedTransferQueue());
}
Random getRandom() {
- return new Random();
- // return ThreadLocalRandom.current();
+ return ThreadLocalRandom.current();
}
void test(final BlockingQueue q) throws Throwable {
--- a/jdk/test/java/util/concurrent/BlockingQueue/PollMemoryLeak.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/PollMemoryLeak.java Mon Nov 02 22:23:50 2009 -0800
@@ -46,7 +46,7 @@
public static void main(String[] args) throws InterruptedException {
final BlockingQueue[] qs = {
new LinkedBlockingQueue(10),
-// new LinkedTransferQueue(),
+ new LinkedTransferQueue(),
new ArrayBlockingQueue(10),
new SynchronousQueue(),
new SynchronousQueue(true),
--- a/jdk/test/java/util/concurrent/BlockingQueue/ProducerConsumerLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/ProducerConsumerLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -87,11 +87,35 @@
throw new Error();
}
+ static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
+ LTQasSQ() { super(); }
+ public void put(T x) {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
+ static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
+ HalfSyncLTQ() { super(); }
+ public void put(T x) {
+ if (ThreadLocalRandom.current().nextBoolean())
+ super.put(x);
+ else {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
static void oneTest(int pairs, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
-// oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
+ oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
+ oneRun(new LTQasSQ<Integer>(), pairs, iters);
+ oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters);
oneRun(new SynchronousQueue<Integer>(), pairs, iters);
--- a/jdk/test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -73,11 +73,35 @@
throw new Error();
}
+ static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
+ LTQasSQ() { super(); }
+ public void put(T x) {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
+ static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
+ HalfSyncLTQ() { super(); }
+ public void put(T x) {
+ if (ThreadLocalRandom.current().nextBoolean())
+ super.put(x);
+ else {
+ try { super.transfer(x); }
+ catch (InterruptedException ex) { throw new Error(); }
+ }
+ }
+ private final static long serialVersionUID = 42;
+ }
+
static void oneTest(int consumers, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
-// oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
+ oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
+ oneRun(new LTQasSQ<Integer>(), consumers, iters);
+ oneRun(new HalfSyncLTQ<Integer>(), consumers, iters);
oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
oneRun(new SynchronousQueue<Integer>(), consumers, iters);
if (print)
--- a/jdk/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -60,7 +60,7 @@
//queues.add(new ArrayBlockingQueue<Integer>(count, true));
queues.add(new LinkedBlockingQueue<Integer>());
queues.add(new LinkedBlockingDeque<Integer>());
-// queues.add(new LinkedTransferQueue<Integer>());
+ queues.add(new LinkedTransferQueue<Integer>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
--- a/jdk/test/java/util/concurrent/ConcurrentQueues/GCRetention.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/ConcurrentQueues/GCRetention.java Mon Nov 02 22:23:50 2009 -0800
@@ -43,7 +43,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
-// import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.LinkedList;
import java.util.PriorityQueue;
@@ -70,7 +70,7 @@
queues.add(new PriorityBlockingQueue<Boolean>());
queues.add(new PriorityQueue<Boolean>());
queues.add(new LinkedList<Boolean>());
-// queues.add(new LinkedTransferQueue<Boolean>());
+ queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
--- a/jdk/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java Mon Nov 02 22:23:50 2009 -0800
@@ -49,7 +49,7 @@
test(new LinkedBlockingDeque());
test(new LinkedBlockingDeque(20));
test(new ConcurrentLinkedQueue());
-// test(new LinkedTransferQueue());
+ test(new LinkedTransferQueue());
// Other concurrent queues (e.g. ArrayBlockingQueue) do not
// currently have weakly consistent iterators.
// test(new ArrayBlockingQueue(20));
--- a/jdk/test/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -56,12 +56,11 @@
testQueue(new ArrayBlockingQueue(10));
testQueue(new PriorityBlockingQueue(10));
testQueue(new ConcurrentLinkedQueue());
-// testQueue(new LinkedTransferQueue());
+ testQueue(new LinkedTransferQueue());
}
Random getRandom() {
- return new Random();
- // return ThreadLocalRandom.current();
+ return ThreadLocalRandom.current();
}
void testQueue(final Queue q) throws Throwable {
--- a/jdk/test/java/util/concurrent/ConcurrentQueues/RemovePollRace.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/ConcurrentQueues/RemovePollRace.java Mon Nov 02 22:23:50 2009 -0800
@@ -45,7 +45,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
-// import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
import java.util.Collection;
@@ -67,7 +67,7 @@
queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>());
-// queues.add(new LinkedTransferQueue<Boolean>());
+ queues.add(new LinkedTransferQueue<Boolean>());
// Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/Phaser/Arrive.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,94 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @bug 6445158
+ * @summary tests for Phaser.arrive()
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Arrive {
+ void test(String[] args) throws Throwable {
+ final int n = ThreadLocalRandom.current().nextInt(1, 10);
+ final int nthreads = n*3/2;
+ final Phaser startingGate = new Phaser(nthreads);
+ final Phaser phaser = new Phaser(n);
+ final List<Thread> threads = new ArrayList<Thread>();
+ final AtomicInteger count0 = new AtomicInteger(0);
+ final AtomicInteger count1 = new AtomicInteger(0);
+ final Runnable task = new Runnable() { public void run() {
+ equal(startingGate.getPhase(), 0);
+ startingGate.arriveAndAwaitAdvance();
+ equal(startingGate.getPhase(), 1);
+ int phase = phaser.arrive();
+ if (phase == 0)
+ count0.getAndIncrement();
+ else if (phase == 1)
+ count1.getAndIncrement();
+ else
+ fail();
+ }};
+ for (int i = 0; i < nthreads; i++)
+ threads.add(new Thread(task));
+ for (Thread thread : threads)
+ thread.start();
+ for (Thread thread : threads)
+ thread.join();
+ equal(count0.get(), n);
+ equal(count1.get(), nthreads-n);
+ equal(phaser.getPhase(), 1);
+ }
+
+ //--------------------- Infrastructure ---------------------------
+ volatile int passed = 0, failed = 0;
+ void pass() {passed++;}
+ void fail() {failed++; Thread.dumpStack();}
+ void fail(String msg) {System.err.println(msg); fail();}
+ void unexpected(Throwable t) {failed++; t.printStackTrace();}
+ void check(boolean cond) {if (cond) pass(); else fail();}
+ void equal(Object x, Object y) {
+ if (x == null ? y == null : x.equals(y)) pass();
+ else fail(x + " not equal to " + y);}
+ public static void main(String[] args) throws Throwable {
+ new Arrive().instanceMain(args);}
+ public void instanceMain(String[] args) throws Throwable {
+ try {test(args);} catch (Throwable t) {unexpected(t);}
+ System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+ if (failed > 0) throw new AssertionError("Some tests failed");}
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/Phaser/Basic.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,407 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @bug 6445158
+ * @summary Basic tests for Phaser
+ * @author Chris Hegarty
+ */
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.concurrent.TimeUnit.*;
+
+public class Basic {
+
+ private static void checkTerminated(final Phaser phaser) {
+ check(phaser.isTerminated());
+ int unarriverParties = phaser.getUnarrivedParties();
+ int registeredParties = phaser.getRegisteredParties();
+ equal(phaser.arrive(), -1);
+ equal(phaser.arriveAndDeregister(), -1);
+ equal(phaser.arriveAndAwaitAdvance(), -1);
+ equal(phaser.bulkRegister(10), -1);
+ equal(phaser.getPhase(), -1);
+ equal(phaser.register(), -1);
+ try {
+ equal(phaser.awaitAdvanceInterruptibly(0), -1);
+ equal(phaser.awaitAdvanceInterruptibly(0, 10, SECONDS), -1);
+ } catch (Exception ie) {
+ unexpected(ie);
+ }
+ equal(phaser.getUnarrivedParties(), unarriverParties);
+ equal(phaser.getRegisteredParties(), registeredParties);
+ }
+
+ private static void checkResult(Arriver a, Class<? extends Throwable> c) {
+ Throwable t = a.result();
+ if (! ((t == null && c == null) || (c != null && c.isInstance(t)))) {
+ // t.printStackTrace();
+ fail("Mismatch in thread " +
+ a.getName() + ": " +
+ t + ", " +
+ (c == null ? "<null>" : c.getName()));
+ } else {
+ pass();
+ }
+ }
+
+ //----------------------------------------------------------------
+ // Mechanism to get all test threads into "running" mode.
+ //----------------------------------------------------------------
+ private static Phaser atTheStartingGate = new Phaser(3);
+
+ private static void toTheStartingGate() {
+ try {
+ boolean expectNextPhase = false;
+ if (atTheStartingGate.getUnarrivedParties() == 1) {
+ expectNextPhase = true;
+ }
+ int phase = atTheStartingGate.getPhase();
+ equal(phase, atTheStartingGate.arrive());
+ int AwaitPhase = atTheStartingGate.awaitAdvanceInterruptibly(phase,
+ 10,
+ SECONDS);
+ if (expectNextPhase) check(AwaitPhase == (phase + 1));
+
+ pass();
+ } catch (Throwable t) {
+ unexpected(t);
+ // reset(atTheStartingGate);
+ throw new Error(t);
+ }
+ }
+
+ //----------------------------------------------------------------
+ // Convenience methods for creating threads that call arrive,
+ // awaitAdvance, arriveAndAwaitAdvance, awaitAdvanceInterruptibly
+ //----------------------------------------------------------------
+ private static abstract class Arriver extends Thread {
+ static AtomicInteger count = new AtomicInteger(1);
+
+ Arriver() {
+ this("Arriver");
+ }
+
+ Arriver(String name) {
+ this.setName(name + ":" + count.getAndIncrement());
+ this.setDaemon(true);
+ }
+
+ private volatile Throwable result;
+ private volatile int phase;
+ protected void result(Throwable result) { this.result = result; }
+ public Throwable result() { return this.result; }
+ protected void phase(int phase) { this.phase = phase; }
+ public int phase() { return this.phase; }
+ }
+
+ private static abstract class Awaiter extends Arriver {
+ Awaiter() { super("Awaiter"); }
+ Awaiter(String name) { super(name); }
+ }
+
+ private static Arriver arriver(final Phaser phaser) {
+ return new Arriver() { public void run() {
+ toTheStartingGate();
+
+ try { phase(phaser.arrive()); }
+ catch (Throwable result) { result(result); }}};
+ }
+
+ private static AtomicInteger cycleArriveAwaitAdvance = new AtomicInteger(1);
+
+ private static Awaiter awaiter(final Phaser phaser) {
+ return new Awaiter() { public void run() {
+ toTheStartingGate();
+
+ try {
+ if (cycleArriveAwaitAdvance.getAndIncrement() % 2 == 0)
+ phase(phaser.awaitAdvance(phaser.arrive()));
+ else
+ phase(phaser.arriveAndAwaitAdvance());
+ } catch (Throwable result) { result(result); }}};
+ }
+
+ private static Awaiter awaiter(final Phaser phaser,
+ final long timeout,
+ final TimeUnit unit) {
+ return new Awaiter("InterruptibleWaiter") { public void run() {
+ toTheStartingGate();
+
+ try {
+ if (timeout < 0)
+ phase(phaser.awaitAdvanceInterruptibly(phaser.arrive()));
+ else
+ phase(phaser.awaitAdvanceInterruptibly(phaser.arrive(),
+ timeout,
+ unit));
+ } catch (Throwable result) { result(result); }}};
+ }
+
+ // Returns an infinite lazy list of all possible arriver/awaiter combinations.
+ private static Iterator<Arriver> arriverIterator(final Phaser phaser) {
+ return new Iterator<Arriver>() {
+ int i = 0;
+ public boolean hasNext() { return true; }
+ public Arriver next() {
+ switch ((i++)&7) {
+ case 0: case 4:
+ return arriver(phaser);
+ case 1: case 5:
+ return awaiter(phaser);
+ case 2: case 6: case 7:
+ return awaiter(phaser, -1, SECONDS);
+ default:
+ return awaiter(phaser, 10, SECONDS); }}
+ public void remove() {throw new UnsupportedOperationException();}};
+ }
+
+ // Returns an infinite lazy list of all possible awaiter only combinations.
+ private static Iterator<Awaiter> awaiterIterator(final Phaser phaser) {
+ return new Iterator<Awaiter>() {
+ int i = 0;
+ public boolean hasNext() { return true; }
+ public Awaiter next() {
+ switch ((i++)&7) {
+ case 1: case 4: case 7:
+ return awaiter(phaser);
+ case 2: case 5:
+ return awaiter(phaser, -1, SECONDS);
+ default:
+ return awaiter(phaser, 10, SECONDS); }}
+ public void remove() {throw new UnsupportedOperationException();}};
+ }
+
+ private static void realMain(String[] args) throws Throwable {
+
+ Thread.currentThread().setName("mainThread");
+
+ //----------------------------------------------------------------
+ // Normal use
+ //----------------------------------------------------------------
+ try {
+ Phaser phaser = new Phaser(3);
+ equal(phaser.getRegisteredParties(), 3);
+ equal(phaser.getArrivedParties(), 0);
+ equal(phaser.getPhase(), 0);
+ check(phaser.getRoot().equals(phaser));
+ equal(phaser.getParent(), null);
+ check(!phaser.isTerminated());
+
+ Iterator<Arriver> arrivers = arriverIterator(phaser);
+ int phase = 0;
+ for (int i = 0; i < 10; i++) {
+ equal(phaser.getPhase(), phase++);
+ Arriver a1 = arrivers.next(); a1.start();
+ Arriver a2 = arrivers.next(); a2.start();
+ toTheStartingGate();
+ phaser.arriveAndAwaitAdvance();
+ a1.join();
+ a2.join();
+ checkResult(a1, null);
+ checkResult(a2, null);
+ check(!phaser.isTerminated());
+ equal(phaser.getRegisteredParties(), 3);
+ equal(phaser.getArrivedParties(), 0);
+ }
+ } catch (Throwable t) { unexpected(t); }
+
+ //----------------------------------------------------------------
+ // One thread interrupted
+ //----------------------------------------------------------------
+ try {
+ Phaser phaser = new Phaser(3);
+ Iterator<Arriver> arrivers = arriverIterator(phaser);
+ int phase = phaser.getPhase();
+ for (int i = 0; i < 4; i++) {
+ check(phaser.getPhase() == phase);
+ Awaiter a1 = awaiter(phaser, 10, SECONDS); a1.start();
+ Arriver a2 = arrivers.next(); a2.start();
+ toTheStartingGate();
+ a1.interrupt();
+ a1.join();
+ phaser.arriveAndAwaitAdvance();
+ a2.join();
+ checkResult(a1, InterruptedException.class);
+ checkResult(a2, null);
+ check(!phaser.isTerminated());
+ equal(phaser.getRegisteredParties(), 3);
+ equal(phaser.getArrivedParties(), 0);
+ phase++;
+ }
+ } catch (Throwable t) { unexpected(t); }
+
+ //----------------------------------------------------------------
+ // Phaser is terminated while threads are waiting
+ //----------------------------------------------------------------
+ try {
+ Phaser phaser = new Phaser(3);
+ Iterator<Awaiter> awaiters = awaiterIterator(phaser);
+ for (int i = 0; i < 4; i++) {
+ Arriver a1 = awaiters.next(); a1.start();
+ Arriver a2 = awaiters.next(); a2.start();
+ toTheStartingGate();
+ while (phaser.getArrivedParties() < 2) Thread.yield();
+ phaser.forceTermination();
+ a1.join();
+ a2.join();
+ check(a1.phase == -1);
+ check(a2.phase == -1);
+ int arrivedParties = phaser.getArrivedParties();
+ checkTerminated(phaser);
+ equal(phaser.getArrivedParties(), arrivedParties);
+ }
+ } catch (Throwable t) { unexpected(t); }
+
+ //----------------------------------------------------------------
+ // Adds new unarrived parties to this phaser
+ //----------------------------------------------------------------
+ try {
+ Phaser phaser = new Phaser(1);
+ Iterator<Arriver> arrivers = arriverIterator(phaser);
+ LinkedList<Arriver> arriverList = new LinkedList<Arriver>();
+ int phase = phaser.getPhase();
+ for (int i = 1; i < 5; i++) {
+ atTheStartingGate = new Phaser(1+(3*i));
+ check(phaser.getPhase() == phase);
+ // register 3 more
+ phaser.register(); phaser.register(); phaser.register();
+ for (int z=0; z<(3*i); z++) {
+ arriverList.add(arrivers.next());
+ }
+ for (Arriver arriver : arriverList)
+ arriver.start();
+
+ toTheStartingGate();
+ phaser.arriveAndAwaitAdvance();
+
+ for (Arriver arriver : arriverList) {
+ arriver.join();
+ checkResult(arriver, null);
+ }
+ equal(phaser.getRegisteredParties(), 1 + (3*i));
+ equal(phaser.getArrivedParties(), 0);
+ arriverList.clear();
+ phase++;
+ }
+ atTheStartingGate = new Phaser(3);
+ } catch (Throwable t) { unexpected(t); }
+
+ //----------------------------------------------------------------
+ // One thread timed out
+ //----------------------------------------------------------------
+ try {
+ Phaser phaser = new Phaser(3);
+ Iterator<Arriver> arrivers = arriverIterator(phaser);
+ for (long timeout : new long[] { 0L, 5L }) {
+ for (int i = 0; i < 2; i++) {
+ Awaiter a1 = awaiter(phaser, timeout, SECONDS); a1.start();
+ Arriver a2 = arrivers.next(); a2.start();
+ toTheStartingGate();
+ a1.join();
+ checkResult(a1, TimeoutException.class);
+ phaser.arrive();
+ a2.join();
+ checkResult(a2, null);
+ check(!phaser.isTerminated());
+ }
+ }
+ } catch (Throwable t) { unexpected(t); }
+
+ //----------------------------------------------------------------
+ // Barrier action completed normally
+ //----------------------------------------------------------------
+ try {
+ final AtomicInteger count = new AtomicInteger(0);
+ final Phaser[] kludge = new Phaser[1];
+ Phaser phaser = new Phaser(3) {
+ @Override
+ protected boolean onAdvance(int phase, int registeredParties) {
+ int countPhase = count.getAndIncrement();
+ equal(countPhase, phase);
+ equal(kludge[0].getPhase(), phase);
+ equal(kludge[0].getRegisteredParties(), registeredParties);
+ if (phase >= 3)
+ return true; // terminate
+
+ return false;
+ }
+ };
+ kludge[0] = phaser;
+ equal(phaser.getRegisteredParties(), 3);
+ Iterator<Awaiter> awaiters = awaiterIterator(phaser);
+ for (int i = 0; i < 4; i++) {
+ Awaiter a1 = awaiters.next(); a1.start();
+ Awaiter a2 = awaiters.next(); a2.start();
+ toTheStartingGate();
+ while (phaser.getArrivedParties() < 2) Thread.yield();
+ phaser.arrive();
+ a1.join();
+ a2.join();
+ checkResult(a1, null);
+ checkResult(a2, null);
+ equal(count.get(), i+1);
+ if (i < 3) {
+ check(!phaser.isTerminated());
+ equal(phaser.getRegisteredParties(), 3);
+ equal(phaser.getArrivedParties(), 0);
+ equal(phaser.getUnarrivedParties(), 3);
+ equal(phaser.getPhase(), count.get());
+ } else
+ checkTerminated(phaser);
+ }
+ } catch (Throwable t) { unexpected(t); }
+
+ }
+
+ //--------------------- Infrastructure ---------------------------
+ static volatile int passed = 0, failed = 0;
+ static void pass() {passed++;}
+ static void fail() {failed++; Thread.dumpStack();}
+ static void fail(String msg) {System.out.println(msg); fail();}
+ static void unexpected(Throwable t) {failed++; t.printStackTrace();}
+ static void check(boolean cond) {if (cond) pass(); else fail();}
+ static void equal(Object x, Object y) {
+ if (x == null ? y == null : x.equals(y)) pass();
+ else fail(x + " not equal to " + y);}
+ public static void main(String[] args) throws Throwable {
+ try {realMain(args);} catch (Throwable t) {unexpected(t);}
+ System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+ if (failed > 0) throw new AssertionError("Some tests failed");}
+}
--- a/jdk/test/java/util/concurrent/ScheduledThreadPoolExecutor/DelayOverflow.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/ScheduledThreadPoolExecutor/DelayOverflow.java Mon Nov 02 22:23:50 2009 -0800
@@ -21,6 +21,17 @@
*/
/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
* @test
* @bug 6725789
* @summary Check for long overflow in task time comparison.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/forkjoin/Integrate.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,265 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @bug 6865571
+ * @summary Numerical Integration using fork/join
+ * @run main Integrate reps=1 forkPolicy=dynamic
+ * @run main Integrate reps=1 forkPolicy=serial
+ * @run main Integrate reps=1 forkPolicy=fork
+ */
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveAction;
+
+/**
+ * Sample program using Gaussian Quadrature for numerical integration.
+ * This version uses a simplified hardwired function. Inspired by a
+ * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html">
+ * Filaments</A> demo program.
+ */
+public final class Integrate {
+
+ static final double errorTolerance = 1.0e-11;
+ /** for time conversion */
+ static final long NPS = (1000L * 1000 * 1000);
+
+ static final int SERIAL = -1;
+ static final int DYNAMIC = 0;
+ static final int FORK = 1;
+
+ // the function to integrate
+ static double computeFunction(double x) {
+ return (x * x + 1.0) * x;
+ }
+
+ static final double start = 0.0;
+ static final double end = 1536.0;
+ /*
+ * The number of recursive calls for
+ * integrate from start to end.
+ * (Empirically determined)
+ */
+ static final int calls = 263479047;
+
+ static String keywordValue(String[] args, String keyword) {
+ for (String arg : args)
+ if (arg.startsWith(keyword))
+ return arg.substring(keyword.length() + 1);
+ return null;
+ }
+
+ static int intArg(String[] args, String keyword, int defaultValue) {
+ String val = keywordValue(args, keyword);
+ return (val == null) ? defaultValue : Integer.parseInt(val);
+ }
+
+ static int policyArg(String[] args, String keyword, int defaultPolicy) {
+ String val = keywordValue(args, keyword);
+ if (val == null) return defaultPolicy;
+ if (val.equals("dynamic")) return DYNAMIC;
+ if (val.equals("serial")) return SERIAL;
+ if (val.equals("fork")) return FORK;
+ throw new Error();
+ }
+
+ /**
+ * Usage: Integrate [procs=N] [reps=N] forkPolicy=serial|dynamic|fork
+ */
+ public static void main(String[] args) throws Exception {
+ final int procs = intArg(args, "procs",
+ Runtime.getRuntime().availableProcessors());
+ final int forkPolicy = policyArg(args, "forkPolicy", DYNAMIC);
+
+ ForkJoinPool g = new ForkJoinPool(procs);
+ System.out.println("Integrating from " + start + " to " + end +
+ " forkPolicy = " + forkPolicy);
+ long lastTime = System.nanoTime();
+
+ for (int reps = intArg(args, "reps", 10); reps > 0; reps--) {
+ double a;
+ if (forkPolicy == SERIAL)
+ a = SQuad.computeArea(g, start, end);
+ else if (forkPolicy == FORK)
+ a = FQuad.computeArea(g, start, end);
+ else
+ a = DQuad.computeArea(g, start, end);
+ long now = System.nanoTime();
+ double s = (double) (now - lastTime) / NPS;
+ lastTime = now;
+ System.out.printf("Calls/sec: %12d", (long) (calls / s));
+ System.out.printf(" Time: %7.3f", s);
+ System.out.printf(" Area: %12.1f", a);
+ System.out.println();
+ }
+ System.out.println(g);
+ g.shutdown();
+ }
+
+
+ // Sequential version
+ static final class SQuad extends RecursiveAction {
+ static double computeArea(ForkJoinPool pool, double l, double r) {
+ SQuad q = new SQuad(l, r, 0);
+ pool.invoke(q);
+ return q.area;
+ }
+
+ final double left; // lower bound
+ final double right; // upper bound
+ double area;
+
+ SQuad(double l, double r, double a) {
+ this.left = l; this.right = r; this.area = a;
+ }
+
+ public final void compute() {
+ double l = left;
+ double r = right;
+ area = recEval(l, r, (l * l + 1.0) * l, (r * r + 1.0) * r, area);
+ }
+
+ static final double recEval(double l, double r, double fl,
+ double fr, double a) {
+ double h = (r - l) * 0.5;
+ double c = l + h;
+ double fc = (c * c + 1.0) * c;
+ double hh = h * 0.5;
+ double al = (fl + fc) * hh;
+ double ar = (fr + fc) * hh;
+ double alr = al + ar;
+ if (Math.abs(alr - a) <= errorTolerance)
+ return alr;
+ else
+ return recEval(c, r, fc, fr, ar) + recEval(l, c, fl, fc, al);
+ }
+
+ }
+
+ //....................................
+
+ // ForkJoin version
+ static final class FQuad extends RecursiveAction {
+ static double computeArea(ForkJoinPool pool, double l, double r) {
+ FQuad q = new FQuad(l, r, 0);
+ pool.invoke(q);
+ return q.area;
+ }
+
+ final double left; // lower bound
+ final double right; // upper bound
+ double area;
+
+ FQuad(double l, double r, double a) {
+ this.left = l; this.right = r; this.area = a;
+ }
+
+ public final void compute() {
+ double l = left;
+ double r = right;
+ area = recEval(l, r, (l * l + 1.0) * l, (r * r + 1.0) * r, area);
+ }
+
+ static final double recEval(double l, double r, double fl,
+ double fr, double a) {
+ double h = (r - l) * 0.5;
+ double c = l + h;
+ double fc = (c * c + 1.0) * c;
+ double hh = h * 0.5;
+ double al = (fl + fc) * hh;
+ double ar = (fr + fc) * hh;
+ double alr = al + ar;
+ if (Math.abs(alr - a) <= errorTolerance)
+ return alr;
+ FQuad q = new FQuad(l, c, al);
+ q.fork();
+ ar = recEval(c, r, fc, fr, ar);
+ if (!q.tryUnfork()) {
+ q.quietlyHelpJoin();
+ return ar + q.area;
+ }
+ return ar + recEval(l, c, fl, fc, al);
+ }
+
+ }
+
+ // ...........................
+
+ // Version using on-demand Fork
+ static final class DQuad extends RecursiveAction {
+ static double computeArea(ForkJoinPool pool, double l, double r) {
+ DQuad q = new DQuad(l, r, 0);
+ pool.invoke(q);
+ return q.area;
+ }
+
+ final double left; // lower bound
+ final double right; // upper bound
+ double area;
+
+ DQuad(double l, double r, double a) {
+ this.left = l; this.right = r; this.area = a;
+ }
+
+ public final void compute() {
+ double l = left;
+ double r = right;
+ area = recEval(l, r, (l * l + 1.0) * l, (r * r + 1.0) * r, area);
+ }
+
+ static final double recEval(double l, double r, double fl,
+ double fr, double a) {
+ double h = (r - l) * 0.5;
+ double c = l + h;
+ double fc = (c * c + 1.0) * c;
+ double hh = h * 0.5;
+ double al = (fl + fc) * hh;
+ double ar = (fr + fc) * hh;
+ double alr = al + ar;
+ if (Math.abs(alr - a) <= errorTolerance)
+ return alr;
+ DQuad q = null;
+ if (getSurplusQueuedTaskCount() <= 3)
+ (q = new DQuad(l, c, al)).fork();
+ ar = recEval(c, r, fc, fr, ar);
+ if (q != null && !q.tryUnfork()) {
+ q.quietlyHelpJoin();
+ return ar + q.area;
+ }
+ return ar + recEval(l, c, fl, fc, al);
+ }
+
+ }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/forkjoin/NQueensCS.java Mon Nov 02 22:23:50 2009 -0800
@@ -0,0 +1,174 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @bug 6865571
+ * @summary Solve NQueens using fork/join
+ * @run main NQueensCS maxBoardSize=11 reps=1
+ * @run main NQueensCS maxBoardSize=11 reps=1 procs=8
+ */
+
+import java.util.Arrays;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveAction;
+
+public class NQueensCS extends RecursiveAction {
+
+ static long lastStealCount;
+ static int boardSize;
+
+ static final int[] expectedSolutions = new int[] {
+ 0, 1, 0, 0, 2, 10, 4, 40, 92, 352, 724, 2680, 14200,
+ 73712, 365596, 2279184, 14772512, 95815104, 666090624
+ }; // see http://www.durangobill.com/N_Queens.html
+
+ static String keywordValue(String[] args, String keyword) {
+ for (String arg : args)
+ if (arg.startsWith(keyword))
+ return arg.substring(keyword.length() + 1);
+ return null;
+ }
+
+ static int intArg(String[] args, String keyword, int defaultValue) {
+ String val = keywordValue(args, keyword);
+ return (val == null) ? defaultValue : Integer.parseInt(val);
+ }
+
+ /** for time conversion */
+ static final long NPS = (1000L * 1000L * 1000L);
+
+ /**
+ * Usage: NQueensCS [minBoardSize=N] [maxBoardSize=N] [procs=N] [reps=N]
+ */
+ public static void main(String[] args) throws Exception {
+ // Board sizes too small: hard to measure well.
+ // Board sizes too large: take too long to run.
+ final int minBoardSize = intArg(args, "minBoardSize", 8);
+ final int maxBoardSize = intArg(args, "maxBoardSize", 15);
+
+ final int procs = intArg(args, "procs", 0);
+
+ for (int reps = intArg(args, "reps", 10); reps > 0; reps--) {
+ ForkJoinPool g = (procs == 0) ?
+ new ForkJoinPool() :
+ new ForkJoinPool(procs);
+ lastStealCount = g.getStealCount();
+ for (int i = minBoardSize; i <= maxBoardSize; i++)
+ test(g, i);
+ System.out.println(g);
+ g.shutdown();
+ }
+ }
+
+ static void test(ForkJoinPool g, int i) throws Exception {
+ boardSize = i;
+ int ps = g.getParallelism();
+ long start = System.nanoTime();
+ NQueensCS task = new NQueensCS(new int[0]);
+ g.invoke(task);
+ int solutions = task.solutions;
+ long time = System.nanoTime() - start;
+ double secs = (double) time / NPS;
+ if (solutions != expectedSolutions[i])
+ throw new Error();
+ System.out.printf("NQueensCS %3d", i);
+ System.out.printf(" Time: %7.3f", secs);
+ long sc = g.getStealCount();
+ long ns = sc - lastStealCount;
+ lastStealCount = sc;
+ System.out.printf(" Steals/t: %5d", ns/ps);
+ System.out.println();
+ }
+
+ // Boards are represented as arrays where each cell
+ // holds the column number of the queen in that row
+
+ final int[] sofar;
+ NQueensCS nextSubtask; // to link subtasks
+ int solutions;
+ NQueensCS(int[] a) {
+ this.sofar = a;
+ }
+
+ public final void compute() {
+ NQueensCS subtasks;
+ int bs = boardSize;
+ if (sofar.length >= bs)
+ solutions = 1;
+ else if ((subtasks = explore(sofar, bs)) != null)
+ solutions = processSubtasks(subtasks);
+ }
+
+ private static NQueensCS explore(int[] array, int bs) {
+ int row = array.length;
+ NQueensCS s = null; // subtask list
+ outer:
+ for (int q = 0; q < bs; ++q) {
+ for (int i = 0; i < row; i++) {
+ int p = array[i];
+ if (q == p || q == p - (row - i) || q == p + (row - i))
+ continue outer; // attacked
+ }
+ NQueensCS first = s; // lag forks to ensure 1 kept
+ if (first != null)
+ first.fork();
+ int[] next = Arrays.copyOf(array, row+1);
+ next[row] = q;
+ NQueensCS subtask = new NQueensCS(next);
+ subtask.nextSubtask = first;
+ s = subtask;
+ }
+ return s;
+ }
+
+ private static int processSubtasks(NQueensCS s) {
+ // Always run first the task held instead of forked
+ s.compute();
+ int ns = s.solutions;
+ s = s.nextSubtask;
+ // Then the unstolen ones
+ while (s != null && s.tryUnfork()) {
+ s.compute();
+ ns += s.solutions;
+ s = s.nextSubtask;
+ }
+ // Then wait for the stolen ones
+ while (s != null) {
+ s.join();
+ ns += s.solutions;
+ s = s.nextSubtask;
+ }
+ return ns;
+ }
+}
--- a/jdk/test/java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java Mon Nov 02 22:23:50 2009 -0800
@@ -115,7 +115,7 @@
finally {
lock.unlock();
}
- if (completed != 2)
+ if (c != 2)
throw new Error("Completed != 2");
int r = result;
if (r == 0) // avoid overoptimization
--- a/jdk/test/java/util/concurrent/locks/ReentrantReadWriteLock/RWMap.java Mon Nov 02 00:06:21 2009 -0800
+++ b/jdk/test/java/util/concurrent/locks/ReentrantReadWriteLock/RWMap.java Mon Nov 02 22:23:50 2009 -0800
@@ -30,6 +30,7 @@
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
+
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;