diff -r 092073c222f1 -r ffbfda5c3886 jdk/src/share/classes/java/util/concurrent/FutureTask.java --- a/jdk/src/share/classes/java/util/concurrent/FutureTask.java Sat Jan 28 20:41:27 2012 -0800 +++ b/jdk/src/share/classes/java/util/concurrent/FutureTask.java Mon Jan 30 11:44:45 2012 +0000 @@ -34,51 +34,111 @@ */ package java.util.concurrent; -import java.util.concurrent.locks.*; +import java.util.concurrent.locks.LockSupport; /** * A cancellable asynchronous computation. This class provides a base * implementation of {@link Future}, with methods to start and cancel * a computation, query to see if the computation is complete, and * retrieve the result of the computation. The result can only be - * retrieved when the computation has completed; the get - * method will block if the computation has not yet completed. Once + * retrieved when the computation has completed; the {@code get} + * methods will block if the computation has not yet completed. Once * the computation has completed, the computation cannot be restarted - * or cancelled. + * or cancelled (unless the computation is invoked using + * {@link #runAndReset}). * - *

A FutureTask can be used to wrap a {@link Callable} or - * {@link java.lang.Runnable} object. Because FutureTask - * implements Runnable, a FutureTask can be - * submitted to an {@link Executor} for execution. + *

A {@code FutureTask} can be used to wrap a {@link Callable} or + * {@link Runnable} object. Because {@code FutureTask} implements + * {@code Runnable}, a {@code FutureTask} can be submitted to an + * {@link Executor} for execution. * *

In addition to serving as a standalone class, this class provides - * protected functionality that may be useful when creating + * {@code protected} functionality that may be useful when creating * customized task classes. * * @since 1.5 * @author Doug Lea - * @param The result type returned by this FutureTask's get method + * @param The result type returned by this FutureTask's {@code get} methods */ public class FutureTask implements RunnableFuture { - /** Synchronization control for FutureTask */ - private final Sync sync; + /* + * Revision notes: This differs from previous versions of this + * class that relied on AbstractQueuedSynchronizer, mainly to + * avoid surprising users about retaining interrupt status during + * cancellation races. Sync control in the current design relies + * on a "state" field updated via CAS to track completion, along + * with a simple Treiber stack to hold waiting threads. + * + * Style note: As usual, we bypass overhead of using + * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics. + */ /** - * Creates a FutureTask that will, upon running, execute the - * given Callable. + * The run state of this task, initially NEW. The run state + * transitions to a terminal state only in methods set, + * setException, and cancel. During completion, state may take on + * transient values of COMPLETING (while outcome is being set) or + * INTERRUPTING (only while interrupting the runner to satisfy a + * cancel(true)). Transitions from these intermediate to final + * states use cheaper ordered/lazy writes because values are unique + * and cannot be further modified. + * + * Possible state transitions: + * NEW -> COMPLETING -> NORMAL + * NEW -> COMPLETING -> EXCEPTIONAL + * NEW -> CANCELLED + * NEW -> INTERRUPTING -> INTERRUPTED + */ + private volatile int state; + private static final int NEW = 0; + private static final int COMPLETING = 1; + private static final int NORMAL = 2; + private static final int EXCEPTIONAL = 3; + private static final int CANCELLED = 4; + private static final int INTERRUPTING = 5; + private static final int INTERRUPTED = 6; + + /** The underlying callable; nulled out after running */ + private Callable callable; + /** The result to return or exception to throw from get() */ + private Object outcome; // non-volatile, protected by state reads/writes + /** The thread running the callable; CASed during run() */ + private volatile Thread runner; + /** Treiber stack of waiting threads */ + private volatile WaitNode waiters; + + /** + * Returns result or throws exception for completed task. + * + * @param s completed state value + */ + @SuppressWarnings("unchecked") + private V report(int s) throws ExecutionException { + Object x = outcome; + if (s == NORMAL) + return (V)x; + if (s >= CANCELLED) + throw new CancellationException(); + throw new ExecutionException((Throwable)x); + } + + /** + * Creates a {@code FutureTask} that will, upon running, execute the + * given {@code Callable}. * * @param callable the callable task - * @throws NullPointerException if callable is null + * @throws NullPointerException if the callable is null */ public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); - sync = new Sync(callable); + this.callable = callable; + this.state = NEW; // ensure visibility of callable } /** - * Creates a FutureTask that will, upon running, execute the - * given Runnable, and arrange that get will return the + * Creates a {@code FutureTask} that will, upon running, execute the + * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task @@ -86,29 +146,46 @@ * you don't need a particular result, consider using * constructions of the form: * {@code Future f = new FutureTask(runnable, null)} - * @throws NullPointerException if runnable is null + * @throws NullPointerException if the runnable is null */ public FutureTask(Runnable runnable, V result) { - sync = new Sync(Executors.callable(runnable, result)); + this.callable = Executors.callable(runnable, result); + this.state = NEW; // ensure visibility of callable } public boolean isCancelled() { - return sync.innerIsCancelled(); + return state >= CANCELLED; } public boolean isDone() { - return sync.innerIsDone(); + return state != NEW; } public boolean cancel(boolean mayInterruptIfRunning) { - return sync.innerCancel(mayInterruptIfRunning); + if (state != NEW) + return false; + if (mayInterruptIfRunning) { + if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) + return false; + Thread t = runner; + if (t != null) + t.interrupt(); + UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state + } + else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) + return false; + finishCompletion(); + return true; } /** * @throws CancellationException {@inheritDoc} */ public V get() throws InterruptedException, ExecutionException { - return sync.innerGet(); + int s = state; + if (s <= COMPLETING) + s = awaitDone(false, 0L); + return report(s); } /** @@ -116,12 +193,18 @@ */ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return sync.innerGet(unit.toNanos(timeout)); + if (unit == null) + throw new NullPointerException(); + int s = state; + if (s <= COMPLETING && + (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) + throw new TimeoutException(); + return report(s); } /** * Protected method invoked when this task transitions to state - * isDone (whether normally or via cancellation). The + * {@code isDone} (whether normally or via cancellation). The * default implementation does nothing. Subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. Note that you can query status inside the @@ -131,230 +214,269 @@ protected void done() { } /** - * Sets the result of this Future to the given value unless + * Sets the result of this future to the given value unless * this future has already been set or has been cancelled. - * This method is invoked internally by the run method + * + *

This method is invoked internally by the {@link #run} method * upon successful completion of the computation. + * * @param v the value */ protected void set(V v) { - sync.innerSet(v); + if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { + outcome = v; + UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state + finishCompletion(); + } } /** - * Causes this future to report an ExecutionException - * with the given throwable as its cause, unless this Future has + * Causes this future to report an {@link ExecutionException} + * with the given throwable as its cause, unless this future has * already been set or has been cancelled. - * This method is invoked internally by the run method + * + *

This method is invoked internally by the {@link #run} method * upon failure of the computation. + * * @param t the cause of failure */ protected void setException(Throwable t) { - sync.innerSetException(t); + if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { + outcome = t; + UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state + finishCompletion(); + } } - // The following (duplicated) doc comment can be removed once - // - // 6270645: Javadoc comments should be inherited from most derived - // superinterface or superclass - // is fixed. - /** - * Sets this Future to the result of its computation - * unless it has been cancelled. - */ public void run() { - sync.innerRun(); + if (state != NEW || + !UNSAFE.compareAndSwapObject(this, runnerOffset, + null, Thread.currentThread())) + return; + try { + Callable c = callable; + if (c != null && state == NEW) { + V result; + boolean ran; + try { + result = c.call(); + ran = true; + } catch (Throwable ex) { + result = null; + ran = false; + setException(ex); + } + if (ran) + set(result); + } + } finally { + // runner must be non-null until state is settled to + // prevent concurrent calls to run() + runner = null; + // state must be re-read after nulling runner to prevent + // leaked interrupts + int s = state; + if (s >= INTERRUPTING) + handlePossibleCancellationInterrupt(s); + } } /** * Executes the computation without setting its result, and then - * resets this Future to initial state, failing to do so if the + * resets this future to initial state, failing to do so if the * computation encounters an exception or is cancelled. This is * designed for use with tasks that intrinsically execute more * than once. + * * @return true if successfully run and reset */ protected boolean runAndReset() { - return sync.innerRunAndReset(); + if (state != NEW || + !UNSAFE.compareAndSwapObject(this, runnerOffset, + null, Thread.currentThread())) + return false; + boolean ran = false; + int s = state; + try { + Callable c = callable; + if (c != null && s == NEW) { + try { + c.call(); // don't set result + ran = true; + } catch (Throwable ex) { + setException(ex); + } + } + } finally { + // runner must be non-null until state is settled to + // prevent concurrent calls to run() + runner = null; + // state must be re-read after nulling runner to prevent + // leaked interrupts + s = state; + if (s >= INTERRUPTING) + handlePossibleCancellationInterrupt(s); + } + return ran && s == NEW; + } + + /** + * Ensures that any interrupt from a possible cancel(true) is only + * delivered to a task while in run or runAndReset. + */ + private void handlePossibleCancellationInterrupt(int s) { + // It is possible for our interrupter to stall before getting a + // chance to interrupt us. Let's spin-wait patiently. + if (s == INTERRUPTING) + while (state == INTERRUPTING) + Thread.yield(); // wait out pending interrupt + + // assert state == INTERRUPTED; + + // We want to clear any interrupt we may have received from + // cancel(true). However, it is permissible to use interrupts + // as an independent mechanism for a task to communicate with + // its caller, and there is no way to clear only the + // cancellation interrupt. + // + // Thread.interrupted(); + } + + /** + * Simple linked list nodes to record waiting threads in a Treiber + * stack. See other classes such as Phaser and SynchronousQueue + * for more detailed explanation. + */ + static final class WaitNode { + volatile Thread thread; + volatile WaitNode next; + WaitNode() { thread = Thread.currentThread(); } } /** - * Synchronization control for FutureTask. Note that this must be - * a non-static inner class in order to invoke the protected - * done method. For clarity, all inner class support - * methods are same as outer, prefixed with "inner". - * - * Uses AQS sync state to represent run status + * Removes and signals all waiting threads, invokes done(), and + * nulls out callable. */ - private final class Sync extends AbstractQueuedSynchronizer { - private static final long serialVersionUID = -7828117401763700385L; - - /** State value representing that task is ready to run */ - private static final int READY = 0; - /** State value representing that task is running */ - private static final int RUNNING = 1; - /** State value representing that task ran */ - private static final int RAN = 2; - /** State value representing that task was cancelled */ - private static final int CANCELLED = 4; - - /** The underlying callable */ - private final Callable callable; - /** The result to return from get() */ - private V result; - /** The exception to throw from get() */ - private Throwable exception; - - /** - * The thread running task. When nulled after set/cancel, this - * indicates that the results are accessible. Must be - * volatile, to ensure visibility upon completion. - */ - private volatile Thread runner; - - Sync(Callable callable) { - this.callable = callable; - } - - private boolean ranOrCancelled(int state) { - return (state & (RAN | CANCELLED)) != 0; - } - - /** - * Implements AQS base acquire to succeed if ran or cancelled - */ - protected int tryAcquireShared(int ignore) { - return innerIsDone() ? 1 : -1; - } - - /** - * Implements AQS base release to always signal after setting - * final done status by nulling runner thread. - */ - protected boolean tryReleaseShared(int ignore) { - runner = null; - return true; - } - - boolean innerIsCancelled() { - return getState() == CANCELLED; - } - - boolean innerIsDone() { - return ranOrCancelled(getState()) && runner == null; - } - - V innerGet() throws InterruptedException, ExecutionException { - acquireSharedInterruptibly(0); - if (getState() == CANCELLED) - throw new CancellationException(); - if (exception != null) - throw new ExecutionException(exception); - return result; - } - - V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { - if (!tryAcquireSharedNanos(0, nanosTimeout)) - throw new TimeoutException(); - if (getState() == CANCELLED) - throw new CancellationException(); - if (exception != null) - throw new ExecutionException(exception); - return result; - } - - void innerSet(V v) { - for (;;) { - int s = getState(); - if (s == RAN) - return; - if (s == CANCELLED) { - // aggressively release to set runner to null, - // in case we are racing with a cancel request - // that will try to interrupt runner - releaseShared(0); - return; + private void finishCompletion() { + // assert state > COMPLETING; + for (WaitNode q; (q = waiters) != null;) { + if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { + for (;;) { + Thread t = q.thread; + if (t != null) { + q.thread = null; + LockSupport.unpark(t); + } + WaitNode next = q.next; + if (next == null) + break; + q.next = null; // unlink to help gc + q = next; } - if (compareAndSetState(s, RAN)) { - result = v; - releaseShared(0); - done(); - return; - } + break; } } - void innerSetException(Throwable t) { - for (;;) { - int s = getState(); - if (s == RAN) - return; - if (s == CANCELLED) { - // aggressively release to set runner to null, - // in case we are racing with a cancel request - // that will try to interrupt runner - releaseShared(0); - return; - } - if (compareAndSetState(s, RAN)) { - exception = t; - releaseShared(0); - done(); - return; - } + done(); + + callable = null; // to reduce footprint + } + + /** + * Awaits completion or aborts on interrupt or timeout. + * + * @param timed true if use timed waits + * @param nanos time to wait, if timed + * @return state upon completion + */ + private int awaitDone(boolean timed, long nanos) + throws InterruptedException { + final long deadline = timed ? System.nanoTime() + nanos : 0L; + WaitNode q = null; + boolean queued = false; + for (;;) { + if (Thread.interrupted()) { + removeWaiter(q); + throw new InterruptedException(); } - } - boolean innerCancel(boolean mayInterruptIfRunning) { - for (;;) { - int s = getState(); - if (ranOrCancelled(s)) - return false; - if (compareAndSetState(s, CANCELLED)) - break; - } - if (mayInterruptIfRunning) { - Thread r = runner; - if (r != null) - r.interrupt(); + int s = state; + if (s > COMPLETING) { + if (q != null) + q.thread = null; + return s; } - releaseShared(0); - done(); - return true; - } - - void innerRun() { - if (!compareAndSetState(READY, RUNNING)) - return; - - runner = Thread.currentThread(); - if (getState() == RUNNING) { // recheck after setting thread - V result; - try { - result = callable.call(); - } catch (Throwable ex) { - setException(ex); - return; + else if (s == COMPLETING) // cannot time out yet + Thread.yield(); + else if (q == null) + q = new WaitNode(); + else if (!queued) + queued = UNSAFE.compareAndSwapObject(this, waitersOffset, + q.next = waiters, q); + else if (timed) { + nanos = deadline - System.nanoTime(); + if (nanos <= 0L) { + removeWaiter(q); + return state; } - set(result); - } else { - releaseShared(0); // cancel + LockSupport.parkNanos(this, nanos); } + else + LockSupport.park(this); } + } - boolean innerRunAndReset() { - if (!compareAndSetState(READY, RUNNING)) - return false; - try { - runner = Thread.currentThread(); - if (getState() == RUNNING) - callable.call(); // don't set result - runner = null; - return compareAndSetState(RUNNING, READY); - } catch (Throwable ex) { - setException(ex); - return false; + /** + * Tries to unlink a timed-out or interrupted wait node to avoid + * accumulating garbage. Internal nodes are simply unspliced + * without CAS since it is harmless if they are traversed anyway + * by releasers. To avoid effects of unsplicing from already + * removed nodes, the list is retraversed in case of an apparent + * race. This is slow when there are a lot of nodes, but we don't + * expect lists to be long enough to outweigh higher-overhead + * schemes. + */ + private void removeWaiter(WaitNode node) { + if (node != null) { + node.thread = null; + retry: + for (;;) { // restart on removeWaiter race + for (WaitNode pred = null, q = waiters, s; q != null; q = s) { + s = q.next; + if (q.thread != null) + pred = q; + else if (pred != null) { + pred.next = s; + if (pred.thread == null) // check for race + continue retry; + } + else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, + q, s)) + continue retry; + } + break; } } } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long stateOffset; + private static final long runnerOffset; + private static final long waitersOffset; + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = FutureTask.class; + stateOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("state")); + runnerOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("runner")); + waitersOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("waiters")); + } catch (Exception e) { + throw new Error(e); + } + } + }