diff -r 67078b90e47b -r 1eee453c24c1 jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Fri Aug 29 11:59:34 2014 -0700 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Mon Sep 01 13:33:28 2014 +0200 @@ -50,7 +50,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; /** @@ -77,9 +76,9 @@ *
  • All async methods without an explicit Executor * argument are performed using the {@link ForkJoinPool#commonPool()} * (unless it does not support a parallelism level of at least two, in - * which case, a new Thread is used). To simplify monitoring, - * debugging, and tracking, all generated asynchronous tasks are - * instances of the marker interface {@link + * which case, a new Thread is created to run each task). To simplify + * monitoring, debugging, and tracking, all generated asynchronous + * tasks are instances of the marker interface {@link * AsynchronousCompletionTask}.
  • * *
  • All CompletionStage methods are implemented independently of @@ -113,141 +112,1556 @@ /* * Overview: * - * 1. Non-nullness of field result (set via CAS) indicates done. - * An AltResult is used to box null as a result, as well as to - * hold exceptions. Using a single field makes completion fast - * and simple to detect and trigger, at the expense of a lot of - * encoding and decoding that infiltrates many methods. One minor - * simplification relies on the (static) NIL (to box null results) - * being the only AltResult with a null exception field, so we - * don't usually need explicit comparisons with NIL. The CF - * exception propagation mechanics surrounding decoding rely on - * unchecked casts of decoded results really being unchecked, - * where user type errors are caught at point of use, as is - * currently the case in Java. These are highlighted by using - * SuppressWarnings-annotated temporaries. + * A CompletableFuture may have dependent completion actions, + * collected in a linked stack. It atomically completes by CASing + * a result field, and then pops off and runs those actions. This + * applies across normal vs exceptional outcomes, sync vs async + * actions, binary triggers, and various forms of completions. + * + * Non-nullness of field result (set via CAS) indicates done. An + * AltResult is used to box null as a result, as well as to hold + * exceptions. Using a single field makes completion simple to + * detect and trigger. Encoding and decoding is straightforward + * but adds to the sprawl of trapping and associating exceptions + * with targets. Minor simplifications rely on (static) NIL (to + * box null results) being the only AltResult with a null + * exception field, so we don't usually need explicit comparisons. + * Even though some of the generics casts are unchecked (see + * SuppressWarnings annotations), they are placed to be + * appropriate even if checked. * - * 2. Waiters are held in a Treiber stack similar to the one used - * in FutureTask, Phaser, and SynchronousQueue. See their - * internal documentation for algorithmic details. + * Dependent actions are represented by Completion objects linked + * as Treiber stacks headed by field "stack". There are Completion + * classes for each kind of action, grouped into single-input + * (UniCompletion), two-input (BiCompletion), projected + * (BiCompletions using either (not both) of two inputs), shared + * (CoCompletion, used by the second of two sources), zero-input + * source actions, and Signallers that unblock waiters. Class + * Completion extends ForkJoinTask to enable async execution + * (adding no space overhead because we exploit its "tag" methods + * to maintain claims). It is also declared as Runnable to allow + * usage with arbitrary executors. + * + * Support for each kind of CompletionStage relies on a separate + * class, along with two CompletableFuture methods: + * + * * A Completion class with name X corresponding to function, + * prefaced with "Uni", "Bi", or "Or". Each class contains + * fields for source(s), actions, and dependent. They are + * boringly similar, differing from others only with respect to + * underlying functional forms. We do this so that users don't + * encounter layers of adaptors in common usages. We also + * include "Relay" classes/methods that don't correspond to user + * methods; they copy results from one stage to another. + * + * * Boolean CompletableFuture method x(...) (for example + * uniApply) takes all of the arguments needed to check that an + * action is triggerable, and then either runs the action or + * arranges its async execution by executing its Completion + * argument, if present. The method returns true if known to be + * complete. * - * 3. Completions are also kept in a list/stack, and pulled off - * and run when completion is triggered. (We could even use the - * same stack as for waiters, but would give up the potential - * parallelism obtained because woken waiters help release/run - * others -- see method postComplete). Because post-processing - * may race with direct calls, class Completion opportunistically - * extends AtomicInteger so callers can claim the action via - * compareAndSet(0, 1). The Completion.run methods are all - * written a boringly similar uniform way (that sometimes includes - * unnecessary-looking checks, kept to maintain uniformity). - * There are enough dimensions upon which they differ that - * attempts to factor commonalities while maintaining efficiency - * require more lines of code than they would save. + * * Completion method tryFire(int mode) invokes the associated x + * method with its held arguments, and on success cleans up. + * The mode argument allows tryFire to be called twice (SYNC, + * then ASYNC); the first to screen and trap exceptions while + * arranging to execute, and the second when called from a + * task. (A few classes are not used async so take slightly + * different forms.) The claim() callback suppresses function + * invocation if already claimed by another thread. + * + * * CompletableFuture method xStage(...) is called from a public + * stage method of CompletableFuture x. It screens user + * arguments and invokes and/or creates the stage object. If + * not async and x is already complete, the action is run + * immediately. Otherwise a Completion c is created, pushed to + * x's stack (unless done), and started or triggered via + * c.tryFire. This also covers races possible if x completes + * while pushing. Classes with two inputs (for example BiApply) + * deal with races across both while pushing actions. The + * second completion is a CoCompletion pointing to the first, + * shared so that at most one performs the action. The + * multiple-arity methods allOf and anyOf do this pairwise to + * form trees of completions. + * + * Note that the generic type parameters of methods vary according + * to whether "this" is a source, dependent, or completion. * - * 4. The exported then/and/or methods do support a bit of - * factoring (see doThenApply etc). They must cope with the - * intrinsic races surrounding addition of a dependent action - * versus performing the action directly because the task is - * already complete. For example, a CF may not be complete upon - * entry, so a dependent completion is added, but by the time it - * is added, the target CF is complete, so must be directly - * executed. This is all done while avoiding unnecessary object - * construction in safe-bypass cases. + * Method postComplete is called upon completion unless the target + * is guaranteed not to be observable (i.e., not yet returned or + * linked). Multiple threads can call postComplete, which + * atomically pops each dependent action, and tries to trigger it + * via method tryFire, in NESTED mode. Triggering can propagate + * recursively, so NESTED mode returns its completed dependent (if + * one exists) for further processing by its caller (see method + * postFire). + * + * Blocking methods get() and join() rely on Signaller Completions + * that wake up waiting threads. The mechanics are similar to + * Treiber stack wait-nodes used in FutureTask, Phaser, and + * SynchronousQueue. See their internal documentation for + * algorithmic details. + * + * Without precautions, CompletableFutures would be prone to + * garbage accumulation as chains of Completions build up, each + * pointing back to its sources. So we null out fields as soon as + * possible (see especially method Completion.detach). The + * screening checks needed anyway harmlessly ignore null arguments + * that may have been obtained during races with threads nulling + * out fields. We also try to unlink fired Completions from + * stacks that might never be popped (see method postFire). + * Completion fields need not be declared as final or volatile + * because they are only visible to other threads upon safe + * publication. */ - // preliminaries + volatile Object result; // Either the result or boxed AltResult + volatile Completion stack; // Top of Treiber stack of dependent actions + + final boolean internalComplete(Object r) { // CAS from null to r + return UNSAFE.compareAndSwapObject(this, RESULT, null, r); + } + + final boolean casStack(Completion cmp, Completion val) { + return UNSAFE.compareAndSwapObject(this, STACK, cmp, val); + } + + /** Returns true if successfully pushed c onto stack. */ + final boolean tryPushStack(Completion c) { + Completion h = stack; + lazySetNext(c, h); + return UNSAFE.compareAndSwapObject(this, STACK, h, c); + } + + /** Unconditionally pushes c onto stack, retrying if necessary. */ + final void pushStack(Completion c) { + do {} while (!tryPushStack(c)); + } + + /* ------------- Encoding and decoding outcomes -------------- */ + + static final class AltResult { // See above + final Throwable ex; // null only for NIL + AltResult(Throwable x) { this.ex = x; } + } - static final class AltResult { - final Throwable ex; // null only for NIL - AltResult(Throwable ex) { this.ex = ex; } + /** The encoding of the null value. */ + static final AltResult NIL = new AltResult(null); + + /** Completes with the null value, unless already completed. */ + final boolean completeNull() { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + NIL); + } + + /** Returns the encoding of the given non-exceptional value. */ + final Object encodeValue(T t) { + return (t == null) ? NIL : t; + } + + /** Completes with a non-exceptional result, unless already completed. */ + final boolean completeValue(T t) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + (t == null) ? NIL : t); + } + + /** + * Returns the encoding of the given (non-null) exception as a + * wrapped CompletionException unless it is one already. + */ + static AltResult encodeThrowable(Throwable x) { + return new AltResult((x instanceof CompletionException) ? x : + new CompletionException(x)); + } + + /** Completes with an exceptional result, unless already completed. */ + final boolean completeThrowable(Throwable x) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x)); } - static final AltResult NIL = new AltResult(null); + /** + * Returns the encoding of the given (non-null) exception as a + * wrapped CompletionException unless it is one already. May + * return the given Object r (which must have been the result of a + * source future) if it is equivalent, i.e. if this is a simple + * relay of an existing CompletionException. + */ + static Object encodeThrowable(Throwable x, Object r) { + if (!(x instanceof CompletionException)) + x = new CompletionException(x); + else if (r instanceof AltResult && x == ((AltResult)r).ex) + return r; + return new AltResult(x); + } - // Fields + /** + * Completes with the given (non-null) exceptional result as a + * wrapped CompletionException unless it is one already, unless + * already completed. May complete with the given Object r + * (which must have been the result of a source future) if it is + * equivalent, i.e. if this is a simple propagation of an + * existing CompletionException. + */ + final boolean completeThrowable(Throwable x, Object r) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x, r)); + } + + /** + * Returns the encoding of the given arguments: if the exception + * is non-null, encodes as AltResult. Otherwise uses the given + * value, boxed as NIL if null. + */ + Object encodeOutcome(T t, Throwable x) { + return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); + } - volatile Object result; // Either the result or boxed AltResult - volatile WaitNode waiters; // Treiber stack of threads blocked on get() - volatile CompletionNode completions; // list (Treiber stack) of completions + /** + * Returns the encoding of a copied outcome; if exceptional, + * rewraps as a CompletionException, else returns argument. + */ + static Object encodeRelay(Object r) { + Throwable x; + return (((r instanceof AltResult) && + (x = ((AltResult)r).ex) != null && + !(x instanceof CompletionException)) ? + new AltResult(new CompletionException(x)) : r); + } + + /** + * Completes with r or a copy of r, unless already completed. + * If exceptional, r is first coerced to a CompletionException. + */ + final boolean completeRelay(Object r) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeRelay(r)); + } - // Basic utilities for triggering and processing completions + /** + * Reports result using Future.get conventions. + */ + private static T reportGet(Object r) + throws InterruptedException, ExecutionException { + if (r == null) // by convention below, null means interrupted + throw new InterruptedException(); + if (r instanceof AltResult) { + Throwable x, cause; + if ((x = ((AltResult)r).ex) == null) + return null; + if (x instanceof CancellationException) + throw (CancellationException)x; + if ((x instanceof CompletionException) && + (cause = x.getCause()) != null) + x = cause; + throw new ExecutionException(x); + } + @SuppressWarnings("unchecked") T t = (T) r; + return t; + } /** - * Removes and signals all waiting threads and runs all completions. + * Decodes outcome to return result or throw unchecked exception. + */ + private static T reportJoin(Object r) { + if (r instanceof AltResult) { + Throwable x; + if ((x = ((AltResult)r).ex) == null) + return null; + if (x instanceof CancellationException) + throw (CancellationException)x; + if (x instanceof CompletionException) + throw (CompletionException)x; + throw new CompletionException(x); + } + @SuppressWarnings("unchecked") T t = (T) r; + return t; + } + + /* ------------- Async task preliminaries -------------- */ + + /** + * A marker interface identifying asynchronous tasks produced by + * {@code async} methods. This may be useful for monitoring, + * debugging, and tracking asynchronous activities. + * + * @since 1.8 + */ + public static interface AsynchronousCompletionTask { + } + + private static final boolean useCommonPool = + (ForkJoinPool.getCommonPoolParallelism() > 1); + + /** + * Default executor -- ForkJoinPool.commonPool() unless it cannot + * support parallelism. + */ + private static final Executor asyncPool = useCommonPool ? + ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); + + /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ + static final class ThreadPerTaskExecutor implements Executor { + public void execute(Runnable r) { new Thread(r).start(); } + } + + /** + * Null-checks user executor argument, and translates uses of + * commonPool to asyncPool in case parallelism disabled. + */ + static Executor screenExecutor(Executor e) { + if (!useCommonPool && e == ForkJoinPool.commonPool()) + return asyncPool; + if (e == null) throw new NullPointerException(); + return e; + } + + // Modes for Completion.tryFire. Signedness matters. + static final int SYNC = 0; + static final int ASYNC = 1; + static final int NESTED = -1; + + /* ------------- Base Completion classes and operations -------------- */ + + @SuppressWarnings("serial") + abstract static class Completion extends ForkJoinTask + implements Runnable, AsynchronousCompletionTask { + volatile Completion next; // Treiber stack link + + /** + * Performs completion action if triggered, returning a + * dependent that may need propagation, if one exists. + * + * @param mode SYNC, ASYNC, or NESTED + */ + abstract CompletableFuture tryFire(int mode); + + /** Returns true if possibly still triggerable. Used by cleanStack. */ + abstract boolean isLive(); + + public final void run() { tryFire(ASYNC); } + public final boolean exec() { tryFire(ASYNC); return true; } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + } + + static void lazySetNext(Completion c, Completion next) { + UNSAFE.putOrderedObject(c, NEXT, next); + } + + /** + * Pops and tries to trigger all reachable dependents. Call only + * when known to be done. */ final void postComplete() { - WaitNode q; Thread t; - while ((q = waiters) != null) { - if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && - (t = q.thread) != null) { - q.thread = null; - LockSupport.unpark(t); + /* + * On each step, variable f holds current dependents to pop + * and run. It is extended along only one path at a time, + * pushing others to avoid unbounded recursion. + */ + CompletableFuture f = this; Completion h; + while ((h = f.stack) != null || + (f != this && (h = (f = this).stack) != null)) { + CompletableFuture d; Completion t; + if (f.casStack(h, t = h.next)) { + if (t != null) { + if (f != this) { + pushStack(h); + continue; + } + h.next = null; // detach + } + f = (d = h.tryFire(NESTED)) == null ? this : d; } } + } - CompletionNode h; Completion c; - while ((h = completions) != null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && - (c = h.completion) != null) - c.run(); + /** Traverses stack and unlinks dead Completions. */ + final void cleanStack() { + for (Completion p = null, q = stack; q != null;) { + Completion s = q.next; + if (q.isLive()) { + p = q; + q = s; + } + else if (p == null) { + casStack(q, s); + q = stack; + } + else { + p.next = s; + if (p.isLive()) + q = s; + else { + p = null; // restart + q = stack; + } + } + } + } + + /* ------------- One-input Completions -------------- */ + + /** A Completion with a source, dependent, and executor. */ + @SuppressWarnings("serial") + abstract static class UniCompletion extends Completion { + Executor executor; // executor to use (null if none) + CompletableFuture dep; // the dependent to complete + CompletableFuture src; // source for action + + UniCompletion(Executor executor, CompletableFuture dep, + CompletableFuture src) { + this.executor = executor; this.dep = dep; this.src = src; + } + + /** + * Returns true if action can be run. Call only when known to + * be triggerable. Uses FJ tag bit to ensure that only one + * thread claims ownership. If async, starts as task -- a + * later call to tryFire will run action. + */ + final boolean claim() { + Executor e = executor; + if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { + if (e == null) + return true; + executor = null; // disable + e.execute(this); + } + return false; + } + + final boolean isLive() { return dep != null; } + } + + /** Pushes the given completion (if it exists) unless done. */ + final void push(UniCompletion c) { + if (c != null) { + while (result == null && !tryPushStack(c)) + lazySetNext(c, null); // clear on failure } } /** - * Triggers completion with the encoding of the given arguments: - * if the exception is non-null, encodes it as a wrapped - * CompletionException unless it is one already. Otherwise uses - * the given result, boxed as NIL if null. + * Post-processing by dependent after successful UniCompletion + * tryFire. Tries to clean stack of source a, and then either runs + * postComplete or returns this to caller, depending on mode. */ - final void internalComplete(T v, Throwable ex) { - if (result == null) - UNSAFE.compareAndSwapObject - (this, RESULT, null, - (ex == null) ? (v == null) ? NIL : v : - new AltResult((ex instanceof CompletionException) ? ex : - new CompletionException(ex))); - postComplete(); // help out even if not triggered + final CompletableFuture postFire(CompletableFuture a, int mode) { + if (a != null && a.stack != null) { + if (mode < 0 || a.result == null) + a.cleanStack(); + else + a.postComplete(); + } + if (result != null && stack != null) { + if (mode < 0) + return this; + else + postComplete(); + } + return null; + } + + @SuppressWarnings("serial") + static final class UniApply extends UniCompletion { + Function fn; + UniApply(Executor executor, CompletableFuture dep, + CompletableFuture src, + Function fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniApply(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniApply(CompletableFuture a, + Function f, + UniApply c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + completeValue(f.apply(s)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniApplyStage( + Executor e, Function f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniApply(this, f, null)) { + UniApply c = new UniApply(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniAccept extends UniCompletion { + Consumer fn; + UniAccept(Executor executor, CompletableFuture dep, + CompletableFuture src, Consumer fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniAccept(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniAccept(CompletableFuture a, + Consumer f, UniAccept c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + f.accept(s); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniAcceptStage(Executor e, + Consumer f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniAccept(this, f, null)) { + UniAccept c = new UniAccept(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniRun extends UniCompletion { + Runnable fn; + UniRun(Executor executor, CompletableFuture dep, + CompletableFuture src, Runnable fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniRun(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniRun(CompletableFuture a, Runnable f, UniRun c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else + try { + if (c != null && !c.claim()) + return false; + f.run(); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniRunStage(Executor e, Runnable f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniRun(this, f, null)) { + UniRun c = new UniRun(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniWhenComplete extends UniCompletion { + BiConsumer fn; + UniWhenComplete(Executor executor, CompletableFuture dep, + CompletableFuture src, + BiConsumer fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniWhenComplete(CompletableFuture a, + BiConsumer f, + UniWhenComplete c) { + Object r; T t; Throwable x = null; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + x = ((AltResult)r).ex; + t = null; + } else { + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + f.accept(t, x); + if (x == null) { + internalComplete(r); + return true; + } + } catch (Throwable ex) { + if (x == null) + x = ex; + } + completeThrowable(x, r); + } + return true; + } + + private CompletableFuture uniWhenCompleteStage( + Executor e, BiConsumer f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniWhenComplete(this, f, null)) { + UniWhenComplete c = new UniWhenComplete(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniHandle extends UniCompletion { + BiFunction fn; + UniHandle(Executor executor, CompletableFuture dep, + CompletableFuture src, + BiFunction fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniHandle(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } } - /** - * If triggered, helps release and/or process completions. - */ - final void helpPostComplete() { - if (result != null) - postComplete(); + final boolean uniHandle(CompletableFuture a, + BiFunction f, + UniHandle c) { + Object r; S s; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + x = ((AltResult)r).ex; + s = null; + } else { + x = null; + @SuppressWarnings("unchecked") S ss = (S) r; + s = ss; + } + completeValue(f.apply(s, x)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniHandleStage( + Executor e, BiFunction f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniHandle(this, f, null)) { + UniHandle c = new UniHandle(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniExceptionally extends UniCompletion { + Function fn; + UniExceptionally(CompletableFuture dep, CompletableFuture src, + Function fn) { + super(null, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { // never ASYNC + // assert mode != ASYNC; + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniExceptionally(CompletableFuture a, + Function f, + UniExceptionally c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { + if (c != null && !c.claim()) + return false; + completeValue(f.apply(x)); + } else + internalComplete(r); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniExceptionallyStage( + Function f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (!d.uniExceptionally(this, f, null)) { + UniExceptionally c = new UniExceptionally(d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniRelay extends UniCompletion { // for Compose + UniRelay(CompletableFuture dep, CompletableFuture src) { + super(null, dep, src); + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || !d.uniRelay(a = src)) + return null; + src = null; dep = null; + return d.postFire(a, mode); + } + } + + final boolean uniRelay(CompletableFuture a) { + Object r; + if (a == null || (r = a.result) == null) + return false; + if (result == null) // no need to claim + completeRelay(r); + return true; + } + + @SuppressWarnings("serial") + static final class UniCompose extends UniCompletion { + Function> fn; + UniCompose(Executor executor, CompletableFuture dep, + CompletableFuture src, + Function> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniCompose(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniCompose( + CompletableFuture a, + Function> f, + UniCompose c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + CompletableFuture g = f.apply(s).toCompletableFuture(); + if (g.result == null || !uniRelay(g)) { + UniRelay copy = new UniRelay(this, g); + g.push(copy); + copy.tryFire(SYNC); + if (result == null) + return false; + } + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniComposeStage( + Executor e, Function> f) { + if (f == null) throw new NullPointerException(); + Object r; Throwable x; + if (e == null && (r = result) != null) { + // try to return function result directly + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + return new CompletableFuture(encodeThrowable(x, r)); + } + r = null; + } + try { + @SuppressWarnings("unchecked") T t = (T) r; + return f.apply(t).toCompletableFuture(); + } catch (Throwable ex) { + return new CompletableFuture(encodeThrowable(ex)); + } + } + CompletableFuture d = new CompletableFuture(); + UniCompose c = new UniCompose(e, d, this, f); + push(c); + c.tryFire(SYNC); + return d; + } + + /* ------------- Two-input Completions -------------- */ + + /** A Completion for an action with two sources */ + @SuppressWarnings("serial") + abstract static class BiCompletion extends UniCompletion { + CompletableFuture snd; // second source for action + BiCompletion(Executor executor, CompletableFuture dep, + CompletableFuture src, CompletableFuture snd) { + super(executor, dep, src); this.snd = snd; + } + } + + /** A Completion delegating to a BiCompletion */ + @SuppressWarnings("serial") + static final class CoCompletion extends Completion { + BiCompletion base; + CoCompletion(BiCompletion base) { this.base = base; } + final CompletableFuture tryFire(int mode) { + BiCompletion c; CompletableFuture d; + if ((c = base) == null || (d = c.tryFire(mode)) == null) + return null; + base = null; // detach + return d; + } + final boolean isLive() { + BiCompletion c; + return (c = base) != null && c.dep != null; + } + } + + /** Pushes completion to this and b unless both done. */ + final void bipush(CompletableFuture b, BiCompletion c) { + if (c != null) { + Object r; + while ((r = result) == null && !tryPushStack(c)) + lazySetNext(c, null); // clear on failure + if (b != null && b != this && b.result == null) { + Completion q = (r != null) ? c : new CoCompletion(c); + while (b.result == null && !b.tryPushStack(q)) + lazySetNext(q, null); // clear on failure + } + } + } + + /** Post-processing after successful BiCompletion tryFire. */ + final CompletableFuture postFire(CompletableFuture a, + CompletableFuture b, int mode) { + if (b != null && b.stack != null) { // clean second source + if (mode < 0 || b.result == null) + b.cleanStack(); + else + b.postComplete(); + } + return postFire(a, mode); + } + + @SuppressWarnings("serial") + static final class BiApply extends BiCompletion { + BiFunction fn; + BiApply(Executor executor, CompletableFuture dep, + CompletableFuture src, CompletableFuture snd, + BiFunction fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } } - /* ------------- waiting for completions -------------- */ + final boolean biApply(CompletableFuture a, + CompletableFuture b, + BiFunction f, + BiApply c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + if (s instanceof AltResult) { + if ((x = ((AltResult)s).ex) != null) { + completeThrowable(x, s); + break tryComplete; + } + s = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") R rr = (R) r; + @SuppressWarnings("unchecked") S ss = (S) s; + completeValue(f.apply(rr, ss)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture biApplyStage( + Executor e, CompletionStage o, + BiFunction f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.biApply(this, b, f, null)) { + BiApply c = new BiApply(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class BiAccept extends BiCompletion { + BiConsumer fn; + BiAccept(Executor executor, CompletableFuture dep, + CompletableFuture src, CompletableFuture snd, + BiConsumer fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean biAccept(CompletableFuture a, + CompletableFuture b, + BiConsumer f, + BiAccept c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + if (s instanceof AltResult) { + if ((x = ((AltResult)s).ex) != null) { + completeThrowable(x, s); + break tryComplete; + } + s = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") R rr = (R) r; + @SuppressWarnings("unchecked") S ss = (S) s; + f.accept(rr, ss); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture biAcceptStage( + Executor e, CompletionStage o, + BiConsumer f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.biAccept(this, b, f, null)) { + BiAccept c = new BiAccept(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } - /** Number of processors, for spin control */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); + @SuppressWarnings("serial") + static final class BiRun extends BiCompletion { + Runnable fn; + BiRun(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Runnable fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean biRun(CompletableFuture a, CompletableFuture b, + Runnable f, BiRun c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) + completeThrowable(x, s); + else + try { + if (c != null && !c.claim()) + return false; + f.run(); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture biRunStage(Executor e, CompletionStage o, + Runnable f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.biRun(this, b, f, null)) { + BiRun c = new BiRun<>(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class BiRelay extends BiCompletion { // for And + BiRelay(CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd) { + super(null, dep, src, snd); + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || !d.biRelay(a = src, b = snd)) + return null; + src = null; snd = null; dep = null; + return d.postFire(a, b, mode); + } + } + + boolean biRelay(CompletableFuture a, CompletableFuture b) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) + completeThrowable(x, s); + else + completeNull(); + } + return true; + } + + /** Recursively constructs a tree of completions. */ + static CompletableFuture andTree(CompletableFuture[] cfs, + int lo, int hi) { + CompletableFuture d = new CompletableFuture(); + if (lo > hi) // empty + d.result = NIL; + else { + CompletableFuture a, b; + int mid = (lo + hi) >>> 1; + if ((a = (lo == mid ? cfs[lo] : + andTree(cfs, lo, mid))) == null || + (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : + andTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + if (!d.biRelay(a, b)) { + BiRelay c = new BiRelay<>(d, a, b); + a.bipush(b, c); + c.tryFire(SYNC); + } + } + return d; + } + + /* ------------- Projected (Ored) BiCompletions -------------- */ + + /** Pushes completion to this and b unless either done. */ + final void orpush(CompletableFuture b, BiCompletion c) { + if (c != null) { + while ((b == null || b.result == null) && result == null) { + if (tryPushStack(c)) { + if (b != null && b != this && b.result == null) { + Completion q = new CoCompletion(c); + while (result == null && b.result == null && + !b.tryPushStack(q)) + lazySetNext(q, null); // clear on failure + } + break; + } + lazySetNext(c, null); // clear on failure + } + } + } + + @SuppressWarnings("serial") + static final class OrApply extends BiCompletion { + Function fn; + OrApply(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Function fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } - /** - * Heuristic spin value for waitingGet() before blocking on - * multiprocessors - */ - static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; + final boolean orApply(CompletableFuture a, + CompletableFuture b, + Function f, + OrApply c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + tryComplete: if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + @SuppressWarnings("unchecked") R rr = (R) r; + completeValue(f.apply(rr)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture orApplyStage( + Executor e, CompletionStage o, + Function f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.orApply(this, b, f, null)) { + OrApply c = new OrApply(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class OrAccept extends BiCompletion { + Consumer fn; + OrAccept(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Consumer fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean orAccept(CompletableFuture a, + CompletableFuture b, + Consumer f, + OrAccept c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + tryComplete: if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + @SuppressWarnings("unchecked") R rr = (R) r; + f.accept(rr); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture orAcceptStage( + Executor e, CompletionStage o, Consumer f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.orAccept(this, b, f, null)) { + OrAccept c = new OrAccept(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class OrRun extends BiCompletion { + Runnable fn; + OrRun(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Runnable fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean orRun(CompletableFuture a, CompletableFuture b, + Runnable f, OrRun c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else { + f.run(); + completeNull(); + } + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture orRunStage(Executor e, CompletionStage o, + Runnable f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.orRun(this, b, f, null)) { + OrRun c = new OrRun<>(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class OrRelay extends BiCompletion { // for Or + OrRelay(CompletableFuture dep, CompletableFuture src, + CompletableFuture snd) { + super(null, dep, src, snd); + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || !d.orRelay(a = src, b = snd)) + return null; + src = null; snd = null; dep = null; + return d.postFire(a, b, mode); + } + } + + final boolean orRelay(CompletableFuture a, CompletableFuture b) { + Object r; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null)) + return false; + if (result == null) + completeRelay(r); + return true; + } + + /** Recursively constructs a tree of completions. */ + static CompletableFuture orTree(CompletableFuture[] cfs, + int lo, int hi) { + CompletableFuture d = new CompletableFuture(); + if (lo <= hi) { + CompletableFuture a, b; + int mid = (lo + hi) >>> 1; + if ((a = (lo == mid ? cfs[lo] : + orTree(cfs, lo, mid))) == null || + (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : + orTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + if (!d.orRelay(a, b)) { + OrRelay c = new OrRelay<>(d, a, b); + a.orpush(b, c); + c.tryFire(SYNC); + } + } + return d; + } + + /* ------------- Zero-input Async forms -------------- */ + + @SuppressWarnings("serial") + static final class AsyncSupply extends ForkJoinTask + implements Runnable, AsynchronousCompletionTask { + CompletableFuture dep; Supplier fn; + AsyncSupply(CompletableFuture dep, Supplier fn) { + this.dep = dep; this.fn = fn; + } + + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + public final boolean exec() { run(); return true; } + + public void run() { + CompletableFuture d; Supplier f; + if ((d = dep) != null && (f = fn) != null) { + dep = null; fn = null; + if (d.result == null) { + try { + d.completeValue(f.get()); + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } + d.postComplete(); + } + } + } + + static CompletableFuture asyncSupplyStage(Executor e, + Supplier f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + e.execute(new AsyncSupply(d, f)); + return d; + } + + @SuppressWarnings("serial") + static final class AsyncRun extends ForkJoinTask + implements Runnable, AsynchronousCompletionTask { + CompletableFuture dep; Runnable fn; + AsyncRun(CompletableFuture dep, Runnable fn) { + this.dep = dep; this.fn = fn; + } + + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + public final boolean exec() { run(); return true; } + + public void run() { + CompletableFuture d; Runnable f; + if ((d = dep) != null && (f = fn) != null) { + dep = null; fn = null; + if (d.result == null) { + try { + f.run(); + d.completeNull(); + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } + d.postComplete(); + } + } + } + + static CompletableFuture asyncRunStage(Executor e, Runnable f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + e.execute(new AsyncRun(d, f)); + return d; + } + + /* ------------- Signallers -------------- */ /** - * Linked nodes to record waiting threads in a Treiber stack. See - * other classes such as Phaser and SynchronousQueue for more - * detailed explanation. This class implements ManagedBlocker to - * avoid starvation when blocking actions pile up in - * ForkJoinPools. + * Completion for recording and releasing a waiting thread. This + * class implements ManagedBlocker to avoid starvation when + * blocking actions pile up in ForkJoinPools. */ - static final class WaitNode implements ForkJoinPool.ManagedBlocker { - long nanos; // wait time if timed - final long deadline; // non-zero if timed + @SuppressWarnings("serial") + static final class Signaller extends Completion + implements ForkJoinPool.ManagedBlocker { + long nanos; // wait time if timed + final long deadline; // non-zero if timed volatile int interruptControl; // > 0: interruptible, < 0: interrupted volatile Thread thread; - volatile WaitNode next; - WaitNode(boolean interruptible, long nanos, long deadline) { + + Signaller(boolean interruptible, long nanos, long deadline) { this.thread = Thread.currentThread(); this.interruptControl = interruptible ? 1 : 0; this.nanos = nanos; this.deadline = deadline; } + final CompletableFuture tryFire(int ignore) { + Thread w; // no need to atomically claim + if ((w = thread) != null) { + thread = null; + LockSupport.unpark(w); + } + return null; + } public boolean isReleasable() { if (thread == null) return true; @@ -273,6 +1687,7 @@ LockSupport.parkNanos(this, nanos); return isReleasable(); } + final boolean isLive() { return thread != null; } } /** @@ -280,1832 +1695,89 @@ * interrupted. */ private Object waitingGet(boolean interruptible) { - WaitNode q = null; + Signaller q = null; boolean queued = false; - int spins = SPINS; - for (Object r;;) { - if ((r = result) != null) { - if (q != null) { // suppress unpark - q.thread = null; - if (q.interruptControl < 0) { - if (interruptible) { - removeWaiter(q); - return null; - } - Thread.currentThread().interrupt(); - } - } - postComplete(); // help release others - return r; - } + int spins = -1; + Object r; + while ((r = result) == null) { + if (spins < 0) + spins = (Runtime.getRuntime().availableProcessors() > 1) ? + 1 << 8 : 0; // Use brief spin-wait on multiprocessors else if (spins > 0) { - int rnd = ThreadLocalRandom.nextSecondarySeed(); - if (rnd == 0) - rnd = ThreadLocalRandom.current().nextInt(); - if (rnd >= 0) + if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } else if (q == null) - q = new WaitNode(interruptible, 0L, 0L); + q = new Signaller(interruptible, 0L, 0L); else if (!queued) - queued = UNSAFE.compareAndSwapObject(this, WAITERS, - q.next = waiters, q); + queued = tryPushStack(q); else if (interruptible && q.interruptControl < 0) { - removeWaiter(q); + q.thread = null; + cleanStack(); return null; } else if (q.thread != null && result == null) { try { ForkJoinPool.managedBlock(q); - } catch (InterruptedException ex) { - q.interruptControl = -1; - } - } - } - } - - /** - * Awaits completion or aborts on interrupt or timeout. - * - * @param nanos time to wait - * @return raw result - */ - private Object timedAwaitDone(long nanos) - throws InterruptedException, TimeoutException { - WaitNode q = null; - boolean queued = false; - for (Object r;;) { - if ((r = result) != null) { - if (q != null) { - q.thread = null; - if (q.interruptControl < 0) { - removeWaiter(q); - throw new InterruptedException(); - } - } - postComplete(); - return r; - } - else if (q == null) { - if (nanos <= 0L) - throw new TimeoutException(); - long d = System.nanoTime() + nanos; - q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0 - } - else if (!queued) - queued = UNSAFE.compareAndSwapObject(this, WAITERS, - q.next = waiters, q); - else if (q.interruptControl < 0) { - removeWaiter(q); - throw new InterruptedException(); - } - else if (q.nanos <= 0L) { - if (result == null) { - removeWaiter(q); - throw new TimeoutException(); - } - } - else if (q.thread != null && result == null) { - try { - ForkJoinPool.managedBlock(q); - } catch (InterruptedException ex) { + } catch (InterruptedException ie) { q.interruptControl = -1; } } } - } - - /** - * Tries to unlink a timed-out or interrupted wait node to avoid - * accumulating garbage. Internal nodes are simply unspliced - * without CAS since it is harmless if they are traversed anyway - * by releasers. To avoid effects of unsplicing from already - * removed nodes, the list is retraversed in case of an apparent - * race. This is slow when there are a lot of nodes, but we don't - * expect lists to be long enough to outweigh higher-overhead - * schemes. - */ - private void removeWaiter(WaitNode node) { - if (node != null) { - node.thread = null; - retry: - for (;;) { // restart on removeWaiter race - for (WaitNode pred = null, q = waiters, s; q != null; q = s) { - s = q.next; - if (q.thread != null) - pred = q; - else if (pred != null) { - pred.next = s; - if (pred.thread == null) // check for race - continue retry; - } - else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) - continue retry; - } - break; + if (q != null) { + q.thread = null; + if (q.interruptControl < 0) { + if (interruptible) + r = null; // report interruption + else + Thread.currentThread().interrupt(); } } - } - - /* ------------- Async tasks -------------- */ - - /** - * A marker interface identifying asynchronous tasks produced by - * {@code async} methods. This may be useful for monitoring, - * debugging, and tracking asynchronous activities. - * - * @since 1.8 - */ - public static interface AsynchronousCompletionTask { - } - - /** Base class can act as either FJ or plain Runnable */ - @SuppressWarnings("serial") - abstract static class Async extends ForkJoinTask - implements Runnable, AsynchronousCompletionTask { - public final Void getRawResult() { return null; } - public final void setRawResult(Void v) { } - public final void run() { exec(); } + postComplete(); + return r; } /** - * Starts the given async task using the given executor, unless - * the executor is ForkJoinPool.commonPool and it has been - * disabled, in which case starts a new thread. - */ - static void execAsync(Executor e, Async r) { - if (e == ForkJoinPool.commonPool() && - ForkJoinPool.getCommonPoolParallelism() <= 1) - new Thread(r).start(); - else - e.execute(r); - } - - static final class AsyncRun extends Async { - final Runnable fn; - final CompletableFuture dst; - AsyncRun(Runnable fn, CompletableFuture dst) { - this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.run(); - ex = null; - } catch (Throwable rex) { - ex = rex; - } - d.internalComplete(null, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncSupply extends Async { - final Supplier fn; - final CompletableFuture dst; - AsyncSupply(Supplier fn, CompletableFuture dst) { - this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - u = fn.get(); - ex = null; - } catch (Throwable rex) { - ex = rex; - u = null; - } - d.internalComplete(u, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncApply extends Async { - final T arg; - final Function fn; - final CompletableFuture dst; - AsyncApply(T arg, Function fn, - CompletableFuture dst) { - this.arg = arg; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - u = fn.apply(arg); - ex = null; - } catch (Throwable rex) { - ex = rex; - u = null; - } - d.internalComplete(u, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncCombine extends Async { - final T arg1; - final U arg2; - final BiFunction fn; - final CompletableFuture dst; - AsyncCombine(T arg1, U arg2, - BiFunction fn, - CompletableFuture dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; V v; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - v = fn.apply(arg1, arg2); - ex = null; - } catch (Throwable rex) { - ex = rex; - v = null; - } - d.internalComplete(v, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncAccept extends Async { - final T arg; - final Consumer fn; - final CompletableFuture dst; - AsyncAccept(T arg, Consumer fn, - CompletableFuture dst) { - this.arg = arg; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.accept(arg); - ex = null; - } catch (Throwable rex) { - ex = rex; - } - d.internalComplete(null, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncAcceptBoth extends Async { - final T arg1; - final U arg2; - final BiConsumer fn; - final CompletableFuture dst; - AsyncAcceptBoth(T arg1, U arg2, - BiConsumer fn, - CompletableFuture dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.accept(arg1, arg2); - ex = null; - } catch (Throwable rex) { - ex = rex; - } - d.internalComplete(null, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncCompose extends Async { - final T arg; - final Function> fn; - final CompletableFuture dst; - AsyncCompose(T arg, - Function> fn, - CompletableFuture dst) { - this.arg = arg; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d, fr; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - CompletionStage cs = fn.apply(arg); - fr = (cs == null) ? null : cs.toCompletableFuture(); - ex = (fr == null) ? new NullPointerException() : null; - } catch (Throwable rex) { - ex = rex; - fr = null; - } - if (ex != null) - u = null; - else { - Object r = fr.result; - if (r == null) - r = fr.waitingGet(false); - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U ur = (U) r; - u = ur; - } - } - d.internalComplete(u, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncWhenComplete extends Async { - final T arg1; - final Throwable arg2; - final BiConsumer fn; - final CompletableFuture dst; - AsyncWhenComplete(T arg1, Throwable arg2, - BiConsumer fn, - CompletableFuture dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; - if ((d = this.dst) != null && d.result == null) { - Throwable ex = arg2; - try { - fn.accept(arg1, ex); - } catch (Throwable rex) { - if (ex == null) - ex = rex; - } - d.internalComplete(arg1, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - /* ------------- Completions -------------- */ - - /** - * Simple linked list nodes to record completions, used in - * basically the same way as WaitNodes. (We separate nodes from - * the Completions themselves mainly because for the And and Or - * methods, the same Completion object resides in two lists.) + * Returns raw result after waiting, or null if interrupted, or + * throws TimeoutException on timeout. */ - static final class CompletionNode { - final Completion completion; - volatile CompletionNode next; - CompletionNode(Completion completion) { this.completion = completion; } - } - - // Opportunistically subclass AtomicInteger to use compareAndSet to claim. - @SuppressWarnings("serial") - abstract static class Completion extends AtomicInteger implements Runnable { - } - - static final class ThenApply extends Completion { - final CompletableFuture src; - final Function fn; - final CompletableFuture dst; - final Executor executor; - ThenApply(CompletableFuture src, - Function fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Function fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenAccept extends Completion { - final CompletableFuture src; - final Consumer fn; - final CompletableFuture dst; - final Executor executor; - ThenAccept(CompletableFuture src, - Consumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Consumer fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenRun extends Completion { - final CompletableFuture src; - final Runnable fn; - final CompletableFuture dst; - final Executor executor; - ThenRun(CompletableFuture src, - Runnable fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Runnable fn; - final CompletableFuture dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCombine extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final BiFunction fn; - final CompletableFuture dst; - final Executor executor; - ThenCombine(CompletableFuture src, - CompletableFuture snd, - BiFunction fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final BiFunction fn; - final CompletableFuture dst; - Object r, s; T t; U u; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - Executor e = executor; - V v = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncCombine(t, u, fn, dst)); - else - v = fn.apply(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(v, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenAcceptBoth extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final BiConsumer fn; - final CompletableFuture dst; - final Executor executor; - ThenAcceptBoth(CompletableFuture src, - CompletableFuture snd, - BiConsumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final BiConsumer fn; - final CompletableFuture dst; - Object r, s; T t; U u; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAcceptBoth(t, u, fn, dst)); - else - fn.accept(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class RunAfterBoth extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Runnable fn; - final CompletableFuture dst; - final Executor executor; - RunAfterBoth(CompletableFuture src, - CompletableFuture snd, - Runnable fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Runnable fn; - final CompletableFuture dst; - Object r, s; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AndCompletion extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final CompletableFuture dst; - AndCompletion(CompletableFuture src, - CompletableFuture snd, - CompletableFuture dst) { - this.src = src; this.snd = snd; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final CompletableFuture dst; - Object r, s; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - dst.internalComplete(null, ex); + private Object timedGet(long nanos) throws TimeoutException { + if (Thread.interrupted()) + return null; + if (nanos <= 0L) + throw new TimeoutException(); + long d = System.nanoTime() + nanos; + Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0 + boolean queued = false; + Object r; + // We intentionally don't spin here (as waitingGet does) because + // the call to nanoTime() above acts much like a spin. + while ((r = result) == null) { + if (!queued) + queued = tryPushStack(q); + else if (q.interruptControl < 0 || q.nanos <= 0L) { + q.thread = null; + cleanStack(); + if (q.interruptControl < 0) + return null; + throw new TimeoutException(); } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ApplyToEither extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Function fn; - final CompletableFuture dst; - final Executor executor; - ApplyToEither(CompletableFuture src, - CompletableFuture snd, - Function fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Function fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AcceptEither extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Consumer fn; - final CompletableFuture dst; - final Executor executor; - AcceptEither(CompletableFuture src, - CompletableFuture snd, - Consumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Consumer fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class RunAfterEither extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Runnable fn; - final CompletableFuture dst; - final Executor executor; - RunAfterEither(CompletableFuture src, - CompletableFuture snd, - Runnable fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Runnable fn; - final CompletableFuture dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class OrCompletion extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final CompletableFuture dst; - OrCompletion(CompletableFuture src, - CompletableFuture snd, - CompletableFuture dst) { - this.src = src; this.snd = snd; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final CompletableFuture dst; - Object r, t; Throwable ex; - if ((dst = this.dst) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ExceptionCompletion extends Completion { - final CompletableFuture src; - final Function fn; - final CompletableFuture dst; - ExceptionCompletion(CompletableFuture src, - Function fn, - CompletableFuture dst) { - this.src = src; this.fn = fn; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final Function fn; - final CompletableFuture dst; - Object r; T t = null; Throwable ex, dx = null; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if ((r instanceof AltResult) && - (ex = ((AltResult)r).ex) != null) { - try { - t = fn.apply(ex); - } catch (Throwable rex) { - dx = rex; - } - } - else { - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, dx); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class WhenCompleteCompletion extends Completion { - final CompletableFuture src; - final BiConsumer fn; - final CompletableFuture dst; - final Executor executor; - WhenCompleteCompletion(CompletableFuture src, - BiConsumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final BiConsumer fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - Throwable dx = null; + else if (q.thread != null && result == null) { try { - if (e != null) - execAsync(e, new AsyncWhenComplete(t, ex, fn, dst)); - else - fn.accept(t, ex); - } catch (Throwable rex) { - dx = rex; - } - if (e == null || dx != null) - dst.internalComplete(t, ex != null ? ex : dx); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCopy extends Completion { - final CompletableFuture src; - final CompletableFuture dst; - ThenCopy(CompletableFuture src, - CompletableFuture dst) { - this.src = src; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - // version of ThenCopy for CompletableFuture dst - static final class ThenPropagate extends Completion { - final CompletableFuture src; - final CompletableFuture dst; - ThenPropagate(CompletableFuture src, - CompletableFuture dst) { - this.src = src; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class HandleCompletion extends Completion { - final CompletableFuture src; - final BiFunction fn; - final CompletableFuture dst; - final Executor executor; - HandleCompletion(CompletableFuture src, - BiFunction fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final BiFunction fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncCombine(t, ex, fn, dst)); - else - u = fn.apply(t, ex); - } catch (Throwable rex) { - dx = rex; - } - if (e == null || dx != null) - dst.internalComplete(u, dx); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCompose extends Completion { - final CompletableFuture src; - final Function> fn; - final CompletableFuture dst; - final Executor executor; - ThenCompose(CompletableFuture src, - Function> fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Function> fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; Executor e; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - CompletableFuture c = null; - U u = null; - boolean complete = false; - if (ex == null) { - if ((e = executor) != null) - execAsync(e, new AsyncCompose(t, fn, dst)); - else { - try { - CompletionStage cs = fn.apply(t); - c = (cs == null) ? null : cs.toCompletableFuture(); - if (c == null) - ex = new NullPointerException(); - } catch (Throwable rex) { - ex = rex; - } - } - } - if (c != null) { - ThenCopy d = null; - Object s; - if ((s = c.result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenCopy(c, dst)); - while ((s = c.result) == null) { - if (UNSAFE.compareAndSwapObject - (c, COMPLETIONS, p.next = c.completions, p)) - break; - } - } - if (s != null && (d == null || d.compareAndSet(0, 1))) { - complete = true; - if (s instanceof AltResult) { - ex = ((AltResult)s).ex; // no rewrap - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - } - } - if (complete || ex != null) - dst.internalComplete(u, ex); - if (c != null) - c.helpPostComplete(); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - // Implementations of stage methods with (plain, async, Executor) forms - - private CompletableFuture doThenApply - (Function fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenApply d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenApply(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - helpPostComplete(); - return dst; - } - - private CompletableFuture doThenAccept(Consumer fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenAccept d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenAccept(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - return dst; - } - - private CompletableFuture doThenRun(Runnable action, - Executor e) { - if (action == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenRun d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenRun(this, action, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - return dst; - } - - private CompletableFuture doThenCombine - (CompletableFuture other, - BiFunction fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenCombine d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenCombine(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); + ForkJoinPool.managedBlock(q); + } catch (InterruptedException ie) { + q.interruptControl = -1; } } } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - V v = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncCombine(t, u, fn, dst)); - else - v = fn.apply(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(v, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture doThenAcceptBoth - (CompletableFuture other, - BiConsumer fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenAcceptBoth d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenAcceptBoth(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } - } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAcceptBoth(t, u, fn, dst)); - else - fn.accept(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture doRunAfterBoth(CompletableFuture other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - RunAfterBoth d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new RunAfterBoth(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } - } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture doApplyToEither - (CompletableFuture other, - Function fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ApplyToEither d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new ApplyToEither(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; + if (q.interruptControl < 0) + r = null; + q.thread = null; + postComplete(); + return r; } - private CompletableFuture doAcceptEither - (CompletableFuture other, - Consumer fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - AcceptEither d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new AcceptEither(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture doRunAfterEither - (CompletableFuture other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - RunAfterEither d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new RunAfterEither(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture doThenCompose - (Function> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = null; - ThenCompose d = null; - Object r; - if ((r = result) == null) { - dst = new CompletableFuture(); - CompletionNode p = new CompletionNode - (d = new ThenCompose(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - if (e != null) { - if (dst == null) - dst = new CompletableFuture(); - execAsync(e, new AsyncCompose(t, fn, dst)); - } - else { - try { - CompletionStage cs = fn.apply(t); - if (cs == null || - (dst = cs.toCompletableFuture()) == null) - ex = new NullPointerException(); - } catch (Throwable rex) { - ex = rex; - } - } - } - if (dst == null) - dst = new CompletableFuture(); - if (ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - dst.helpPostComplete(); - return dst; - } - - private CompletableFuture doWhenComplete - (BiConsumer fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - WhenCompleteCompletion d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new WhenCompleteCompletion - (this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncWhenComplete(t, ex, fn, dst)); - else - fn.accept(t, ex); - } catch (Throwable rex) { - dx = rex; - } - if (e == null || dx != null) - dst.internalComplete(t, ex != null ? ex : dx); - } - helpPostComplete(); - return dst; - } - - private CompletableFuture doHandle - (BiFunction fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - HandleCompletion d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new HandleCompletion - (this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncCombine(t, ex, fn, dst)); - else { - u = fn.apply(t, ex); - dx = null; - } - } catch (Throwable rex) { - dx = rex; - u = null; - } - if (e == null || dx != null) - dst.internalComplete(u, dx); - } - helpPostComplete(); - return dst; - } - - - // public methods + /* ------------- public methods -------------- */ /** * Creates a new incomplete CompletableFuture. @@ -2114,6 +1786,13 @@ } /** + * Creates a new complete CompletableFuture with given encoded result. + */ + private CompletableFuture(Object r) { + this.result = r; + } + + /** * Returns a new CompletableFuture that is asynchronously completed * by a task running in the {@link ForkJoinPool#commonPool()} with * the value obtained by calling the given Supplier. @@ -2124,10 +1803,7 @@ * @return the new CompletableFuture */ public static CompletableFuture supplyAsync(Supplier supplier) { - if (supplier == null) throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(ForkJoinPool.commonPool(), new AsyncSupply(supplier, f)); - return f; + return asyncSupplyStage(asyncPool, supplier); } /** @@ -2143,11 +1819,7 @@ */ public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) { - if (executor == null || supplier == null) - throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(executor, new AsyncSupply(supplier, f)); - return f; + return asyncSupplyStage(screenExecutor(executor), supplier); } /** @@ -2160,10 +1832,7 @@ * @return the new CompletableFuture */ public static CompletableFuture runAsync(Runnable runnable) { - if (runnable == null) throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f)); - return f; + return asyncRunStage(asyncPool, runnable); } /** @@ -2178,11 +1847,7 @@ */ public static CompletableFuture runAsync(Runnable runnable, Executor executor) { - if (executor == null || runnable == null) - throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(executor, new AsyncRun(runnable, f)); - return f; + return asyncRunStage(screenExecutor(executor), runnable); } /** @@ -2194,9 +1859,7 @@ * @return the completed CompletableFuture */ public static CompletableFuture completedFuture(U value) { - CompletableFuture f = new CompletableFuture(); - f.result = (value == null) ? NIL : value; - return f; + return new CompletableFuture((value == null) ? NIL : value); } /** @@ -2220,21 +1883,8 @@ * while waiting */ public T get() throws InterruptedException, ExecutionException { - Object r; Throwable ex, cause; - if ((r = result) == null && (r = waitingGet(true)) == null) - throw new InterruptedException(); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); + Object r; + return reportGet((r = result) == null ? waitingGet(true) : r); } /** @@ -2252,24 +1902,9 @@ */ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Object r; Throwable ex, cause; + Object r; long nanos = unit.toNanos(timeout); - if (Thread.interrupted()) - throw new InterruptedException(); - if ((r = result) == null) - r = timedAwaitDone(nanos); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); + return reportGet((r = result) == null ? timedGet(nanos) : r); } /** @@ -2287,20 +1922,8 @@ * exceptionally or a completion computation threw an exception */ public T join() { - Object r; Throwable ex; - if ((r = result) == null) - r = waitingGet(false); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); + Object r; + return reportJoin((r = result) == null ? waitingGet(false) : r); } /** @@ -2314,20 +1937,8 @@ * exceptionally or a completion computation threw an exception */ public T getNow(T valueIfAbsent) { - Object r; Throwable ex; - if ((r = result) == null) - return valueIfAbsent; - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); + Object r; + return ((r = result) == null) ? valueIfAbsent : reportJoin(r); } /** @@ -2339,9 +1950,7 @@ * to transition to a completed state, else {@code false} */ public boolean complete(T value) { - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, - value == null ? NIL : value); + boolean triggered = completeValue(value); postComplete(); return triggered; } @@ -2356,244 +1965,200 @@ */ public boolean completeExceptionally(Throwable ex) { if (ex == null) throw new NullPointerException(); - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); + boolean triggered = internalComplete(new AltResult(ex)); postComplete(); return triggered; } - // CompletionStage methods - - public CompletableFuture thenApply - (Function fn) { - return doThenApply(fn, null); + public CompletableFuture thenApply( + Function fn) { + return uniApplyStage(null, fn); } - public CompletableFuture thenApplyAsync - (Function fn) { - return doThenApply(fn, ForkJoinPool.commonPool()); + public CompletableFuture thenApplyAsync( + Function fn) { + return uniApplyStage(asyncPool, fn); } - public CompletableFuture thenApplyAsync - (Function fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenApply(fn, executor); + public CompletableFuture thenApplyAsync( + Function fn, Executor executor) { + return uniApplyStage(screenExecutor(executor), fn); } - public CompletableFuture thenAccept - (Consumer action) { - return doThenAccept(action, null); + public CompletableFuture thenAccept(Consumer action) { + return uniAcceptStage(null, action); } - public CompletableFuture thenAcceptAsync - (Consumer action) { - return doThenAccept(action, ForkJoinPool.commonPool()); + public CompletableFuture thenAcceptAsync(Consumer action) { + return uniAcceptStage(asyncPool, action); } - public CompletableFuture thenAcceptAsync - (Consumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAccept(action, executor); + public CompletableFuture thenAcceptAsync(Consumer action, + Executor executor) { + return uniAcceptStage(screenExecutor(executor), action); } - public CompletableFuture thenRun - (Runnable action) { - return doThenRun(action, null); + public CompletableFuture thenRun(Runnable action) { + return uniRunStage(null, action); } - public CompletableFuture thenRunAsync - (Runnable action) { - return doThenRun(action, ForkJoinPool.commonPool()); + public CompletableFuture thenRunAsync(Runnable action) { + return uniRunStage(asyncPool, action); } - public CompletableFuture thenRunAsync - (Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenRun(action, executor); + public CompletableFuture thenRunAsync(Runnable action, + Executor executor) { + return uniRunStage(screenExecutor(executor), action); } - public CompletableFuture thenCombine - (CompletionStage other, - BiFunction fn) { - return doThenCombine(other.toCompletableFuture(), fn, null); + public CompletableFuture thenCombine( + CompletionStage other, + BiFunction fn) { + return biApplyStage(null, other, fn); } - public CompletableFuture thenCombineAsync - (CompletionStage other, - BiFunction fn) { - return doThenCombine(other.toCompletableFuture(), fn, - ForkJoinPool.commonPool()); + public CompletableFuture thenCombineAsync( + CompletionStage other, + BiFunction fn) { + return biApplyStage(asyncPool, other, fn); } - public CompletableFuture thenCombineAsync - (CompletionStage other, - BiFunction fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCombine(other.toCompletableFuture(), fn, executor); + public CompletableFuture thenCombineAsync( + CompletionStage other, + BiFunction fn, Executor executor) { + return biApplyStage(screenExecutor(executor), other, fn); } - public CompletableFuture thenAcceptBoth - (CompletionStage other, - BiConsumer action) { - return doThenAcceptBoth(other.toCompletableFuture(), action, null); + public CompletableFuture thenAcceptBoth( + CompletionStage other, + BiConsumer action) { + return biAcceptStage(null, other, action); } - public CompletableFuture thenAcceptBothAsync - (CompletionStage other, - BiConsumer action) { - return doThenAcceptBoth(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture thenAcceptBothAsync( + CompletionStage other, + BiConsumer action) { + return biAcceptStage(asyncPool, other, action); } - public CompletableFuture thenAcceptBothAsync - (CompletionStage other, - BiConsumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAcceptBoth(other.toCompletableFuture(), action, executor); - } - - public CompletableFuture runAfterBoth - (CompletionStage other, - Runnable action) { - return doRunAfterBoth(other.toCompletableFuture(), action, null); + public CompletableFuture thenAcceptBothAsync( + CompletionStage other, + BiConsumer action, Executor executor) { + return biAcceptStage(screenExecutor(executor), other, action); } - public CompletableFuture runAfterBothAsync - (CompletionStage other, - Runnable action) { - return doRunAfterBoth(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture runAfterBoth(CompletionStage other, + Runnable action) { + return biRunStage(null, other, action); } - public CompletableFuture runAfterBothAsync - (CompletionStage other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterBoth(other.toCompletableFuture(), action, executor); + public CompletableFuture runAfterBothAsync(CompletionStage other, + Runnable action) { + return biRunStage(asyncPool, other, action); } - - public CompletableFuture applyToEither - (CompletionStage other, - Function fn) { - return doApplyToEither(other.toCompletableFuture(), fn, null); + public CompletableFuture runAfterBothAsync(CompletionStage other, + Runnable action, + Executor executor) { + return biRunStage(screenExecutor(executor), other, action); } - public CompletableFuture applyToEitherAsync - (CompletionStage other, - Function fn) { - return doApplyToEither(other.toCompletableFuture(), fn, - ForkJoinPool.commonPool()); + public CompletableFuture applyToEither( + CompletionStage other, Function fn) { + return orApplyStage(null, other, fn); + } + + public CompletableFuture applyToEitherAsync( + CompletionStage other, Function fn) { + return orApplyStage(asyncPool, other, fn); } - public CompletableFuture applyToEitherAsync - (CompletionStage other, - Function fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doApplyToEither(other.toCompletableFuture(), fn, executor); - } - - public CompletableFuture acceptEither - (CompletionStage other, - Consumer action) { - return doAcceptEither(other.toCompletableFuture(), action, null); + public CompletableFuture applyToEitherAsync( + CompletionStage other, Function fn, + Executor executor) { + return orApplyStage(screenExecutor(executor), other, fn); } - public CompletableFuture acceptEitherAsync - (CompletionStage other, - Consumer action) { - return doAcceptEither(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture acceptEither( + CompletionStage other, Consumer action) { + return orAcceptStage(null, other, action); } - public CompletableFuture acceptEitherAsync - (CompletionStage other, - Consumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doAcceptEither(other.toCompletableFuture(), action, executor); + public CompletableFuture acceptEitherAsync( + CompletionStage other, Consumer action) { + return orAcceptStage(asyncPool, other, action); + } + + public CompletableFuture acceptEitherAsync( + CompletionStage other, Consumer action, + Executor executor) { + return orAcceptStage(screenExecutor(executor), other, action); } public CompletableFuture runAfterEither(CompletionStage other, Runnable action) { - return doRunAfterEither(other.toCompletableFuture(), action, null); + return orRunStage(null, other, action); } - public CompletableFuture runAfterEitherAsync - (CompletionStage other, - Runnable action) { - return doRunAfterEither(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture runAfterEitherAsync(CompletionStage other, + Runnable action) { + return orRunStage(asyncPool, other, action); } - public CompletableFuture runAfterEitherAsync - (CompletionStage other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterEither(other.toCompletableFuture(), action, executor); + public CompletableFuture runAfterEitherAsync(CompletionStage other, + Runnable action, + Executor executor) { + return orRunStage(screenExecutor(executor), other, action); } - public CompletableFuture thenCompose - (Function> fn) { - return doThenCompose(fn, null); + public CompletableFuture thenCompose( + Function> fn) { + return uniComposeStage(null, fn); } - public CompletableFuture thenComposeAsync - (Function> fn) { - return doThenCompose(fn, ForkJoinPool.commonPool()); + public CompletableFuture thenComposeAsync( + Function> fn) { + return uniComposeStage(asyncPool, fn); } - public CompletableFuture thenComposeAsync - (Function> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCompose(fn, executor); + public CompletableFuture thenComposeAsync( + Function> fn, + Executor executor) { + return uniComposeStage(screenExecutor(executor), fn); } - public CompletableFuture whenComplete - (BiConsumer action) { - return doWhenComplete(action, null); + public CompletableFuture whenComplete( + BiConsumer action) { + return uniWhenCompleteStage(null, action); } - public CompletableFuture whenCompleteAsync - (BiConsumer action) { - return doWhenComplete(action, ForkJoinPool.commonPool()); + public CompletableFuture whenCompleteAsync( + BiConsumer action) { + return uniWhenCompleteStage(asyncPool, action); } - public CompletableFuture whenCompleteAsync - (BiConsumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doWhenComplete(action, executor); + public CompletableFuture whenCompleteAsync( + BiConsumer action, Executor executor) { + return uniWhenCompleteStage(screenExecutor(executor), action); } - public CompletableFuture handle - (BiFunction fn) { - return doHandle(fn, null); + public CompletableFuture handle( + BiFunction fn) { + return uniHandleStage(null, fn); } - public CompletableFuture handleAsync - (BiFunction fn) { - return doHandle(fn, ForkJoinPool.commonPool()); + public CompletableFuture handleAsync( + BiFunction fn) { + return uniHandleStage(asyncPool, fn); } - public CompletableFuture handleAsync - (BiFunction fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doHandle(fn, executor); + public CompletableFuture handleAsync( + BiFunction fn, Executor executor) { + return uniHandleStage(screenExecutor(executor), fn); } /** - * Returns this CompletableFuture + * Returns this CompletableFuture. * * @return this CompletableFuture */ @@ -2618,52 +2183,13 @@ * exceptionally * @return the new CompletableFuture */ - public CompletableFuture exceptionally - (Function fn) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ExceptionCompletion d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new ExceptionCompletion - (this, fn, dst)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t = null; Throwable ex, dx = null; - if (r instanceof AltResult) { - if ((ex = ((AltResult)r).ex) != null) { - try { - t = fn.apply(ex); - } catch (Throwable rex) { - dx = rex; - } - } - } - else { - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, dx); - } - helpPostComplete(); - return dst; + public CompletableFuture exceptionally( + Function fn) { + return uniExceptionallyStage(fn); } /* ------------- Arbitrary-arity constructions -------------- */ - /* - * The basic plan of attack is to recursively form binary - * completion trees of elements. This can be overkill for small - * sets, but scales nicely. The And/All vs Or/Any forms use the - * same idea, but details differ. - */ - /** * Returns a new CompletableFuture that is completed when all of * the given CompletableFutures complete. If any of the given @@ -2688,82 +2214,7 @@ * {@code null} */ public static CompletableFuture allOf(CompletableFuture... cfs) { - int len = cfs.length; // Directly handle empty and singleton cases - if (len > 1) - return allTree(cfs, 0, len - 1); - else { - CompletableFuture dst = new CompletableFuture(); - CompletableFuture f; - if (len == 0) - dst.result = NIL; - else if ((f = cfs[0]) == null) - throw new NullPointerException(); - else { - ThenPropagate d = null; - CompletionNode p = null; - Object r; - while ((r = f.result) == null) { - if (d == null) - d = new ThenPropagate(f, dst); - else if (p == null) - p = new CompletionNode(d); - else if (UNSAFE.compareAndSwapObject - (f, COMPLETIONS, p.next = f.completions, p)) - break; - } - if (r != null && (d == null || d.compareAndSet(0, 1))) - dst.internalComplete(null, (r instanceof AltResult) ? - ((AltResult)r).ex : null); - f.helpPostComplete(); - } - return dst; - } - } - - /** - * Recursively constructs an And'ed tree of CompletableFutures. - * Called only when array known to have at least two elements. - */ - private static CompletableFuture allTree(CompletableFuture[] cfs, - int lo, int hi) { - CompletableFuture fst, snd; - int mid = (lo + hi) >>> 1; - if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null || - (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null) - throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - AndCompletion d = null; - CompletionNode p = null, q = null; - Object r = null, s = null; - while ((r = fst.result) == null || (s = snd.result) == null) { - if (d == null) - d = new AndCompletion(fst, snd, dst); - else if (p == null) - p = new CompletionNode(d); - else if (q == null) { - if (UNSAFE.compareAndSwapObject - (fst, COMPLETIONS, p.next = fst.completions, p)) - q = new CompletionNode(d); - } - else if (UNSAFE.compareAndSwapObject - (snd, COMPLETIONS, q.next = snd.completions, q)) - break; - } - if ((r != null || (r = fst.result) != null) && - (s != null || (s = snd.result) != null) && - (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - dst.internalComplete(null, ex); - } - fst.helpPostComplete(); - snd.helpPostComplete(); - return dst; + return andTree(cfs, 0, cfs.length - 1); } /** @@ -2782,92 +2233,7 @@ * {@code null} */ public static CompletableFuture anyOf(CompletableFuture... cfs) { - int len = cfs.length; // Same idea as allOf - if (len > 1) - return anyTree(cfs, 0, len - 1); - else { - CompletableFuture dst = new CompletableFuture(); - CompletableFuture f; - if (len == 0) - ; // skip - else if ((f = cfs[0]) == null) - throw new NullPointerException(); - else { - ThenCopy d = null; - CompletionNode p = null; - Object r; - while ((r = f.result) == null) { - if (d == null) - d = new ThenCopy(f, dst); - else if (p == null) - p = new CompletionNode(d); - else if (UNSAFE.compareAndSwapObject - (f, COMPLETIONS, p.next = f.completions, p)) - break; - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; Object t; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - f.helpPostComplete(); - } - return dst; - } - } - - /** - * Recursively constructs an Or'ed tree of CompletableFutures. - */ - private static CompletableFuture anyTree(CompletableFuture[] cfs, - int lo, int hi) { - CompletableFuture fst, snd; - int mid = (lo + hi) >>> 1; - if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null || - (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null) - throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - OrCompletion d = null; - CompletionNode p = null, q = null; - Object r; - while ((r = fst.result) == null && (r = snd.result) == null) { - if (d == null) - d = new OrCompletion(fst, snd, dst); - else if (p == null) - p = new CompletionNode(d); - else if (q == null) { - if (UNSAFE.compareAndSwapObject - (fst, COMPLETIONS, p.next = fst.completions, p)) - q = new CompletionNode(d); - } - else if (UNSAFE.compareAndSwapObject - (snd, COMPLETIONS, q.next = snd.completions, q)) - break; - } - if ((r != null || (r = fst.result) != null || - (r = snd.result) != null) && - (d == null || d.compareAndSet(0, 1))) { - Throwable ex; Object t; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - fst.helpPostComplete(); - snd.helpPostComplete(); - return dst; + return orTree(cfs, 0, cfs.length - 1); } /* ------------- Control and status methods -------------- */ @@ -2887,8 +2253,7 @@ */ public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = (result == null) && - UNSAFE.compareAndSwapObject - (this, RESULT, null, new AltResult(new CancellationException())); + internalComplete(new AltResult(new CancellationException())); postComplete(); return cancelled || isCancelled(); } @@ -2940,11 +2305,12 @@ * Forcibly causes subsequent invocations of method {@link #get()} * and related methods to throw the given exception, whether or * not already completed. This method is designed for use only in - * recovery actions, and even in such situations may result in - * ongoing dependent completions using established versus + * error recovery actions, and even in such situations may result + * in ongoing dependent completions using established versus * overwritten outcomes. * * @param ex the exception + * @throws NullPointerException if the exception is null */ public void obtrudeException(Throwable ex) { if (ex == null) throw new NullPointerException(); @@ -2962,7 +2328,7 @@ */ public int getNumberOfDependents() { int count = 0; - for (CompletionNode p = completions; p != null; p = p.next) + for (Completion p = stack; p != null; p = p.next) ++count; return count; } @@ -2993,20 +2359,19 @@ // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long RESULT; - private static final long WAITERS; - private static final long COMPLETIONS; + private static final long STACK; + private static final long NEXT; static { try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); + final sun.misc.Unsafe u; + UNSAFE = u = sun.misc.Unsafe.getUnsafe(); Class k = CompletableFuture.class; - RESULT = UNSAFE.objectFieldOffset - (k.getDeclaredField("result")); - WAITERS = UNSAFE.objectFieldOffset - (k.getDeclaredField("waiters")); - COMPLETIONS = UNSAFE.objectFieldOffset - (k.getDeclaredField("completions")); - } catch (Exception e) { - throw new Error(e); + RESULT = u.objectFieldOffset(k.getDeclaredField("result")); + STACK = u.objectFieldOffset(k.getDeclaredField("stack")); + NEXT = u.objectFieldOffset + (Completion.class.getDeclaredField("next")); + } catch (Exception x) { + throw new Error(x); } } }