--- a/jdk/src/share/classes/java/util/concurrent/FutureTask.java Sat Jan 28 20:41:27 2012 -0800
+++ b/jdk/src/share/classes/java/util/concurrent/FutureTask.java Mon Jan 30 11:44:45 2012 +0000
@@ -34,51 +34,111 @@
*/
package java.util.concurrent;
-import java.util.concurrent.locks.*;
+import java.util.concurrent.locks.LockSupport;
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
- * retrieved when the computation has completed; the <tt>get</tt>
- * method will block if the computation has not yet completed. Once
+ * retrieved when the computation has completed; the {@code get}
+ * methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
- * or cancelled.
+ * or cancelled (unless the computation is invoked using
+ * {@link #runAndReset}).
*
- * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
- * {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
- * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
- * submitted to an {@link Executor} for execution.
+ * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
+ * {@link Runnable} object. Because {@code FutureTask} implements
+ * {@code Runnable}, a {@code FutureTask} can be submitted to an
+ * {@link Executor} for execution.
*
* <p>In addition to serving as a standalone class, this class provides
- * <tt>protected</tt> functionality that may be useful when creating
+ * {@code protected} functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
- * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
+ * @param <V> The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask<V> implements RunnableFuture<V> {
- /** Synchronization control for FutureTask */
- private final Sync sync;
+ /*
+ * Revision notes: This differs from previous versions of this
+ * class that relied on AbstractQueuedSynchronizer, mainly to
+ * avoid surprising users about retaining interrupt status during
+ * cancellation races. Sync control in the current design relies
+ * on a "state" field updated via CAS to track completion, along
+ * with a simple Treiber stack to hold waiting threads.
+ *
+ * Style note: As usual, we bypass overhead of using
+ * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
+ */
/**
- * Creates a <tt>FutureTask</tt> that will, upon running, execute the
- * given <tt>Callable</tt>.
+ * The run state of this task, initially NEW. The run state
+ * transitions to a terminal state only in methods set,
+ * setException, and cancel. During completion, state may take on
+ * transient values of COMPLETING (while outcome is being set) or
+ * INTERRUPTING (only while interrupting the runner to satisfy a
+ * cancel(true)). Transitions from these intermediate to final
+ * states use cheaper ordered/lazy writes because values are unique
+ * and cannot be further modified.
+ *
+ * Possible state transitions:
+ * NEW -> COMPLETING -> NORMAL
+ * NEW -> COMPLETING -> EXCEPTIONAL
+ * NEW -> CANCELLED
+ * NEW -> INTERRUPTING -> INTERRUPTED
+ */
+ private volatile int state;
+ private static final int NEW = 0;
+ private static final int COMPLETING = 1;
+ private static final int NORMAL = 2;
+ private static final int EXCEPTIONAL = 3;
+ private static final int CANCELLED = 4;
+ private static final int INTERRUPTING = 5;
+ private static final int INTERRUPTED = 6;
+
+ /** The underlying callable; nulled out after running */
+ private Callable<V> callable;
+ /** The result to return or exception to throw from get() */
+ private Object outcome; // non-volatile, protected by state reads/writes
+ /** The thread running the callable; CASed during run() */
+ private volatile Thread runner;
+ /** Treiber stack of waiting threads */
+ private volatile WaitNode waiters;
+
+ /**
+ * Returns result or throws exception for completed task.
+ *
+ * @param s completed state value
+ */
+ @SuppressWarnings("unchecked")
+ private V report(int s) throws ExecutionException {
+ Object x = outcome;
+ if (s == NORMAL)
+ return (V)x;
+ if (s >= CANCELLED)
+ throw new CancellationException();
+ throw new ExecutionException((Throwable)x);
+ }
+
+ /**
+ * Creates a {@code FutureTask} that will, upon running, execute the
+ * given {@code Callable}.
*
* @param callable the callable task
- * @throws NullPointerException if callable is null
+ * @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
- sync = new Sync(callable);
+ this.callable = callable;
+ this.state = NEW; // ensure visibility of callable
}
/**
- * Creates a <tt>FutureTask</tt> that will, upon running, execute the
- * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
+ * Creates a {@code FutureTask} that will, upon running, execute the
+ * given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
@@ -86,29 +146,46 @@
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
- * @throws NullPointerException if runnable is null
+ * @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
- sync = new Sync(Executors.callable(runnable, result));
+ this.callable = Executors.callable(runnable, result);
+ this.state = NEW; // ensure visibility of callable
}
public boolean isCancelled() {
- return sync.innerIsCancelled();
+ return state >= CANCELLED;
}
public boolean isDone() {
- return sync.innerIsDone();
+ return state != NEW;
}
public boolean cancel(boolean mayInterruptIfRunning) {
- return sync.innerCancel(mayInterruptIfRunning);
+ if (state != NEW)
+ return false;
+ if (mayInterruptIfRunning) {
+ if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
+ return false;
+ Thread t = runner;
+ if (t != null)
+ t.interrupt();
+ UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
+ }
+ else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
+ return false;
+ finishCompletion();
+ return true;
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
- return sync.innerGet();
+ int s = state;
+ if (s <= COMPLETING)
+ s = awaitDone(false, 0L);
+ return report(s);
}
/**
@@ -116,12 +193,18 @@
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- return sync.innerGet(unit.toNanos(timeout));
+ if (unit == null)
+ throw new NullPointerException();
+ int s = state;
+ if (s <= COMPLETING &&
+ (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
+ throw new TimeoutException();
+ return report(s);
}
/**
* Protected method invoked when this task transitions to state
- * <tt>isDone</tt> (whether normally or via cancellation). The
+ * {@code isDone} (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
@@ -131,230 +214,269 @@
protected void done() { }
/**
- * Sets the result of this Future to the given value unless
+ * Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
- * This method is invoked internally by the <tt>run</tt> method
+ *
+ * <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
+ *
* @param v the value
*/
protected void set(V v) {
- sync.innerSet(v);
+ if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
+ outcome = v;
+ UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
+ finishCompletion();
+ }
}
/**
- * Causes this future to report an <tt>ExecutionException</tt>
- * with the given throwable as its cause, unless this Future has
+ * Causes this future to report an {@link ExecutionException}
+ * with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
- * This method is invoked internally by the <tt>run</tt> method
+ *
+ * <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
+ *
* @param t the cause of failure
*/
protected void setException(Throwable t) {
- sync.innerSetException(t);
+ if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
+ outcome = t;
+ UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
+ finishCompletion();
+ }
}
- // The following (duplicated) doc comment can be removed once
- //
- // 6270645: Javadoc comments should be inherited from most derived
- // superinterface or superclass
- // is fixed.
- /**
- * Sets this Future to the result of its computation
- * unless it has been cancelled.
- */
public void run() {
- sync.innerRun();
+ if (state != NEW ||
+ !UNSAFE.compareAndSwapObject(this, runnerOffset,
+ null, Thread.currentThread()))
+ return;
+ try {
+ Callable<V> c = callable;
+ if (c != null && state == NEW) {
+ V result;
+ boolean ran;
+ try {
+ result = c.call();
+ ran = true;
+ } catch (Throwable ex) {
+ result = null;
+ ran = false;
+ setException(ex);
+ }
+ if (ran)
+ set(result);
+ }
+ } finally {
+ // runner must be non-null until state is settled to
+ // prevent concurrent calls to run()
+ runner = null;
+ // state must be re-read after nulling runner to prevent
+ // leaked interrupts
+ int s = state;
+ if (s >= INTERRUPTING)
+ handlePossibleCancellationInterrupt(s);
+ }
}
/**
* Executes the computation without setting its result, and then
- * resets this Future to initial state, failing to do so if the
+ * resets this future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
+ *
* @return true if successfully run and reset
*/
protected boolean runAndReset() {
- return sync.innerRunAndReset();
+ if (state != NEW ||
+ !UNSAFE.compareAndSwapObject(this, runnerOffset,
+ null, Thread.currentThread()))
+ return false;
+ boolean ran = false;
+ int s = state;
+ try {
+ Callable<V> c = callable;
+ if (c != null && s == NEW) {
+ try {
+ c.call(); // don't set result
+ ran = true;
+ } catch (Throwable ex) {
+ setException(ex);
+ }
+ }
+ } finally {
+ // runner must be non-null until state is settled to
+ // prevent concurrent calls to run()
+ runner = null;
+ // state must be re-read after nulling runner to prevent
+ // leaked interrupts
+ s = state;
+ if (s >= INTERRUPTING)
+ handlePossibleCancellationInterrupt(s);
+ }
+ return ran && s == NEW;
+ }
+
+ /**
+ * Ensures that any interrupt from a possible cancel(true) is only
+ * delivered to a task while in run or runAndReset.
+ */
+ private void handlePossibleCancellationInterrupt(int s) {
+ // It is possible for our interrupter to stall before getting a
+ // chance to interrupt us. Let's spin-wait patiently.
+ if (s == INTERRUPTING)
+ while (state == INTERRUPTING)
+ Thread.yield(); // wait out pending interrupt
+
+ // assert state == INTERRUPTED;
+
+ // We want to clear any interrupt we may have received from
+ // cancel(true). However, it is permissible to use interrupts
+ // as an independent mechanism for a task to communicate with
+ // its caller, and there is no way to clear only the
+ // cancellation interrupt.
+ //
+ // Thread.interrupted();
+ }
+
+ /**
+ * Simple linked list nodes to record waiting threads in a Treiber
+ * stack. See other classes such as Phaser and SynchronousQueue
+ * for more detailed explanation.
+ */
+ static final class WaitNode {
+ volatile Thread thread;
+ volatile WaitNode next;
+ WaitNode() { thread = Thread.currentThread(); }
}
/**
- * Synchronization control for FutureTask. Note that this must be
- * a non-static inner class in order to invoke the protected
- * <tt>done</tt> method. For clarity, all inner class support
- * methods are same as outer, prefixed with "inner".
- *
- * Uses AQS sync state to represent run status
+ * Removes and signals all waiting threads, invokes done(), and
+ * nulls out callable.
*/
- private final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = -7828117401763700385L;
-
- /** State value representing that task is ready to run */
- private static final int READY = 0;
- /** State value representing that task is running */
- private static final int RUNNING = 1;
- /** State value representing that task ran */
- private static final int RAN = 2;
- /** State value representing that task was cancelled */
- private static final int CANCELLED = 4;
-
- /** The underlying callable */
- private final Callable<V> callable;
- /** The result to return from get() */
- private V result;
- /** The exception to throw from get() */
- private Throwable exception;
-
- /**
- * The thread running task. When nulled after set/cancel, this
- * indicates that the results are accessible. Must be
- * volatile, to ensure visibility upon completion.
- */
- private volatile Thread runner;
-
- Sync(Callable<V> callable) {
- this.callable = callable;
- }
-
- private boolean ranOrCancelled(int state) {
- return (state & (RAN | CANCELLED)) != 0;
- }
-
- /**
- * Implements AQS base acquire to succeed if ran or cancelled
- */
- protected int tryAcquireShared(int ignore) {
- return innerIsDone() ? 1 : -1;
- }
-
- /**
- * Implements AQS base release to always signal after setting
- * final done status by nulling runner thread.
- */
- protected boolean tryReleaseShared(int ignore) {
- runner = null;
- return true;
- }
-
- boolean innerIsCancelled() {
- return getState() == CANCELLED;
- }
-
- boolean innerIsDone() {
- return ranOrCancelled(getState()) && runner == null;
- }
-
- V innerGet() throws InterruptedException, ExecutionException {
- acquireSharedInterruptibly(0);
- if (getState() == CANCELLED)
- throw new CancellationException();
- if (exception != null)
- throw new ExecutionException(exception);
- return result;
- }
-
- V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
- if (!tryAcquireSharedNanos(0, nanosTimeout))
- throw new TimeoutException();
- if (getState() == CANCELLED)
- throw new CancellationException();
- if (exception != null)
- throw new ExecutionException(exception);
- return result;
- }
-
- void innerSet(V v) {
- for (;;) {
- int s = getState();
- if (s == RAN)
- return;
- if (s == CANCELLED) {
- // aggressively release to set runner to null,
- // in case we are racing with a cancel request
- // that will try to interrupt runner
- releaseShared(0);
- return;
+ private void finishCompletion() {
+ // assert state > COMPLETING;
+ for (WaitNode q; (q = waiters) != null;) {
+ if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
+ for (;;) {
+ Thread t = q.thread;
+ if (t != null) {
+ q.thread = null;
+ LockSupport.unpark(t);
+ }
+ WaitNode next = q.next;
+ if (next == null)
+ break;
+ q.next = null; // unlink to help gc
+ q = next;
}
- if (compareAndSetState(s, RAN)) {
- result = v;
- releaseShared(0);
- done();
- return;
- }
+ break;
}
}
- void innerSetException(Throwable t) {
- for (;;) {
- int s = getState();
- if (s == RAN)
- return;
- if (s == CANCELLED) {
- // aggressively release to set runner to null,
- // in case we are racing with a cancel request
- // that will try to interrupt runner
- releaseShared(0);
- return;
- }
- if (compareAndSetState(s, RAN)) {
- exception = t;
- releaseShared(0);
- done();
- return;
- }
+ done();
+
+ callable = null; // to reduce footprint
+ }
+
+ /**
+ * Awaits completion or aborts on interrupt or timeout.
+ *
+ * @param timed true if use timed waits
+ * @param nanos time to wait, if timed
+ * @return state upon completion
+ */
+ private int awaitDone(boolean timed, long nanos)
+ throws InterruptedException {
+ final long deadline = timed ? System.nanoTime() + nanos : 0L;
+ WaitNode q = null;
+ boolean queued = false;
+ for (;;) {
+ if (Thread.interrupted()) {
+ removeWaiter(q);
+ throw new InterruptedException();
}
- }
- boolean innerCancel(boolean mayInterruptIfRunning) {
- for (;;) {
- int s = getState();
- if (ranOrCancelled(s))
- return false;
- if (compareAndSetState(s, CANCELLED))
- break;
- }
- if (mayInterruptIfRunning) {
- Thread r = runner;
- if (r != null)
- r.interrupt();
+ int s = state;
+ if (s > COMPLETING) {
+ if (q != null)
+ q.thread = null;
+ return s;
}
- releaseShared(0);
- done();
- return true;
- }
-
- void innerRun() {
- if (!compareAndSetState(READY, RUNNING))
- return;
-
- runner = Thread.currentThread();
- if (getState() == RUNNING) { // recheck after setting thread
- V result;
- try {
- result = callable.call();
- } catch (Throwable ex) {
- setException(ex);
- return;
+ else if (s == COMPLETING) // cannot time out yet
+ Thread.yield();
+ else if (q == null)
+ q = new WaitNode();
+ else if (!queued)
+ queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
+ q.next = waiters, q);
+ else if (timed) {
+ nanos = deadline - System.nanoTime();
+ if (nanos <= 0L) {
+ removeWaiter(q);
+ return state;
}
- set(result);
- } else {
- releaseShared(0); // cancel
+ LockSupport.parkNanos(this, nanos);
}
+ else
+ LockSupport.park(this);
}
+ }
- boolean innerRunAndReset() {
- if (!compareAndSetState(READY, RUNNING))
- return false;
- try {
- runner = Thread.currentThread();
- if (getState() == RUNNING)
- callable.call(); // don't set result
- runner = null;
- return compareAndSetState(RUNNING, READY);
- } catch (Throwable ex) {
- setException(ex);
- return false;
+ /**
+ * Tries to unlink a timed-out or interrupted wait node to avoid
+ * accumulating garbage. Internal nodes are simply unspliced
+ * without CAS since it is harmless if they are traversed anyway
+ * by releasers. To avoid effects of unsplicing from already
+ * removed nodes, the list is retraversed in case of an apparent
+ * race. This is slow when there are a lot of nodes, but we don't
+ * expect lists to be long enough to outweigh higher-overhead
+ * schemes.
+ */
+ private void removeWaiter(WaitNode node) {
+ if (node != null) {
+ node.thread = null;
+ retry:
+ for (;;) { // restart on removeWaiter race
+ for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
+ s = q.next;
+ if (q.thread != null)
+ pred = q;
+ else if (pred != null) {
+ pred.next = s;
+ if (pred.thread == null) // check for race
+ continue retry;
+ }
+ else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
+ q, s))
+ continue retry;
+ }
+ break;
}
}
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long stateOffset;
+ private static final long runnerOffset;
+ private static final long waitersOffset;
+ static {
+ try {
+ UNSAFE = sun.misc.Unsafe.getUnsafe();
+ Class<?> k = FutureTask.class;
+ stateOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("state"));
+ runnerOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("runner"));
+ waitersOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("waiters"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+
}