# HG changeset patch # User dl # Date 1444777496 25200 # Node ID e5e5ab01398e6ce2616a4f544d88db53a57cf999 # Parent ea54ac8672e7ed4f68d52f2c624a6bafefe8a87a 8134851: Integrate CompletableFuture with API enhancements 8039378: CompletableFuture: Avoid StackOverflowError for long linear chains Reviewed-by: martin, psandoz, chegar diff -r ea54ac8672e7 -r e5e5ab01398e jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Wed Oct 14 00:08:42 2015 +0200 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Tue Oct 13 16:04:56 2015 -0700 @@ -34,23 +34,13 @@ */ package java.util.concurrent; -import java.util.function.Supplier; -import java.util.function.Consumer; + +import java.util.concurrent.locks.LockSupport; import java.util.function.BiConsumer; -import java.util.function.Function; import java.util.function.BiFunction; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.locks.LockSupport; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; /** * A {@link Future} that may be explicitly completed (setting its @@ -71,19 +61,32 @@ *
  • Actions supplied for dependent completions of * non-async methods may be performed by the thread that * completes the current CompletableFuture, or by any other caller of - * a completion method.
  • + * a completion method. * *
  • All async methods without an explicit Executor * argument are performed using the {@link ForkJoinPool#commonPool()} * (unless it does not support a parallelism level of at least two, in - * which case, a new Thread is created to run each task). To simplify - * monitoring, debugging, and tracking, all generated asynchronous - * tasks are instances of the marker interface {@link - * AsynchronousCompletionTask}.
  • + * which case, a new Thread is created to run each task). This may be + * overridden for non-static methods in subclasses by defining method + * {@link #defaultExecutor()}. To simplify monitoring, debugging, + * and tracking, all generated asynchronous tasks are instances of the + * marker interface {@link AsynchronousCompletionTask}. Operations + * with time-delays can use adapter methods defined in this class, for + * example: {@code supplyAsync(supplier, delayedExecutor(timeout, + * timeUnit))}. To support methods with delays and timeouts, this + * class maintains at most one daemon thread for triggering and + * cancelling actions, not for running them. * *
  • All CompletionStage methods are implemented independently of * other public methods, so the behavior of one method is not impacted - * by overrides of others in subclasses.
  • + * by overrides of others in subclasses. + * + *
  • All CompletionStage methods return CompletableFutures. To + * restrict usages to only those methods defined in interface + * CompletionStage, use method {@link #minimalCompletionStage}. Or to + * ensure only that clients do not themselves modify a future, use + * method {@link #copy}. + * * *

    CompletableFuture also implements {@link Future} with the following * policies:

    + * in these cases. + * + * + *

    Arguments used to pass a completion result (that is, for + * parameters of type {@code T}) for methods accepting them may be + * null, but passing a null value for any other parameter will result + * in a {@link NullPointerException} being thrown. + * + *

    Subclasses of this class should normally override the "virtual + * constructor" method {@link #newIncompleteFuture}, which establishes + * the concrete type returned by CompletionStage methods. For example, + * here is a class that substitutes a different default Executor and + * disables the {@code obtrude} methods: + * + *

     {@code
    + * class MyCompletableFuture extends CompletableFuture {
    + *   static final Executor myExecutor = ...;
    + *   public MyCompletableFuture() { }
    + *   public  CompletableFuture newIncompleteFuture() {
    + *     return new MyCompletableFuture(); }
    + *   public Executor defaultExecutor() {
    + *     return myExecutor; }
    + *   public void obtrudeValue(T value) {
    + *     throw new UnsupportedOperationException(); }
    + *   public void obtrudeException(Throwable ex) {
    + *     throw new UnsupportedOperationException(); }
    + * }}
    * * @author Doug Lea * @since 1.8 + * @param The result type returned by this future's {@code join} + * and {@code get} methods */ public class CompletableFuture implements Future, CompletionStage { @@ -150,9 +181,7 @@ * fields for source(s), actions, and dependent. They are * boringly similar, differing from others only with respect to * underlying functional forms. We do this so that users don't - * encounter layers of adaptors in common usages. We also - * include "Relay" classes/methods that don't correspond to user - * methods; they copy results from one stage to another. + * encounter layers of adapters in common usages. * * * Boolean CompletableFuture method x(...) (for example * uniApply) takes all of the arguments needed to check that an @@ -219,18 +248,18 @@ volatile Completion stack; // Top of Treiber stack of dependent actions final boolean internalComplete(Object r) { // CAS from null to r - return UNSAFE.compareAndSwapObject(this, RESULT, null, r); + return U.compareAndSwapObject(this, RESULT, null, r); } final boolean casStack(Completion cmp, Completion val) { - return UNSAFE.compareAndSwapObject(this, STACK, cmp, val); + return U.compareAndSwapObject(this, STACK, cmp, val); } /** Returns true if successfully pushed c onto stack. */ final boolean tryPushStack(Completion c) { Completion h = stack; lazySetNext(c, h); - return UNSAFE.compareAndSwapObject(this, STACK, h, c); + return U.compareAndSwapObject(this, STACK, h, c); } /** Unconditionally pushes c onto stack, retrying if necessary. */ @@ -250,8 +279,8 @@ /** Completes with the null value, unless already completed. */ final boolean completeNull() { - return UNSAFE.compareAndSwapObject(this, RESULT, null, - NIL); + return U.compareAndSwapObject(this, RESULT, null, + NIL); } /** Returns the encoding of the given non-exceptional value. */ @@ -261,8 +290,8 @@ /** Completes with a non-exceptional result, unless already completed. */ final boolean completeValue(T t) { - return UNSAFE.compareAndSwapObject(this, RESULT, null, - (t == null) ? NIL : t); + return U.compareAndSwapObject(this, RESULT, null, + (t == null) ? NIL : t); } /** @@ -276,8 +305,8 @@ /** Completes with an exceptional result, unless already completed. */ final boolean completeThrowable(Throwable x) { - return UNSAFE.compareAndSwapObject(this, RESULT, null, - encodeThrowable(x)); + return U.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x)); } /** @@ -304,8 +333,8 @@ * existing CompletionException. */ final boolean completeThrowable(Throwable x, Object r) { - return UNSAFE.compareAndSwapObject(this, RESULT, null, - encodeThrowable(x, r)); + return U.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x, r)); } /** @@ -334,8 +363,8 @@ * If exceptional, r is first coerced to a CompletionException. */ final boolean completeRelay(Object r) { - return UNSAFE.compareAndSwapObject(this, RESULT, null, - encodeRelay(r)); + return U.compareAndSwapObject(this, RESULT, null, + encodeRelay(r)); } /** @@ -390,14 +419,14 @@ public static interface AsynchronousCompletionTask { } - private static final boolean useCommonPool = + private static final boolean USE_COMMON_POOL = (ForkJoinPool.getCommonPoolParallelism() > 1); /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ - private static final Executor asyncPool = useCommonPool ? + private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ @@ -407,11 +436,11 @@ /** * Null-checks user executor argument, and translates uses of - * commonPool to asyncPool in case parallelism disabled. + * commonPool to ASYNC_POOL in case parallelism disabled. */ static Executor screenExecutor(Executor e) { - if (!useCommonPool && e == ForkJoinPool.commonPool()) - return asyncPool; + if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) + return ASYNC_POOL; if (e == null) throw new NullPointerException(); return e; } @@ -421,6 +450,12 @@ static final int ASYNC = 1; static final int NESTED = -1; + /** + * Spins before blocking in waitingGet + */ + static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? + 1 << 8 : 0); + /* ------------- Base Completion classes and operations -------------- */ @SuppressWarnings("serial") @@ -440,13 +475,13 @@ abstract boolean isLive(); public final void run() { tryFire(ASYNC); } - public final boolean exec() { tryFire(ASYNC); return true; } + public final boolean exec() { tryFire(ASYNC); return false; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) {} } static void lazySetNext(Completion c, Completion next) { - UNSAFE.putOrderedObject(c, NEXT, next); + U.putOrderedObject(c, NEXT, next); } /** @@ -610,7 +645,7 @@ private CompletableFuture uniApplyStage( Executor e, Function f) { if (f == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.uniApply(this, f, null)) { UniApply c = new UniApply(e, d, this, f); push(c); @@ -665,7 +700,7 @@ private CompletableFuture uniAcceptStage(Executor e, Consumer f) { if (f == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.uniAccept(this, f, null)) { UniAccept c = new UniAccept(e, d, this, f); push(c); @@ -713,7 +748,7 @@ private CompletableFuture uniRunStage(Executor e, Runnable f) { if (f == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.uniRun(this, f, null)) { UniRun c = new UniRun(e, d, this, f); push(c); @@ -774,7 +809,7 @@ private CompletableFuture uniWhenCompleteStage( Executor e, BiConsumer f) { if (f == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.uniWhenComplete(this, f, null)) { UniWhenComplete c = new UniWhenComplete(e, d, this, f); push(c); @@ -830,7 +865,7 @@ private CompletableFuture uniHandleStage( Executor e, BiFunction f) { if (f == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.uniHandle(this, f, null)) { UniHandle c = new UniHandle(e, d, this, f); push(c); @@ -880,7 +915,7 @@ private CompletableFuture uniExceptionallyStage( Function f) { if (f == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (!d.uniExceptionally(this, f, null)) { UniExceptionally c = new UniExceptionally(d, this, f); push(c); @@ -912,6 +947,30 @@ return true; } + private CompletableFuture uniCopyStage() { + Object r; + CompletableFuture d = newIncompleteFuture(); + if ((r = result) != null) + d.completeRelay(r); + else { + UniRelay c = new UniRelay(d, this); + push(c); + c.tryFire(SYNC); + } + return d; + } + + private MinimalStage uniAsMinimalStage() { + Object r; + if ((r = result) != null) + return new MinimalStage(encodeRelay(r)); + MinimalStage d = new MinimalStage(); + UniRelay c = new UniRelay(d, this); + push(c); + c.tryFire(SYNC); + return d; + } + @SuppressWarnings("serial") static final class UniCompose extends UniCompletion { Function> fn; @@ -967,31 +1026,32 @@ private CompletableFuture uniComposeStage( Executor e, Function> f) { if (f == null) throw new NullPointerException(); - Object r; Throwable x; + Object r, s; Throwable x; + CompletableFuture d = newIncompleteFuture(); if (e == null && (r = result) != null) { - // try to return function result directly if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { - return new CompletableFuture(encodeThrowable(x, r)); + d.result = encodeThrowable(x, r); + return d; } r = null; } try { @SuppressWarnings("unchecked") T t = (T) r; CompletableFuture g = f.apply(t).toCompletableFuture(); - Object s = g.result; - if (s != null) - return new CompletableFuture(encodeRelay(s)); - CompletableFuture d = new CompletableFuture(); - UniRelay copy = new UniRelay(d, g); - g.push(copy); - copy.tryFire(SYNC); + if ((s = g.result) != null) + d.completeRelay(s); + else { + UniRelay c = new UniRelay(d, g); + g.push(c); + c.tryFire(SYNC); + } return d; } catch (Throwable ex) { - return new CompletableFuture(encodeThrowable(ex)); + d.result = encodeThrowable(ex); + return d; } } - CompletableFuture d = new CompletableFuture(); UniCompose c = new UniCompose(e, d, this, f); push(c); c.tryFire(SYNC); @@ -1116,7 +1176,7 @@ CompletableFuture b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.biApply(this, b, f, null)) { BiApply c = new BiApply(e, d, this, b, f); bipush(b, c); @@ -1188,7 +1248,7 @@ CompletableFuture b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.biAccept(this, b, f, null)) { BiAccept c = new BiAccept(e, d, this, b, f); bipush(b, c); @@ -1247,7 +1307,7 @@ CompletableFuture b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.biRun(this, b, f, null)) { BiRun c = new BiRun<>(e, d, this, b, f); bipush(b, c); @@ -1302,7 +1362,7 @@ if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : - andTree(cfs, mid+1, hi))) == null) + andTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); if (!d.biRelay(a, b)) { BiRelay c = new BiRelay<>(d, a, b); @@ -1388,7 +1448,7 @@ CompletableFuture b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.orApply(this, b, f, null)) { OrApply c = new OrApply(e, d, this, b, f); orpush(b, c); @@ -1452,7 +1512,7 @@ CompletableFuture b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.orAccept(this, b, f, null)) { OrAccept c = new OrAccept(e, d, this, b, f); orpush(b, c); @@ -1510,7 +1570,7 @@ CompletableFuture b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); - CompletableFuture d = new CompletableFuture(); + CompletableFuture d = newIncompleteFuture(); if (e != null || !d.orRun(this, b, f, null)) { OrRun c = new OrRun<>(e, d, this, b, f); orpush(b, c); @@ -1556,7 +1616,7 @@ if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : - orTree(cfs, mid+1, hi))) == null) + orTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); if (!d.orRelay(a, b)) { OrRelay c = new OrRelay<>(d, a, b); @@ -1571,9 +1631,9 @@ @SuppressWarnings("serial") static final class AsyncSupply extends ForkJoinTask - implements Runnable, AsynchronousCompletionTask { - CompletableFuture dep; Supplier fn; - AsyncSupply(CompletableFuture dep, Supplier fn) { + implements Runnable, AsynchronousCompletionTask { + CompletableFuture dep; Supplier fn; + AsyncSupply(CompletableFuture dep, Supplier fn) { this.dep = dep; this.fn = fn; } @@ -1582,7 +1642,7 @@ public final boolean exec() { run(); return true; } public void run() { - CompletableFuture d; Supplier f; + CompletableFuture d; Supplier f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { @@ -1607,7 +1667,7 @@ @SuppressWarnings("serial") static final class AsyncRun extends ForkJoinTask - implements Runnable, AsynchronousCompletionTask { + implements Runnable, AsynchronousCompletionTask { CompletableFuture dep; Runnable fn; AsyncRun(CompletableFuture dep, Runnable fn) { this.dep = dep; this.fn = fn; @@ -1651,14 +1711,15 @@ @SuppressWarnings("serial") static final class Signaller extends Completion implements ForkJoinPool.ManagedBlocker { - long nanos; // wait time if timed + long nanos; // remaining wait time if timed final long deadline; // non-zero if timed - volatile int interruptControl; // > 0: interruptible, < 0: interrupted + final boolean interruptible; + boolean interrupted; volatile Thread thread; Signaller(boolean interruptible, long nanos, long deadline) { this.thread = Thread.currentThread(); - this.interruptControl = interruptible ? 1 : 0; + this.interruptible = interruptible; this.nanos = nanos; this.deadline = deadline; } @@ -1671,29 +1732,22 @@ return null; } public boolean isReleasable() { - if (thread == null) - return true; - if (Thread.interrupted()) { - int i = interruptControl; - interruptControl = -1; - if (i > 0) - return true; - } - if (deadline != 0L && - (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { - thread = null; - return true; - } - return false; + if (Thread.interrupted()) + interrupted = true; + return ((interrupted && interruptible) || + (deadline != 0L && + (nanos <= 0L || + (nanos = deadline - System.nanoTime()) <= 0L)) || + thread == null); } public boolean block() { - if (isReleasable()) - return true; - else if (deadline == 0L) - LockSupport.park(this); - else if (nanos > 0L) - LockSupport.parkNanos(this, nanos); - return isReleasable(); + while (!isReleasable()) { + if (deadline == 0L) + LockSupport.park(this); + else + LockSupport.parkNanos(this, nanos); + } + return true; } final boolean isLive() { return thread != null; } } @@ -1705,13 +1759,10 @@ private Object waitingGet(boolean interruptible) { Signaller q = null; boolean queued = false; - int spins = -1; + int spins = SPINS; Object r; while ((r = result) == null) { - if (spins < 0) - spins = (Runtime.getRuntime().availableProcessors() > 1) ? - 1 << 8 : 0; // Use brief spin-wait on multiprocessors - else if (spins > 0) { + if (spins > 0) { if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } @@ -1719,29 +1770,27 @@ q = new Signaller(interruptible, 0L, 0L); else if (!queued) queued = tryPushStack(q); - else if (interruptible && q.interruptControl < 0) { - q.thread = null; - cleanStack(); - return null; - } - else if (q.thread != null && result == null) { + else { try { ForkJoinPool.managedBlock(q); - } catch (InterruptedException ie) { - q.interruptControl = -1; + } catch (InterruptedException ie) { // currently cannot happen + q.interrupted = true; } + if (q.interrupted && interruptible) + break; } } if (q != null) { q.thread = null; - if (q.interruptControl < 0) { + if (q.interrupted) { if (interruptible) - r = null; // report interruption + cleanStack(); else Thread.currentThread().interrupt(); } } - postComplete(); + if (r != null) + postComplete(); return r; } @@ -1752,37 +1801,39 @@ private Object timedGet(long nanos) throws TimeoutException { if (Thread.interrupted()) return null; - if (nanos <= 0L) - throw new TimeoutException(); - long d = System.nanoTime() + nanos; - Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0 - boolean queued = false; - Object r; - // We intentionally don't spin here (as waitingGet does) because - // the call to nanoTime() above acts much like a spin. - while ((r = result) == null) { - if (!queued) - queued = tryPushStack(q); - else if (q.interruptControl < 0 || q.nanos <= 0L) { - q.thread = null; - cleanStack(); - if (q.interruptControl < 0) - return null; - throw new TimeoutException(); - } - else if (q.thread != null && result == null) { - try { - ForkJoinPool.managedBlock(q); - } catch (InterruptedException ie) { - q.interruptControl = -1; + if (nanos > 0L) { + long d = System.nanoTime() + nanos; + long deadline = (d == 0L) ? 1L : d; // avoid 0 + Signaller q = null; + boolean queued = false; + Object r; + while ((r = result) == null) { // similar to untimed, without spins + if (q == null) + q = new Signaller(true, nanos, deadline); + else if (!queued) + queued = tryPushStack(q); + else if (q.nanos <= 0L) + break; + else { + try { + ForkJoinPool.managedBlock(q); + } catch (InterruptedException ie) { + q.interrupted = true; + } + if (q.interrupted) + break; } } + if (q != null) + q.thread = null; + if (r != null) + postComplete(); + else + cleanStack(); + if (r != null || (q != null && q.interrupted)) + return r; } - if (q.interruptControl < 0) - r = null; - q.thread = null; - postComplete(); - return r; + throw new TimeoutException(); } /* ------------- public methods -------------- */ @@ -1796,7 +1847,7 @@ /** * Creates a new complete CompletableFuture with given encoded result. */ - private CompletableFuture(Object r) { + CompletableFuture(Object r) { this.result = r; } @@ -1811,7 +1862,7 @@ * @return the new CompletableFuture */ public static CompletableFuture supplyAsync(Supplier supplier) { - return asyncSupplyStage(asyncPool, supplier); + return asyncSupplyStage(ASYNC_POOL, supplier); } /** @@ -1840,7 +1891,7 @@ * @return the new CompletableFuture */ public static CompletableFuture runAsync(Runnable runnable) { - return asyncRunStage(asyncPool, runnable); + return asyncRunStage(ASYNC_POOL, runnable); } /** @@ -1985,7 +2036,7 @@ public CompletableFuture thenApplyAsync( Function fn) { - return uniApplyStage(asyncPool, fn); + return uniApplyStage(defaultExecutor(), fn); } public CompletableFuture thenApplyAsync( @@ -1998,7 +2049,7 @@ } public CompletableFuture thenAcceptAsync(Consumer action) { - return uniAcceptStage(asyncPool, action); + return uniAcceptStage(defaultExecutor(), action); } public CompletableFuture thenAcceptAsync(Consumer action, @@ -2011,7 +2062,7 @@ } public CompletableFuture thenRunAsync(Runnable action) { - return uniRunStage(asyncPool, action); + return uniRunStage(defaultExecutor(), action); } public CompletableFuture thenRunAsync(Runnable action, @@ -2028,7 +2079,7 @@ public CompletableFuture thenCombineAsync( CompletionStage other, BiFunction fn) { - return biApplyStage(asyncPool, other, fn); + return biApplyStage(defaultExecutor(), other, fn); } public CompletableFuture thenCombineAsync( @@ -2046,7 +2097,7 @@ public CompletableFuture thenAcceptBothAsync( CompletionStage other, BiConsumer action) { - return biAcceptStage(asyncPool, other, action); + return biAcceptStage(defaultExecutor(), other, action); } public CompletableFuture thenAcceptBothAsync( @@ -2062,7 +2113,7 @@ public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action) { - return biRunStage(asyncPool, other, action); + return biRunStage(defaultExecutor(), other, action); } public CompletableFuture runAfterBothAsync(CompletionStage other, @@ -2078,7 +2129,7 @@ public CompletableFuture applyToEitherAsync( CompletionStage other, Function fn) { - return orApplyStage(asyncPool, other, fn); + return orApplyStage(defaultExecutor(), other, fn); } public CompletableFuture applyToEitherAsync( @@ -2094,7 +2145,7 @@ public CompletableFuture acceptEitherAsync( CompletionStage other, Consumer action) { - return orAcceptStage(asyncPool, other, action); + return orAcceptStage(defaultExecutor(), other, action); } public CompletableFuture acceptEitherAsync( @@ -2110,7 +2161,7 @@ public CompletableFuture runAfterEitherAsync(CompletionStage other, Runnable action) { - return orRunStage(asyncPool, other, action); + return orRunStage(defaultExecutor(), other, action); } public CompletableFuture runAfterEitherAsync(CompletionStage other, @@ -2126,7 +2177,7 @@ public CompletableFuture thenComposeAsync( Function> fn) { - return uniComposeStage(asyncPool, fn); + return uniComposeStage(defaultExecutor(), fn); } public CompletableFuture thenComposeAsync( @@ -2142,7 +2193,7 @@ public CompletableFuture whenCompleteAsync( BiConsumer action) { - return uniWhenCompleteStage(asyncPool, action); + return uniWhenCompleteStage(defaultExecutor(), action); } public CompletableFuture whenCompleteAsync( @@ -2157,7 +2208,7 @@ public CompletableFuture handleAsync( BiFunction fn) { - return uniHandleStage(asyncPool, fn); + return uniHandleStage(defaultExecutor(), fn); } public CompletableFuture handleAsync( @@ -2196,6 +2247,7 @@ return uniExceptionallyStage(fn); } + /* ------------- Arbitrary-arity constructions -------------- */ /** @@ -2353,10 +2405,12 @@ */ public String toString() { Object r = result; - int count; + int count = 0; // avoid call to getNumberOfDependents in case disabled + for (Completion p = stack; p != null; p = p.next) + ++count; return super.toString() + ((r == null) ? - (((count = getNumberOfDependents()) == 0) ? + ((count == 0) ? "[Not completed]" : "[Not completed, " + count + " dependents]") : (((r instanceof AltResult) && ((AltResult)r).ex != null) ? @@ -2364,22 +2418,381 @@ "[Completed normally]")); } + // jdk9 additions + + /** + * Returns a new incomplete CompletableFuture of the type to be + * returned by a CompletionStage method. Subclasses should + * normally override this method to return an instance of the same + * class as this CompletableFuture. The default implementation + * returns an instance of class CompletableFuture. + * + * @param the type of the value + * @return a new CompletableFuture + * @since 1.9 + */ + public CompletableFuture newIncompleteFuture() { + return new CompletableFuture(); + } + + /** + * Returns the default Executor used for async methods that do not + * specify an Executor. This class uses the {@link + * ForkJoinPool#commonPool()} if it supports more than one + * parallel thread, or else an Executor using one thread per async + * task. This method may be overridden in subclasses to return + * an Executor that provides at least one independent thread. + * + * @return the executor + * @since 1.9 + */ + public Executor defaultExecutor() { + return ASYNC_POOL; + } + + /** + * Returns a new CompletableFuture that is completed normally with + * the same value as this CompletableFuture when it completes + * normally. If this CompletableFuture completes exceptionally, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException with this exception as cause. The + * behavior is equivalent to {@code thenApply(x -> x)}. This + * method may be useful as a form of "defensive copying", to + * prevent clients from completing, while still being able to + * arrange dependent actions. + * + * @return the new CompletableFuture + * @since 1.9 + */ + public CompletableFuture copy() { + return uniCopyStage(); + } + + /** + * Returns a new CompletionStage that is completed normally with + * the same value as this CompletableFuture when it completes + * normally, and cannot be independently completed or otherwise + * used in ways not defined by the methods of interface {@link + * CompletionStage}. If this CompletableFuture completes + * exceptionally, then the returned CompletionStage completes + * exceptionally with a CompletionException with this exception as + * cause. + * + * @return the new CompletionStage + * @since 1.9 + */ + public CompletionStage minimalCompletionStage() { + return uniAsMinimalStage(); + } + + /** + * Completes this CompletableFuture with the result of + * the given Supplier function invoked from an asynchronous + * task using the given executor. + * + * @param supplier a function returning the value to be used + * to complete this CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return this CompletableFuture + * @since 1.9 + */ + public CompletableFuture completeAsync(Supplier supplier, + Executor executor) { + if (supplier == null || executor == null) + throw new NullPointerException(); + executor.execute(new AsyncSupply(this, supplier)); + return this; + } + + /** + * Completes this CompletableFuture with the result of the given + * Supplier function invoked from an asynchronous task using the + * default executor. + * + * @param supplier a function returning the value to be used + * to complete this CompletableFuture + * @return this CompletableFuture + * @since 1.9 + */ + public CompletableFuture completeAsync(Supplier supplier) { + return completeAsync(supplier, defaultExecutor()); + } + + /** + * Exceptionally completes this CompletableFuture with + * a {@link TimeoutException} if not otherwise completed + * before the given timeout. + * + * @param timeout how long to wait before completing exceptionally + * with a TimeoutException, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return this CompletableFuture + * @since 1.9 + */ + public CompletableFuture orTimeout(long timeout, TimeUnit unit) { + if (unit == null) + throw new NullPointerException(); + if (result == null) + whenComplete(new Canceller(Delayer.delay(new Timeout(this), + timeout, unit))); + return this; + } + + /** + * Completes this CompletableFuture with the given value if not + * otherwise completed before the given timeout. + * + * @param value the value to use upon timeout + * @param timeout how long to wait before completing normally + * with the given value, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return this CompletableFuture + * @since 1.9 + */ + public CompletableFuture completeOnTimeout(T value, long timeout, + TimeUnit unit) { + if (unit == null) + throw new NullPointerException(); + if (result == null) + whenComplete(new Canceller(Delayer.delay( + new DelayedCompleter(this, value), + timeout, unit))); + return this; + } + + /** + * Returns a new Executor that submits a task to the given base + * executor after the given delay (or no delay if non-positive). + * Each delay commences upon invocation of the returned executor's + * {@code execute} method. + * + * @param delay how long to delay, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code delay} parameter + * @param executor the base executor + * @return the new delayed executor + * @since 1.9 + */ + public static Executor delayedExecutor(long delay, TimeUnit unit, + Executor executor) { + if (unit == null || executor == null) + throw new NullPointerException(); + return new DelayedExecutor(delay, unit, executor); + } + + /** + * Returns a new Executor that submits a task to the default + * executor after the given delay (or no delay if non-positive). + * Each delay commences upon invocation of the returned executor's + * {@code execute} method. + * + * @param delay how long to delay, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code delay} parameter + * @return the new delayed executor + * @since 1.9 + */ + public static Executor delayedExecutor(long delay, TimeUnit unit) { + if (unit == null) + throw new NullPointerException(); + return new DelayedExecutor(delay, unit, ASYNC_POOL); + } + + /** + * Returns a new CompletionStage that is already completed with + * the given value and supports only those methods in + * interface {@link CompletionStage}. + * + * @param value the value + * @param the type of the value + * @return the completed CompletionStage + * @since 1.9 + */ + public static CompletionStage completedStage(U value) { + return new MinimalStage((value == null) ? NIL : value); + } + + /** + * Returns a new CompletableFuture that is already completed + * exceptionally with the given exception. + * + * @param ex the exception + * @param the type of the value + * @return the exceptionally completed CompletableFuture + * @since 1.9 + */ + public static CompletableFuture failedFuture(Throwable ex) { + if (ex == null) throw new NullPointerException(); + return new CompletableFuture(new AltResult(ex)); + } + + /** + * Returns a new CompletionStage that is already completed + * exceptionally with the given exception and supports only those + * methods in interface {@link CompletionStage}. + * + * @param ex the exception + * @param the type of the value + * @return the exceptionally completed CompletionStage + * @since 1.9 + */ + public static CompletionStage failedStage(Throwable ex) { + if (ex == null) throw new NullPointerException(); + return new MinimalStage(new AltResult(ex)); + } + + /** + * Singleton delay scheduler, used only for starting and + * cancelling tasks. + */ + static final class Delayer { + static ScheduledFuture delay(Runnable command, long delay, + TimeUnit unit) { + return delayer.schedule(command, delay, unit); + } + + static final class DaemonThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("CompletableFutureDelayScheduler"); + return t; + } + } + + static final ScheduledThreadPoolExecutor delayer; + static { + (delayer = new ScheduledThreadPoolExecutor( + 1, new DaemonThreadFactory())). + setRemoveOnCancelPolicy(true); + } + } + + // Little class-ified lambdas to better support monitoring + + static final class DelayedExecutor implements Executor { + final long delay; + final TimeUnit unit; + final Executor executor; + DelayedExecutor(long delay, TimeUnit unit, Executor executor) { + this.delay = delay; this.unit = unit; this.executor = executor; + } + public void execute(Runnable r) { + Delayer.delay(new TaskSubmitter(executor, r), delay, unit); + } + } + + /** Action to submit user task */ + static final class TaskSubmitter implements Runnable { + final Executor executor; + final Runnable action; + TaskSubmitter(Executor executor, Runnable action) { + this.executor = executor; + this.action = action; + } + public void run() { executor.execute(action); } + } + + /** Action to completeExceptionally on timeout */ + static final class Timeout implements Runnable { + final CompletableFuture f; + Timeout(CompletableFuture f) { this.f = f; } + public void run() { + if (f != null && !f.isDone()) + f.completeExceptionally(new TimeoutException()); + } + } + + /** Action to complete on timeout */ + static final class DelayedCompleter implements Runnable { + final CompletableFuture f; + final U u; + DelayedCompleter(CompletableFuture f, U u) { this.f = f; this.u = u; } + public void run() { + if (f != null) + f.complete(u); + } + } + + /** Action to cancel unneeded timeouts */ + static final class Canceller implements BiConsumer { + final Future f; + Canceller(Future f) { this.f = f; } + public void accept(Object ignore, Throwable ex) { + if (ex == null && f != null && !f.isDone()) + f.cancel(false); + } + } + + /** + * A subclass that just throws UOE for most non-CompletionStage methods. + */ + static final class MinimalStage extends CompletableFuture { + MinimalStage() { } + MinimalStage(Object r) { super(r); } + @Override public CompletableFuture newIncompleteFuture() { + return new MinimalStage(); } + @Override public T get() { + throw new UnsupportedOperationException(); } + @Override public T get(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); } + @Override public T getNow(T valueIfAbsent) { + throw new UnsupportedOperationException(); } + @Override public T join() { + throw new UnsupportedOperationException(); } + @Override public boolean complete(T value) { + throw new UnsupportedOperationException(); } + @Override public boolean completeExceptionally(Throwable ex) { + throw new UnsupportedOperationException(); } + @Override public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException(); } + @Override public void obtrudeValue(T value) { + throw new UnsupportedOperationException(); } + @Override public void obtrudeException(Throwable ex) { + throw new UnsupportedOperationException(); } + @Override public boolean isDone() { + throw new UnsupportedOperationException(); } + @Override public boolean isCancelled() { + throw new UnsupportedOperationException(); } + @Override public boolean isCompletedExceptionally() { + throw new UnsupportedOperationException(); } + @Override public int getNumberOfDependents() { + throw new UnsupportedOperationException(); } + @Override public CompletableFuture completeAsync + (Supplier supplier, Executor executor) { + throw new UnsupportedOperationException(); } + @Override public CompletableFuture completeAsync + (Supplier supplier) { + throw new UnsupportedOperationException(); } + @Override public CompletableFuture orTimeout + (long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); } + @Override public CompletableFuture completeOnTimeout + (T value, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); } + } + // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; + private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); private static final long RESULT; private static final long STACK; private static final long NEXT; static { try { - final sun.misc.Unsafe u; - UNSAFE = u = sun.misc.Unsafe.getUnsafe(); - Class k = CompletableFuture.class; - RESULT = u.objectFieldOffset(k.getDeclaredField("result")); - STACK = u.objectFieldOffset(k.getDeclaredField("stack")); - NEXT = u.objectFieldOffset + RESULT = U.objectFieldOffset + (CompletableFuture.class.getDeclaredField("result")); + STACK = U.objectFieldOffset + (CompletableFuture.class.getDeclaredField("stack")); + NEXT = U.objectFieldOffset (Completion.class.getDeclaredField("next")); - } catch (Exception x) { - throw new Error(x); + } catch (ReflectiveOperationException e) { + throw new Error(e); } + + // Reduce the risk of rare disastrous classloading in first call to + // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 + Class ensureLoaded = LockSupport.class; } } diff -r ea54ac8672e7 -r e5e5ab01398e jdk/src/java.base/share/classes/java/util/concurrent/CompletionStage.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletionStage.java Wed Oct 14 00:08:42 2015 +0200 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletionStage.java Tue Oct 13 16:04:56 2015 -0700 @@ -34,12 +34,11 @@ */ package java.util.concurrent; -import java.util.function.Supplier; -import java.util.function.Consumer; + import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.BiFunction; -import java.util.concurrent.Executor; /** * A stage of a possibly asynchronous computation, that performs an @@ -56,9 +55,9 @@ * For example, {@code stage.thenApply(x -> square(x)).thenAccept(x -> * System.out.print(x)).thenRun(() -> System.out.println())}. An * additional form (compose) applies functions of stages - * themselves, rather than their results.
  • + * themselves, rather than their results. * - *
  • One stage's execution may be triggered by completion of a + *
  • One stage's execution may be triggered by completion of a * single stage, or both of two stages, or either of two stages. * Dependencies on a single stage are arranged using methods with * prefix then. Those triggered by completion of @@ -66,9 +65,9 @@ * effects, using correspondingly named methods. Those triggered by * either of two stages make no guarantees about which of the * results or effects are used for the dependent stage's - * computation.
  • + * computation. * - *
  • Dependencies among stages control the triggering of + *
  • Dependencies among stages control the triggering of * computations, but do not otherwise guarantee any particular * ordering. Additionally, execution of a new stage's computations may * be arranged in any of three ways: default execution, default @@ -81,7 +80,7 @@ * properties, and might not even support concurrent execution, but * are arranged for processing in a way that accommodates asynchrony. * - *
  • Two method forms support processing whether the triggering + *
  • Two method forms support processing whether the triggering * stage completed normally or exceptionally: Method {@link * #whenComplete whenComplete} allows injection of an action * regardless of outcome, otherwise preserving the outcome in its @@ -100,7 +99,7 @@ * stage completes normally or exceptionally. In the case of method * {@code whenComplete}, when the supplied action itself encounters an * exception, then the stage exceptionally completes with this - * exception if not already completed exceptionally.
  • + * exception if not already completed exceptionally. * * * @@ -587,7 +586,7 @@ /** * Returns a new CompletionStage that, when this stage completes - * normally, is executed with this stage as the argument + * normally, is executed with this stage's result as the argument * to the supplied function. * * See the {@link CompletionStage} documentation for rules @@ -603,7 +602,7 @@ /** * Returns a new CompletionStage that, when this stage completes * normally, is executed using this stage's default asynchronous - * execution facility, with this stage as the argument to the + * execution facility, with this stage's result as the argument to the * supplied function. * * See the {@link CompletionStage} documentation for rules @@ -652,12 +651,14 @@ * Returns a new CompletionStage with the same result or exception as * this stage, that executes the given action when this stage completes. * - *

    When this stage is complete, the given action is invoked with the - * result (or {@code null} if none) and the exception (or {@code null} - * if none) of this stage as arguments. The returned stage is completed - * when the action returns. If the supplied action itself encounters an - * exception, then the returned stage exceptionally completes with this - * exception unless this stage also completed exceptionally. + *

    When this stage is complete, the given action is invoked + * with the result (or {@code null} if none) and the exception (or + * {@code null} if none) of this stage as arguments. The returned + * stage is completed when the action returns. If the supplied + * action itself encounters an exception, then the returned stage + * exceptionally completes with this exception unless this stage + * also completed exceptionally (in which case, the returned stage + * exceptionally completes with the original exception). * * @param action the action to perform * @return the new CompletionStage diff -r ea54ac8672e7 -r e5e5ab01398e jdk/test/java/util/concurrent/CompletableFuture/Basic.java --- a/jdk/test/java/util/concurrent/CompletableFuture/Basic.java Wed Oct 14 00:08:42 2015 +0200 +++ b/jdk/test/java/util/concurrent/CompletableFuture/Basic.java Tue Oct 13 16:04:56 2015 -0700 @@ -53,7 +53,6 @@ import static java.util.concurrent.ForkJoinPool.*; import java.util.concurrent.atomic.AtomicInteger; - public class Basic { static void checkCompletedNormally(CompletableFuture cf, Object value) { @@ -66,6 +65,7 @@ try { equalAnyOf(cf.get(), values); } catch (Throwable x) { unexpected(x); } try { equalAnyOf(cf.get(0L, SECONDS), values); } catch (Throwable x) { unexpected(x); } check(cf.isDone(), "Expected isDone to be true, got:" + cf); + check(!cf.isCompletedExceptionally(), "Expected isCompletedExceptionally to return false"); check(!cf.isCancelled(), "Expected isCancelled to be false"); check(!cf.cancel(true), "Expected cancel to return false"); check(cf.toString().contains("[Completed normally]")); @@ -97,6 +97,7 @@ catch (CancellationException x) { if (cancelled) pass(); else fail(); } catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); } check(cf.isDone(), "Expected isDone to be true, got:" + cf); + check(cf.isCompletedExceptionally(), "Expected isCompletedExceptionally"); check(cf.isCancelled() == cancelled, "Expected isCancelled: " + cancelled + ", got:" + cf.isCancelled()); check(cf.cancel(true) == cancelled, "Expected cancel: " + cancelled + ", got:" + cf.cancel(true)); check(cf.toString().contains("[Completed exceptionally]")); // ## TODO: 'E'xceptionally @@ -805,6 +806,49 @@ cf2 = cf1.handle((x,t) -> { check(t.getCause() == ex); return 2;}); checkCompletedExceptionally(cf1); checkCompletedNormally(cf2, 2); + + cf1 = supplyAsync(() -> 1); + cf2 = cf1.handleAsync((x,t) -> x+1); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 2); + + cf1 = supplyAsync(() -> { throw ex; }); + cf2 = cf1.handleAsync((x,t) -> { check(t.getCause() == ex); return 2;}); + checkCompletedExceptionally(cf1); + checkCompletedNormally(cf2, 2); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // whenComplete tests + //---------------------------------------------------------------- + try { + AtomicInteger count = new AtomicInteger(); + CompletableFuture cf2; + CompletableFuture cf1 = supplyAsync(() -> 1); + cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement()); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + check(count.get() == 1, "action count should be incremented"); + + final RuntimeException ex = new RuntimeException(); + cf1 = supplyAsync(() -> { throw ex; }); + cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement()); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(count.get() == 2, "action count should be incremented"); + + cf1 = supplyAsync(() -> 1); + cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement()); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + check(count.get() == 3, "action count should be incremented"); + + cf1 = supplyAsync(() -> { throw ex; }); + cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement()); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(count.get() == 4, "action count should be incremented"); + } catch (Throwable t) { unexpected(t); } } diff -r ea54ac8672e7 -r e5e5ab01398e jdk/test/java/util/concurrent/CompletableFuture/ThenComposeAsyncTest.java --- a/jdk/test/java/util/concurrent/CompletableFuture/ThenComposeAsyncTest.java Wed Oct 14 00:08:42 2015 +0200 +++ b/jdk/test/java/util/concurrent/CompletableFuture/ThenComposeAsyncTest.java Tue Oct 13 16:04:56 2015 -0700 @@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; - /** * @test * @bug 8029164 diff -r ea54ac8672e7 -r e5e5ab01398e jdk/test/java/util/concurrent/CompletableFuture/ThenComposeExceptionTest.java --- a/jdk/test/java/util/concurrent/CompletableFuture/ThenComposeExceptionTest.java Wed Oct 14 00:08:42 2015 +0200 +++ b/jdk/test/java/util/concurrent/CompletableFuture/ThenComposeExceptionTest.java Tue Oct 13 16:04:56 2015 -0700 @@ -33,7 +33,6 @@ import java.util.function.BiFunction; import java.util.function.Consumer; - /** * @test * @bug 8068432 8072030