--- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Wed Oct 14 00:08:42 2015 +0200
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Tue Oct 13 16:04:56 2015 -0700
@@ -34,23 +34,13 @@
*/
package java.util.concurrent;
-import java.util.function.Supplier;
-import java.util.function.Consumer;
+
+import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
-import java.util.function.Function;
import java.util.function.BiFunction;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
/**
* A {@link Future} that may be explicitly completed (setting its
@@ -71,19 +61,32 @@
* <li>Actions supplied for dependent completions of
* <em>non-async</em> methods may be performed by the thread that
* completes the current CompletableFuture, or by any other caller of
- * a completion method.</li>
+ * a completion method.
*
* <li>All <em>async</em> methods without an explicit Executor
* argument are performed using the {@link ForkJoinPool#commonPool()}
* (unless it does not support a parallelism level of at least two, in
- * which case, a new Thread is created to run each task). To simplify
- * monitoring, debugging, and tracking, all generated asynchronous
- * tasks are instances of the marker interface {@link
- * AsynchronousCompletionTask}. </li>
+ * which case, a new Thread is created to run each task). This may be
+ * overridden for non-static methods in subclasses by defining method
+ * {@link #defaultExecutor()}. To simplify monitoring, debugging,
+ * and tracking, all generated asynchronous tasks are instances of the
+ * marker interface {@link AsynchronousCompletionTask}. Operations
+ * with time-delays can use adapter methods defined in this class, for
+ * example: {@code supplyAsync(supplier, delayedExecutor(timeout,
+ * timeUnit))}. To support methods with delays and timeouts, this
+ * class maintains at most one daemon thread for triggering and
+ * cancelling actions, not for running them.
*
* <li>All CompletionStage methods are implemented independently of
* other public methods, so the behavior of one method is not impacted
- * by overrides of others in subclasses. </li> </ul>
+ * by overrides of others in subclasses.
+ *
+ * <li>All CompletionStage methods return CompletableFutures. To
+ * restrict usages to only those methods defined in interface
+ * CompletionStage, use method {@link #minimalCompletionStage}. Or to
+ * ensure only that clients do not themselves modify a future, use
+ * method {@link #copy}.
+ * </ul>
*
* <p>CompletableFuture also implements {@link Future} with the following
* policies: <ul>
@@ -94,7 +97,7 @@
* completion. Method {@link #cancel cancel} has the same effect as
* {@code completeExceptionally(new CancellationException())}. Method
* {@link #isCompletedExceptionally} can be used to determine if a
- * CompletableFuture completed in any exceptional fashion.</li>
+ * CompletableFuture completed in any exceptional fashion.
*
* <li>In case of exceptional completion with a CompletionException,
* methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
@@ -102,10 +105,38 @@
* corresponding CompletionException. To simplify usage in most
* contexts, this class also defines methods {@link #join()} and
* {@link #getNow} that instead throw the CompletionException directly
- * in these cases.</li> </ul>
+ * in these cases.
+ * </ul>
+ *
+ * <p>Arguments used to pass a completion result (that is, for
+ * parameters of type {@code T}) for methods accepting them may be
+ * null, but passing a null value for any other parameter will result
+ * in a {@link NullPointerException} being thrown.
+ *
+ * <p>Subclasses of this class should normally override the "virtual
+ * constructor" method {@link #newIncompleteFuture}, which establishes
+ * the concrete type returned by CompletionStage methods. For example,
+ * here is a class that substitutes a different default Executor and
+ * disables the {@code obtrude} methods:
+ *
+ * <pre> {@code
+ * class MyCompletableFuture<T> extends CompletableFuture<T> {
+ * static final Executor myExecutor = ...;
+ * public MyCompletableFuture() { }
+ * public <U> CompletableFuture<U> newIncompleteFuture() {
+ * return new MyCompletableFuture<U>(); }
+ * public Executor defaultExecutor() {
+ * return myExecutor; }
+ * public void obtrudeValue(T value) {
+ * throw new UnsupportedOperationException(); }
+ * public void obtrudeException(Throwable ex) {
+ * throw new UnsupportedOperationException(); }
+ * }}</pre>
*
* @author Doug Lea
* @since 1.8
+ * @param <T> The result type returned by this future's {@code join}
+ * and {@code get} methods
*/
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
@@ -150,9 +181,7 @@
* fields for source(s), actions, and dependent. They are
* boringly similar, differing from others only with respect to
* underlying functional forms. We do this so that users don't
- * encounter layers of adaptors in common usages. We also
- * include "Relay" classes/methods that don't correspond to user
- * methods; they copy results from one stage to another.
+ * encounter layers of adapters in common usages.
*
* * Boolean CompletableFuture method x(...) (for example
* uniApply) takes all of the arguments needed to check that an
@@ -219,18 +248,18 @@
volatile Completion stack; // Top of Treiber stack of dependent actions
final boolean internalComplete(Object r) { // CAS from null to r
- return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
+ return U.compareAndSwapObject(this, RESULT, null, r);
}
final boolean casStack(Completion cmp, Completion val) {
- return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
+ return U.compareAndSwapObject(this, STACK, cmp, val);
}
/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
- return UNSAFE.compareAndSwapObject(this, STACK, h, c);
+ return U.compareAndSwapObject(this, STACK, h, c);
}
/** Unconditionally pushes c onto stack, retrying if necessary. */
@@ -250,8 +279,8 @@
/** Completes with the null value, unless already completed. */
final boolean completeNull() {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- NIL);
+ return U.compareAndSwapObject(this, RESULT, null,
+ NIL);
}
/** Returns the encoding of the given non-exceptional value. */
@@ -261,8 +290,8 @@
/** Completes with a non-exceptional result, unless already completed. */
final boolean completeValue(T t) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- (t == null) ? NIL : t);
+ return U.compareAndSwapObject(this, RESULT, null,
+ (t == null) ? NIL : t);
}
/**
@@ -276,8 +305,8 @@
/** Completes with an exceptional result, unless already completed. */
final boolean completeThrowable(Throwable x) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- encodeThrowable(x));
+ return U.compareAndSwapObject(this, RESULT, null,
+ encodeThrowable(x));
}
/**
@@ -304,8 +333,8 @@
* existing CompletionException.
*/
final boolean completeThrowable(Throwable x, Object r) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- encodeThrowable(x, r));
+ return U.compareAndSwapObject(this, RESULT, null,
+ encodeThrowable(x, r));
}
/**
@@ -334,8 +363,8 @@
* If exceptional, r is first coerced to a CompletionException.
*/
final boolean completeRelay(Object r) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- encodeRelay(r));
+ return U.compareAndSwapObject(this, RESULT, null,
+ encodeRelay(r));
}
/**
@@ -390,14 +419,14 @@
public static interface AsynchronousCompletionTask {
}
- private static final boolean useCommonPool =
+ private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
- private static final Executor asyncPool = useCommonPool ?
+ private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
@@ -407,11 +436,11 @@
/**
* Null-checks user executor argument, and translates uses of
- * commonPool to asyncPool in case parallelism disabled.
+ * commonPool to ASYNC_POOL in case parallelism disabled.
*/
static Executor screenExecutor(Executor e) {
- if (!useCommonPool && e == ForkJoinPool.commonPool())
- return asyncPool;
+ if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
+ return ASYNC_POOL;
if (e == null) throw new NullPointerException();
return e;
}
@@ -421,6 +450,12 @@
static final int ASYNC = 1;
static final int NESTED = -1;
+ /**
+ * Spins before blocking in waitingGet
+ */
+ static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
+ 1 << 8 : 0);
+
/* ------------- Base Completion classes and operations -------------- */
@SuppressWarnings("serial")
@@ -440,13 +475,13 @@
abstract boolean isLive();
public final void run() { tryFire(ASYNC); }
- public final boolean exec() { tryFire(ASYNC); return true; }
+ public final boolean exec() { tryFire(ASYNC); return false; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}
static void lazySetNext(Completion c, Completion next) {
- UNSAFE.putOrderedObject(c, NEXT, next);
+ U.putOrderedObject(c, NEXT, next);
}
/**
@@ -610,7 +645,7 @@
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture<V> d = new CompletableFuture<V>();
+ CompletableFuture<V> d = newIncompleteFuture();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
@@ -665,7 +700,7 @@
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture<Void> d = new CompletableFuture<Void>();
+ CompletableFuture<Void> d = newIncompleteFuture();
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
push(c);
@@ -713,7 +748,7 @@
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
- CompletableFuture<Void> d = new CompletableFuture<Void>();
+ CompletableFuture<Void> d = newIncompleteFuture();
if (e != null || !d.uniRun(this, f, null)) {
UniRun<T> c = new UniRun<T>(e, d, this, f);
push(c);
@@ -774,7 +809,7 @@
private CompletableFuture<T> uniWhenCompleteStage(
Executor e, BiConsumer<? super T, ? super Throwable> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture<T> d = new CompletableFuture<T>();
+ CompletableFuture<T> d = newIncompleteFuture();
if (e != null || !d.uniWhenComplete(this, f, null)) {
UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
push(c);
@@ -830,7 +865,7 @@
private <V> CompletableFuture<V> uniHandleStage(
Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture<V> d = new CompletableFuture<V>();
+ CompletableFuture<V> d = newIncompleteFuture();
if (e != null || !d.uniHandle(this, f, null)) {
UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
push(c);
@@ -880,7 +915,7 @@
private CompletableFuture<T> uniExceptionallyStage(
Function<Throwable, ? extends T> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture<T> d = new CompletableFuture<T>();
+ CompletableFuture<T> d = newIncompleteFuture();
if (!d.uniExceptionally(this, f, null)) {
UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
push(c);
@@ -912,6 +947,30 @@
return true;
}
+ private CompletableFuture<T> uniCopyStage() {
+ Object r;
+ CompletableFuture<T> d = newIncompleteFuture();
+ if ((r = result) != null)
+ d.completeRelay(r);
+ else {
+ UniRelay<T> c = new UniRelay<T>(d, this);
+ push(c);
+ c.tryFire(SYNC);
+ }
+ return d;
+ }
+
+ private MinimalStage<T> uniAsMinimalStage() {
+ Object r;
+ if ((r = result) != null)
+ return new MinimalStage<T>(encodeRelay(r));
+ MinimalStage<T> d = new MinimalStage<T>();
+ UniRelay<T> c = new UniRelay<T>(d, this);
+ push(c);
+ c.tryFire(SYNC);
+ return d;
+ }
+
@SuppressWarnings("serial")
static final class UniCompose<T,V> extends UniCompletion<T,V> {
Function<? super T, ? extends CompletionStage<V>> fn;
@@ -967,31 +1026,32 @@
private <V> CompletableFuture<V> uniComposeStage(
Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
if (f == null) throw new NullPointerException();
- Object r; Throwable x;
+ Object r, s; Throwable x;
+ CompletableFuture<V> d = newIncompleteFuture();
if (e == null && (r = result) != null) {
- // try to return function result directly
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
- return new CompletableFuture<V>(encodeThrowable(x, r));
+ d.result = encodeThrowable(x, r);
+ return d;
}
r = null;
}
try {
@SuppressWarnings("unchecked") T t = (T) r;
CompletableFuture<V> g = f.apply(t).toCompletableFuture();
- Object s = g.result;
- if (s != null)
- return new CompletableFuture<V>(encodeRelay(s));
- CompletableFuture<V> d = new CompletableFuture<V>();
- UniRelay<V> copy = new UniRelay<V>(d, g);
- g.push(copy);
- copy.tryFire(SYNC);
+ if ((s = g.result) != null)
+ d.completeRelay(s);
+ else {
+ UniRelay<V> c = new UniRelay<V>(d, g);
+ g.push(c);
+ c.tryFire(SYNC);
+ }
return d;
} catch (Throwable ex) {
- return new CompletableFuture<V>(encodeThrowable(ex));
+ d.result = encodeThrowable(ex);
+ return d;
}
}
- CompletableFuture<V> d = new CompletableFuture<V>();
UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
@@ -1116,7 +1176,7 @@
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture<V> d = new CompletableFuture<V>();
+ CompletableFuture<V> d = newIncompleteFuture();
if (e != null || !d.biApply(this, b, f, null)) {
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
bipush(b, c);
@@ -1188,7 +1248,7 @@
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture<Void> d = new CompletableFuture<Void>();
+ CompletableFuture<Void> d = newIncompleteFuture();
if (e != null || !d.biAccept(this, b, f, null)) {
BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
bipush(b, c);
@@ -1247,7 +1307,7 @@
CompletableFuture<?> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture<Void> d = new CompletableFuture<Void>();
+ CompletableFuture<Void> d = newIncompleteFuture();
if (e != null || !d.biRun(this, b, f, null)) {
BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
bipush(b, c);
@@ -1302,7 +1362,7 @@
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
- andTree(cfs, mid+1, hi))) == null)
+ andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.biRelay(a, b)) {
BiRelay<?,?> c = new BiRelay<>(d, a, b);
@@ -1388,7 +1448,7 @@
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture<V> d = new CompletableFuture<V>();
+ CompletableFuture<V> d = newIncompleteFuture();
if (e != null || !d.orApply(this, b, f, null)) {
OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
orpush(b, c);
@@ -1452,7 +1512,7 @@
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture<Void> d = new CompletableFuture<Void>();
+ CompletableFuture<Void> d = newIncompleteFuture();
if (e != null || !d.orAccept(this, b, f, null)) {
OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
orpush(b, c);
@@ -1510,7 +1570,7 @@
CompletableFuture<?> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture<Void> d = new CompletableFuture<Void>();
+ CompletableFuture<Void> d = newIncompleteFuture();
if (e != null || !d.orRun(this, b, f, null)) {
OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
orpush(b, c);
@@ -1556,7 +1616,7 @@
if ((a = (lo == mid ? cfs[lo] :
orTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
- orTree(cfs, mid+1, hi))) == null)
+ orTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.orRelay(a, b)) {
OrRelay<?,?> c = new OrRelay<>(d, a, b);
@@ -1571,9 +1631,9 @@
@SuppressWarnings("serial")
static final class AsyncSupply<T> extends ForkJoinTask<Void>
- implements Runnable, AsynchronousCompletionTask {
- CompletableFuture<T> dep; Supplier<T> fn;
- AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
+ implements Runnable, AsynchronousCompletionTask {
+ CompletableFuture<T> dep; Supplier<? extends T> fn;
+ AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
this.dep = dep; this.fn = fn;
}
@@ -1582,7 +1642,7 @@
public final boolean exec() { run(); return true; }
public void run() {
- CompletableFuture<T> d; Supplier<T> f;
+ CompletableFuture<T> d; Supplier<? extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
@@ -1607,7 +1667,7 @@
@SuppressWarnings("serial")
static final class AsyncRun extends ForkJoinTask<Void>
- implements Runnable, AsynchronousCompletionTask {
+ implements Runnable, AsynchronousCompletionTask {
CompletableFuture<Void> dep; Runnable fn;
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
this.dep = dep; this.fn = fn;
@@ -1651,14 +1711,15 @@
@SuppressWarnings("serial")
static final class Signaller extends Completion
implements ForkJoinPool.ManagedBlocker {
- long nanos; // wait time if timed
+ long nanos; // remaining wait time if timed
final long deadline; // non-zero if timed
- volatile int interruptControl; // > 0: interruptible, < 0: interrupted
+ final boolean interruptible;
+ boolean interrupted;
volatile Thread thread;
Signaller(boolean interruptible, long nanos, long deadline) {
this.thread = Thread.currentThread();
- this.interruptControl = interruptible ? 1 : 0;
+ this.interruptible = interruptible;
this.nanos = nanos;
this.deadline = deadline;
}
@@ -1671,29 +1732,22 @@
return null;
}
public boolean isReleasable() {
- if (thread == null)
- return true;
- if (Thread.interrupted()) {
- int i = interruptControl;
- interruptControl = -1;
- if (i > 0)
- return true;
- }
- if (deadline != 0L &&
- (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
- thread = null;
- return true;
- }
- return false;
+ if (Thread.interrupted())
+ interrupted = true;
+ return ((interrupted && interruptible) ||
+ (deadline != 0L &&
+ (nanos <= 0L ||
+ (nanos = deadline - System.nanoTime()) <= 0L)) ||
+ thread == null);
}
public boolean block() {
- if (isReleasable())
- return true;
- else if (deadline == 0L)
- LockSupport.park(this);
- else if (nanos > 0L)
- LockSupport.parkNanos(this, nanos);
- return isReleasable();
+ while (!isReleasable()) {
+ if (deadline == 0L)
+ LockSupport.park(this);
+ else
+ LockSupport.parkNanos(this, nanos);
+ }
+ return true;
}
final boolean isLive() { return thread != null; }
}
@@ -1705,13 +1759,10 @@
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
- int spins = -1;
+ int spins = SPINS;
Object r;
while ((r = result) == null) {
- if (spins < 0)
- spins = (Runtime.getRuntime().availableProcessors() > 1) ?
- 1 << 8 : 0; // Use brief spin-wait on multiprocessors
- else if (spins > 0) {
+ if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
@@ -1719,29 +1770,27 @@
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
- else if (interruptible && q.interruptControl < 0) {
- q.thread = null;
- cleanStack();
- return null;
- }
- else if (q.thread != null && result == null) {
+ else {
try {
ForkJoinPool.managedBlock(q);
- } catch (InterruptedException ie) {
- q.interruptControl = -1;
+ } catch (InterruptedException ie) { // currently cannot happen
+ q.interrupted = true;
}
+ if (q.interrupted && interruptible)
+ break;
}
}
if (q != null) {
q.thread = null;
- if (q.interruptControl < 0) {
+ if (q.interrupted) {
if (interruptible)
- r = null; // report interruption
+ cleanStack();
else
Thread.currentThread().interrupt();
}
}
- postComplete();
+ if (r != null)
+ postComplete();
return r;
}
@@ -1752,37 +1801,39 @@
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
- if (nanos <= 0L)
- throw new TimeoutException();
- long d = System.nanoTime() + nanos;
- Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
- boolean queued = false;
- Object r;
- // We intentionally don't spin here (as waitingGet does) because
- // the call to nanoTime() above acts much like a spin.
- while ((r = result) == null) {
- if (!queued)
- queued = tryPushStack(q);
- else if (q.interruptControl < 0 || q.nanos <= 0L) {
- q.thread = null;
- cleanStack();
- if (q.interruptControl < 0)
- return null;
- throw new TimeoutException();
- }
- else if (q.thread != null && result == null) {
- try {
- ForkJoinPool.managedBlock(q);
- } catch (InterruptedException ie) {
- q.interruptControl = -1;
+ if (nanos > 0L) {
+ long d = System.nanoTime() + nanos;
+ long deadline = (d == 0L) ? 1L : d; // avoid 0
+ Signaller q = null;
+ boolean queued = false;
+ Object r;
+ while ((r = result) == null) { // similar to untimed, without spins
+ if (q == null)
+ q = new Signaller(true, nanos, deadline);
+ else if (!queued)
+ queued = tryPushStack(q);
+ else if (q.nanos <= 0L)
+ break;
+ else {
+ try {
+ ForkJoinPool.managedBlock(q);
+ } catch (InterruptedException ie) {
+ q.interrupted = true;
+ }
+ if (q.interrupted)
+ break;
}
}
+ if (q != null)
+ q.thread = null;
+ if (r != null)
+ postComplete();
+ else
+ cleanStack();
+ if (r != null || (q != null && q.interrupted))
+ return r;
}
- if (q.interruptControl < 0)
- r = null;
- q.thread = null;
- postComplete();
- return r;
+ throw new TimeoutException();
}
/* ------------- public methods -------------- */
@@ -1796,7 +1847,7 @@
/**
* Creates a new complete CompletableFuture with given encoded result.
*/
- private CompletableFuture(Object r) {
+ CompletableFuture(Object r) {
this.result = r;
}
@@ -1811,7 +1862,7 @@
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
- return asyncSupplyStage(asyncPool, supplier);
+ return asyncSupplyStage(ASYNC_POOL, supplier);
}
/**
@@ -1840,7 +1891,7 @@
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
- return asyncRunStage(asyncPool, runnable);
+ return asyncRunStage(ASYNC_POOL, runnable);
}
/**
@@ -1985,7 +2036,7 @@
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
- return uniApplyStage(asyncPool, fn);
+ return uniApplyStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
@@ -1998,7 +2049,7 @@
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
- return uniAcceptStage(asyncPool, action);
+ return uniAcceptStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
@@ -2011,7 +2062,7 @@
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
- return uniRunStage(asyncPool, action);
+ return uniRunStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
@@ -2028,7 +2079,7 @@
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
- return biApplyStage(asyncPool, other, fn);
+ return biApplyStage(defaultExecutor(), other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
@@ -2046,7 +2097,7 @@
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
- return biAcceptStage(asyncPool, other, action);
+ return biAcceptStage(defaultExecutor(), other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
@@ -2062,7 +2113,7 @@
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
- return biRunStage(asyncPool, other, action);
+ return biRunStage(defaultExecutor(), other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
@@ -2078,7 +2129,7 @@
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
- return orApplyStage(asyncPool, other, fn);
+ return orApplyStage(defaultExecutor(), other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
@@ -2094,7 +2145,7 @@
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
- return orAcceptStage(asyncPool, other, action);
+ return orAcceptStage(defaultExecutor(), other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
@@ -2110,7 +2161,7 @@
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action) {
- return orRunStage(asyncPool, other, action);
+ return orRunStage(defaultExecutor(), other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
@@ -2126,7 +2177,7 @@
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
- return uniComposeStage(asyncPool, fn);
+ return uniComposeStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
@@ -2142,7 +2193,7 @@
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
- return uniWhenCompleteStage(asyncPool, action);
+ return uniWhenCompleteStage(defaultExecutor(), action);
}
public CompletableFuture<T> whenCompleteAsync(
@@ -2157,7 +2208,7 @@
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
- return uniHandleStage(asyncPool, fn);
+ return uniHandleStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> handleAsync(
@@ -2196,6 +2247,7 @@
return uniExceptionallyStage(fn);
}
+
/* ------------- Arbitrary-arity constructions -------------- */
/**
@@ -2353,10 +2405,12 @@
*/
public String toString() {
Object r = result;
- int count;
+ int count = 0; // avoid call to getNumberOfDependents in case disabled
+ for (Completion p = stack; p != null; p = p.next)
+ ++count;
return super.toString() +
((r == null) ?
- (((count = getNumberOfDependents()) == 0) ?
+ ((count == 0) ?
"[Not completed]" :
"[Not completed, " + count + " dependents]") :
(((r instanceof AltResult) && ((AltResult)r).ex != null) ?
@@ -2364,22 +2418,381 @@
"[Completed normally]"));
}
+ // jdk9 additions
+
+ /**
+ * Returns a new incomplete CompletableFuture of the type to be
+ * returned by a CompletionStage method. Subclasses should
+ * normally override this method to return an instance of the same
+ * class as this CompletableFuture. The default implementation
+ * returns an instance of class CompletableFuture.
+ *
+ * @param <U> the type of the value
+ * @return a new CompletableFuture
+ * @since 1.9
+ */
+ public <U> CompletableFuture<U> newIncompleteFuture() {
+ return new CompletableFuture<U>();
+ }
+
+ /**
+ * Returns the default Executor used for async methods that do not
+ * specify an Executor. This class uses the {@link
+ * ForkJoinPool#commonPool()} if it supports more than one
+ * parallel thread, or else an Executor using one thread per async
+ * task. This method may be overridden in subclasses to return
+ * an Executor that provides at least one independent thread.
+ *
+ * @return the executor
+ * @since 1.9
+ */
+ public Executor defaultExecutor() {
+ return ASYNC_POOL;
+ }
+
+ /**
+ * Returns a new CompletableFuture that is completed normally with
+ * the same value as this CompletableFuture when it completes
+ * normally. If this CompletableFuture completes exceptionally,
+ * then the returned CompletableFuture completes exceptionally
+ * with a CompletionException with this exception as cause. The
+ * behavior is equivalent to {@code thenApply(x -> x)}. This
+ * method may be useful as a form of "defensive copying", to
+ * prevent clients from completing, while still being able to
+ * arrange dependent actions.
+ *
+ * @return the new CompletableFuture
+ * @since 1.9
+ */
+ public CompletableFuture<T> copy() {
+ return uniCopyStage();
+ }
+
+ /**
+ * Returns a new CompletionStage that is completed normally with
+ * the same value as this CompletableFuture when it completes
+ * normally, and cannot be independently completed or otherwise
+ * used in ways not defined by the methods of interface {@link
+ * CompletionStage}. If this CompletableFuture completes
+ * exceptionally, then the returned CompletionStage completes
+ * exceptionally with a CompletionException with this exception as
+ * cause.
+ *
+ * @return the new CompletionStage
+ * @since 1.9
+ */
+ public CompletionStage<T> minimalCompletionStage() {
+ return uniAsMinimalStage();
+ }
+
+ /**
+ * Completes this CompletableFuture with the result of
+ * the given Supplier function invoked from an asynchronous
+ * task using the given executor.
+ *
+ * @param supplier a function returning the value to be used
+ * to complete this CompletableFuture
+ * @param executor the executor to use for asynchronous execution
+ * @return this CompletableFuture
+ * @since 1.9
+ */
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
+ Executor executor) {
+ if (supplier == null || executor == null)
+ throw new NullPointerException();
+ executor.execute(new AsyncSupply<T>(this, supplier));
+ return this;
+ }
+
+ /**
+ * Completes this CompletableFuture with the result of the given
+ * Supplier function invoked from an asynchronous task using the
+ * default executor.
+ *
+ * @param supplier a function returning the value to be used
+ * to complete this CompletableFuture
+ * @return this CompletableFuture
+ * @since 1.9
+ */
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
+ return completeAsync(supplier, defaultExecutor());
+ }
+
+ /**
+ * Exceptionally completes this CompletableFuture with
+ * a {@link TimeoutException} if not otherwise completed
+ * before the given timeout.
+ *
+ * @param timeout how long to wait before completing exceptionally
+ * with a TimeoutException, in units of {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return this CompletableFuture
+ * @since 1.9
+ */
+ public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
+ if (unit == null)
+ throw new NullPointerException();
+ if (result == null)
+ whenComplete(new Canceller(Delayer.delay(new Timeout(this),
+ timeout, unit)));
+ return this;
+ }
+
+ /**
+ * Completes this CompletableFuture with the given value if not
+ * otherwise completed before the given timeout.
+ *
+ * @param value the value to use upon timeout
+ * @param timeout how long to wait before completing normally
+ * with the given value, in units of {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return this CompletableFuture
+ * @since 1.9
+ */
+ public CompletableFuture<T> completeOnTimeout(T value, long timeout,
+ TimeUnit unit) {
+ if (unit == null)
+ throw new NullPointerException();
+ if (result == null)
+ whenComplete(new Canceller(Delayer.delay(
+ new DelayedCompleter<T>(this, value),
+ timeout, unit)));
+ return this;
+ }
+
+ /**
+ * Returns a new Executor that submits a task to the given base
+ * executor after the given delay (or no delay if non-positive).
+ * Each delay commences upon invocation of the returned executor's
+ * {@code execute} method.
+ *
+ * @param delay how long to delay, in units of {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code delay} parameter
+ * @param executor the base executor
+ * @return the new delayed executor
+ * @since 1.9
+ */
+ public static Executor delayedExecutor(long delay, TimeUnit unit,
+ Executor executor) {
+ if (unit == null || executor == null)
+ throw new NullPointerException();
+ return new DelayedExecutor(delay, unit, executor);
+ }
+
+ /**
+ * Returns a new Executor that submits a task to the default
+ * executor after the given delay (or no delay if non-positive).
+ * Each delay commences upon invocation of the returned executor's
+ * {@code execute} method.
+ *
+ * @param delay how long to delay, in units of {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code delay} parameter
+ * @return the new delayed executor
+ * @since 1.9
+ */
+ public static Executor delayedExecutor(long delay, TimeUnit unit) {
+ if (unit == null)
+ throw new NullPointerException();
+ return new DelayedExecutor(delay, unit, ASYNC_POOL);
+ }
+
+ /**
+ * Returns a new CompletionStage that is already completed with
+ * the given value and supports only those methods in
+ * interface {@link CompletionStage}.
+ *
+ * @param value the value
+ * @param <U> the type of the value
+ * @return the completed CompletionStage
+ * @since 1.9
+ */
+ public static <U> CompletionStage<U> completedStage(U value) {
+ return new MinimalStage<U>((value == null) ? NIL : value);
+ }
+
+ /**
+ * Returns a new CompletableFuture that is already completed
+ * exceptionally with the given exception.
+ *
+ * @param ex the exception
+ * @param <U> the type of the value
+ * @return the exceptionally completed CompletableFuture
+ * @since 1.9
+ */
+ public static <U> CompletableFuture<U> failedFuture(Throwable ex) {
+ if (ex == null) throw new NullPointerException();
+ return new CompletableFuture<U>(new AltResult(ex));
+ }
+
+ /**
+ * Returns a new CompletionStage that is already completed
+ * exceptionally with the given exception and supports only those
+ * methods in interface {@link CompletionStage}.
+ *
+ * @param ex the exception
+ * @param <U> the type of the value
+ * @return the exceptionally completed CompletionStage
+ * @since 1.9
+ */
+ public static <U> CompletionStage<U> failedStage(Throwable ex) {
+ if (ex == null) throw new NullPointerException();
+ return new MinimalStage<U>(new AltResult(ex));
+ }
+
+ /**
+ * Singleton delay scheduler, used only for starting and
+ * cancelling tasks.
+ */
+ static final class Delayer {
+ static ScheduledFuture<?> delay(Runnable command, long delay,
+ TimeUnit unit) {
+ return delayer.schedule(command, delay, unit);
+ }
+
+ static final class DaemonThreadFactory implements ThreadFactory {
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("CompletableFutureDelayScheduler");
+ return t;
+ }
+ }
+
+ static final ScheduledThreadPoolExecutor delayer;
+ static {
+ (delayer = new ScheduledThreadPoolExecutor(
+ 1, new DaemonThreadFactory())).
+ setRemoveOnCancelPolicy(true);
+ }
+ }
+
+ // Little class-ified lambdas to better support monitoring
+
+ static final class DelayedExecutor implements Executor {
+ final long delay;
+ final TimeUnit unit;
+ final Executor executor;
+ DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
+ this.delay = delay; this.unit = unit; this.executor = executor;
+ }
+ public void execute(Runnable r) {
+ Delayer.delay(new TaskSubmitter(executor, r), delay, unit);
+ }
+ }
+
+ /** Action to submit user task */
+ static final class TaskSubmitter implements Runnable {
+ final Executor executor;
+ final Runnable action;
+ TaskSubmitter(Executor executor, Runnable action) {
+ this.executor = executor;
+ this.action = action;
+ }
+ public void run() { executor.execute(action); }
+ }
+
+ /** Action to completeExceptionally on timeout */
+ static final class Timeout implements Runnable {
+ final CompletableFuture<?> f;
+ Timeout(CompletableFuture<?> f) { this.f = f; }
+ public void run() {
+ if (f != null && !f.isDone())
+ f.completeExceptionally(new TimeoutException());
+ }
+ }
+
+ /** Action to complete on timeout */
+ static final class DelayedCompleter<U> implements Runnable {
+ final CompletableFuture<U> f;
+ final U u;
+ DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
+ public void run() {
+ if (f != null)
+ f.complete(u);
+ }
+ }
+
+ /** Action to cancel unneeded timeouts */
+ static final class Canceller implements BiConsumer<Object, Throwable> {
+ final Future<?> f;
+ Canceller(Future<?> f) { this.f = f; }
+ public void accept(Object ignore, Throwable ex) {
+ if (ex == null && f != null && !f.isDone())
+ f.cancel(false);
+ }
+ }
+
+ /**
+ * A subclass that just throws UOE for most non-CompletionStage methods.
+ */
+ static final class MinimalStage<T> extends CompletableFuture<T> {
+ MinimalStage() { }
+ MinimalStage(Object r) { super(r); }
+ @Override public <U> CompletableFuture<U> newIncompleteFuture() {
+ return new MinimalStage<U>(); }
+ @Override public T get() {
+ throw new UnsupportedOperationException(); }
+ @Override public T get(long timeout, TimeUnit unit) {
+ throw new UnsupportedOperationException(); }
+ @Override public T getNow(T valueIfAbsent) {
+ throw new UnsupportedOperationException(); }
+ @Override public T join() {
+ throw new UnsupportedOperationException(); }
+ @Override public boolean complete(T value) {
+ throw new UnsupportedOperationException(); }
+ @Override public boolean completeExceptionally(Throwable ex) {
+ throw new UnsupportedOperationException(); }
+ @Override public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException(); }
+ @Override public void obtrudeValue(T value) {
+ throw new UnsupportedOperationException(); }
+ @Override public void obtrudeException(Throwable ex) {
+ throw new UnsupportedOperationException(); }
+ @Override public boolean isDone() {
+ throw new UnsupportedOperationException(); }
+ @Override public boolean isCancelled() {
+ throw new UnsupportedOperationException(); }
+ @Override public boolean isCompletedExceptionally() {
+ throw new UnsupportedOperationException(); }
+ @Override public int getNumberOfDependents() {
+ throw new UnsupportedOperationException(); }
+ @Override public CompletableFuture<T> completeAsync
+ (Supplier<? extends T> supplier, Executor executor) {
+ throw new UnsupportedOperationException(); }
+ @Override public CompletableFuture<T> completeAsync
+ (Supplier<? extends T> supplier) {
+ throw new UnsupportedOperationException(); }
+ @Override public CompletableFuture<T> orTimeout
+ (long timeout, TimeUnit unit) {
+ throw new UnsupportedOperationException(); }
+ @Override public CompletableFuture<T> completeOnTimeout
+ (T value, long timeout, TimeUnit unit) {
+ throw new UnsupportedOperationException(); }
+ }
+
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE;
+ private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
try {
- final sun.misc.Unsafe u;
- UNSAFE = u = sun.misc.Unsafe.getUnsafe();
- Class<?> k = CompletableFuture.class;
- RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
- STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
- NEXT = u.objectFieldOffset
+ RESULT = U.objectFieldOffset
+ (CompletableFuture.class.getDeclaredField("result"));
+ STACK = U.objectFieldOffset
+ (CompletableFuture.class.getDeclaredField("stack"));
+ NEXT = U.objectFieldOffset
(Completion.class.getDeclaredField("next"));
- } catch (Exception x) {
- throw new Error(x);
+ } catch (ReflectiveOperationException e) {
+ throw new Error(e);
}
+
+ // Reduce the risk of rare disastrous classloading in first call to
+ // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
+ Class<?> ensureLoaded = LockSupport.class;
}
}