# HG changeset patch # User chegar # Date 1365524867 -3600 # Node ID fcce5e09e23b1e50866fd26f3d15b76f555345a4 # Parent b3ca7ed8e44f155bf7d48498c44ee92e5d4f3b72 8005696: Add CompletableFuture Reviewed-by: chegar, martin diff -r b3ca7ed8e44f -r fcce5e09e23b jdk/make/java/java/FILES_java.gmk --- a/jdk/make/java/java/FILES_java.gmk Tue Apr 09 15:51:50 2013 +0100 +++ b/jdk/make/java/java/FILES_java.gmk Tue Apr 09 17:27:47 2013 +0100 @@ -316,6 +316,8 @@ java/util/concurrent/BrokenBarrierException.java \ java/util/concurrent/Callable.java \ java/util/concurrent/CancellationException.java \ + java/util/concurrent/CompletableFuture.java \ + java/util/concurrent/CompletionException.java \ java/util/concurrent/CompletionService.java \ java/util/concurrent/ConcurrentHashMap.java \ java/util/concurrent/ConcurrentLinkedDeque.java \ diff -r b3ca7ed8e44f -r fcce5e09e23b jdk/src/share/classes/java/util/concurrent/CompletableFuture.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/classes/java/util/concurrent/CompletableFuture.java Tue Apr 09 17:27:47 2013 +0100 @@ -0,0 +1,3305 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package java.util.concurrent; +import java.util.function.Supplier; +import java.util.function.Consumer; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.BiFunction; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +/** + * A {@link Future} that may be explicitly completed (setting its + * value and status), and may include dependent functions and actions + * that trigger upon its completion. + * + *

When two or more threads attempt to + * {@link #complete complete}, + * {@link #completeExceptionally completeExceptionally}, or + * {@link #cancel cancel} + * a CompletableFuture, only one of them succeeds. + * + *

Methods are available for adding dependents based on + * user-provided Functions, Consumers, or Runnables. The appropriate + * form to use depends on whether actions require arguments and/or + * produce results. Completion of a dependent action will trigger the + * completion of another CompletableFuture. Actions may also be + * triggered after either or both the current and another + * CompletableFuture complete. Multiple CompletableFutures may also + * be grouped as one using {@link #anyOf(CompletableFuture...)} and + * {@link #allOf(CompletableFuture...)}. + * + *

CompletableFutures themselves do not execute asynchronously. + * However, actions supplied for dependent completions of another + * CompletableFuture may do so, depending on whether they are provided + * via one of the async methods (that is, methods with names + * of the form xxxAsync). The async + * methods provide a way to commence asynchronous processing of an + * action using either a given {@link Executor} or by default the + * {@link ForkJoinPool#commonPool()}. To simplify monitoring, + * debugging, and tracking, all generated asynchronous tasks are + * instances of the marker interface {@link AsynchronousCompletionTask}. + * + *

Actions supplied for dependent completions of non-async + * methods may be performed by the thread that completes the current + * CompletableFuture, or by any other caller of these methods. There + * are no guarantees about the order of processing completions unless + * constrained by these methods. + * + *

Since (unlike {@link FutureTask}) this class has no direct + * control over the computation that causes it to be completed, + * cancellation is treated as just another form of exceptional completion. + * Method {@link #cancel cancel} has the same effect as + * {@code completeExceptionally(new CancellationException())}. + * + *

Upon exceptional completion (including cancellation), or when a + * completion entails an additional computation which terminates + * abruptly with an (unchecked) exception or error, then all of their + * dependent completions (and their dependents in turn) generally act + * as {@code completeExceptionally} with a {@link CompletionException} + * holding that exception as its cause. However, the {@link + * #exceptionally exceptionally} and {@link #handle handle} + * completions are able to handle exceptional completions of + * the CompletableFutures they depend on. + * + *

In case of exceptional completion with a CompletionException, + * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an + * {@link ExecutionException} with the same cause as held in the + * corresponding CompletionException. However, in these cases, + * methods {@link #join()} and {@link #getNow} throw the + * CompletionException, which simplifies usage. + * + *

Arguments used to pass a completion result (that is, for parameters + * of type {@code T}) may be null, but passing a null value for any other + * parameter will result in a {@link NullPointerException} being thrown. + * + * @author Doug Lea + * @since 1.8 + */ +public class CompletableFuture implements Future { + + /* + * Overview: + * + * 1. Non-nullness of field result (set via CAS) indicates done. + * An AltResult is used to box null as a result, as well as to + * hold exceptions. Using a single field makes completion fast + * and simple to detect and trigger, at the expense of a lot of + * encoding and decoding that infiltrates many methods. One minor + * simplification relies on the (static) NIL (to box null results) + * being the only AltResult with a null exception field, so we + * don't usually need explicit comparisons with NIL. The CF + * exception propagation mechanics surrounding decoding rely on + * unchecked casts of decoded results really being unchecked, + * where user type errors are caught at point of use, as is + * currently the case in Java. These are highlighted by using + * SuppressWarnings-annotated temporaries. + * + * 2. Waiters are held in a Treiber stack similar to the one used + * in FutureTask, Phaser, and SynchronousQueue. See their + * internal documentation for algorithmic details. + * + * 3. Completions are also kept in a list/stack, and pulled off + * and run when completion is triggered. (We could even use the + * same stack as for waiters, but would give up the potential + * parallelism obtained because woken waiters help release/run + * others -- see method postComplete). Because post-processing + * may race with direct calls, class Completion opportunistically + * extends AtomicInteger so callers can claim the action via + * compareAndSet(0, 1). The Completion.run methods are all + * written a boringly similar uniform way (that sometimes includes + * unnecessary-looking checks, kept to maintain uniformity). + * There are enough dimensions upon which they differ that + * attempts to factor commonalities while maintaining efficiency + * require more lines of code than they would save. + * + * 4. The exported then/and/or methods do support a bit of + * factoring (see doThenApply etc). They must cope with the + * intrinsic races surrounding addition of a dependent action + * versus performing the action directly because the task is + * already complete. For example, a CF may not be complete upon + * entry, so a dependent completion is added, but by the time it + * is added, the target CF is complete, so must be directly + * executed. This is all done while avoiding unnecessary object + * construction in safe-bypass cases. + */ + + // preliminaries + + static final class AltResult { + final Throwable ex; // null only for NIL + AltResult(Throwable ex) { this.ex = ex; } + } + + static final AltResult NIL = new AltResult(null); + + // Fields + + volatile Object result; // Either the result or boxed AltResult + volatile WaitNode waiters; // Treiber stack of threads blocked on get() + volatile CompletionNode completions; // list (Treiber stack) of completions + + // Basic utilities for triggering and processing completions + + /** + * Removes and signals all waiting threads and runs all completions. + */ + final void postComplete() { + WaitNode q; Thread t; + while ((q = waiters) != null) { + if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && + (t = q.thread) != null) { + q.thread = null; + LockSupport.unpark(t); + } + } + + CompletionNode h; Completion c; + while ((h = completions) != null) { + if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && + (c = h.completion) != null) + c.run(); + } + } + + /** + * Triggers completion with the encoding of the given arguments: + * if the exception is non-null, encodes it as a wrapped + * CompletionException unless it is one already. Otherwise uses + * the given result, boxed as NIL if null. + */ + final void internalComplete(T v, Throwable ex) { + if (result == null) + UNSAFE.compareAndSwapObject + (this, RESULT, null, + (ex == null) ? (v == null) ? NIL : v : + new AltResult((ex instanceof CompletionException) ? ex : + new CompletionException(ex))); + postComplete(); // help out even if not triggered + } + + /** + * If triggered, helps release and/or process completions. + */ + final void helpPostComplete() { + if (result != null) + postComplete(); + } + + /* ------------- waiting for completions -------------- */ + + /** Number of processors, for spin control */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** + * Heuristic spin value for waitingGet() before blocking on + * multiprocessors + */ + static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; + + /** + * Linked nodes to record waiting threads in a Treiber stack. See + * other classes such as Phaser and SynchronousQueue for more + * detailed explanation. This class implements ManagedBlocker to + * avoid starvation when blocking actions pile up in + * ForkJoinPools. + */ + static final class WaitNode implements ForkJoinPool.ManagedBlocker { + long nanos; // wait time if timed + final long deadline; // non-zero if timed + volatile int interruptControl; // > 0: interruptible, < 0: interrupted + volatile Thread thread; + volatile WaitNode next; + WaitNode(boolean interruptible, long nanos, long deadline) { + this.thread = Thread.currentThread(); + this.interruptControl = interruptible ? 1 : 0; + this.nanos = nanos; + this.deadline = deadline; + } + public boolean isReleasable() { + if (thread == null) + return true; + if (Thread.interrupted()) { + int i = interruptControl; + interruptControl = -1; + if (i > 0) + return true; + } + if (deadline != 0L && + (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { + thread = null; + return true; + } + return false; + } + public boolean block() { + if (isReleasable()) + return true; + else if (deadline == 0L) + LockSupport.park(this); + else if (nanos > 0L) + LockSupport.parkNanos(this, nanos); + return isReleasable(); + } + } + + /** + * Returns raw result after waiting, or null if interruptible and + * interrupted. + */ + private Object waitingGet(boolean interruptible) { + WaitNode q = null; + boolean queued = false; + int spins = SPINS; + for (Object r;;) { + if ((r = result) != null) { + if (q != null) { // suppress unpark + q.thread = null; + if (q.interruptControl < 0) { + if (interruptible) { + removeWaiter(q); + return null; + } + Thread.currentThread().interrupt(); + } + } + postComplete(); // help release others + return r; + } + else if (spins > 0) { + int rnd = ThreadLocalRandom.nextSecondarySeed(); + if (rnd == 0) + rnd = ThreadLocalRandom.current().nextInt(); + if (rnd >= 0) + --spins; + } + else if (q == null) + q = new WaitNode(interruptible, 0L, 0L); + else if (!queued) + queued = UNSAFE.compareAndSwapObject(this, WAITERS, + q.next = waiters, q); + else if (interruptible && q.interruptControl < 0) { + removeWaiter(q); + return null; + } + else if (q.thread != null && result == null) { + try { + ForkJoinPool.managedBlock(q); + } catch (InterruptedException ex) { + q.interruptControl = -1; + } + } + } + } + + /** + * Awaits completion or aborts on interrupt or timeout. + * + * @param nanos time to wait + * @return raw result + */ + private Object timedAwaitDone(long nanos) + throws InterruptedException, TimeoutException { + WaitNode q = null; + boolean queued = false; + for (Object r;;) { + if ((r = result) != null) { + if (q != null) { + q.thread = null; + if (q.interruptControl < 0) { + removeWaiter(q); + throw new InterruptedException(); + } + } + postComplete(); + return r; + } + else if (q == null) { + if (nanos <= 0L) + throw new TimeoutException(); + long d = System.nanoTime() + nanos; + q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0 + } + else if (!queued) + queued = UNSAFE.compareAndSwapObject(this, WAITERS, + q.next = waiters, q); + else if (q.interruptControl < 0) { + removeWaiter(q); + throw new InterruptedException(); + } + else if (q.nanos <= 0L) { + if (result == null) { + removeWaiter(q); + throw new TimeoutException(); + } + } + else if (q.thread != null && result == null) { + try { + ForkJoinPool.managedBlock(q); + } catch (InterruptedException ex) { + q.interruptControl = -1; + } + } + } + } + + /** + * Tries to unlink a timed-out or interrupted wait node to avoid + * accumulating garbage. Internal nodes are simply unspliced + * without CAS since it is harmless if they are traversed anyway + * by releasers. To avoid effects of unsplicing from already + * removed nodes, the list is retraversed in case of an apparent + * race. This is slow when there are a lot of nodes, but we don't + * expect lists to be long enough to outweigh higher-overhead + * schemes. + */ + private void removeWaiter(WaitNode node) { + if (node != null) { + node.thread = null; + retry: + for (;;) { // restart on removeWaiter race + for (WaitNode pred = null, q = waiters, s; q != null; q = s) { + s = q.next; + if (q.thread != null) + pred = q; + else if (pred != null) { + pred.next = s; + if (pred.thread == null) // check for race + continue retry; + } + else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) + continue retry; + } + break; + } + } + } + + /* ------------- Async tasks -------------- */ + + /** + * A marker interface identifying asynchronous tasks produced by + * {@code async} methods. This may be useful for monitoring, + * debugging, and tracking asynchronous activities. + * + * @since 1.8 + */ + public static interface AsynchronousCompletionTask { + } + + /** Base class can act as either FJ or plain Runnable */ + abstract static class Async extends ForkJoinTask + implements Runnable, AsynchronousCompletionTask { + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final void run() { exec(); } + } + + static final class AsyncRun extends Async { + final Runnable fn; + final CompletableFuture dst; + AsyncRun(Runnable fn, CompletableFuture dst) { + this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture d; Throwable ex; + if ((d = this.dst) != null && d.result == null) { + try { + fn.run(); + ex = null; + } catch (Throwable rex) { + ex = rex; + } + d.internalComplete(null, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AsyncSupply extends Async { + final Supplier fn; + final CompletableFuture dst; + AsyncSupply(Supplier fn, CompletableFuture dst) { + this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture d; U u; Throwable ex; + if ((d = this.dst) != null && d.result == null) { + try { + u = fn.get(); + ex = null; + } catch (Throwable rex) { + ex = rex; + u = null; + } + d.internalComplete(u, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AsyncApply extends Async { + final T arg; + final Function fn; + final CompletableFuture dst; + AsyncApply(T arg, Function fn, + CompletableFuture dst) { + this.arg = arg; this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture d; U u; Throwable ex; + if ((d = this.dst) != null && d.result == null) { + try { + u = fn.apply(arg); + ex = null; + } catch (Throwable rex) { + ex = rex; + u = null; + } + d.internalComplete(u, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AsyncCombine extends Async { + final T arg1; + final U arg2; + final BiFunction fn; + final CompletableFuture dst; + AsyncCombine(T arg1, U arg2, + BiFunction fn, + CompletableFuture dst) { + this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture d; V v; Throwable ex; + if ((d = this.dst) != null && d.result == null) { + try { + v = fn.apply(arg1, arg2); + ex = null; + } catch (Throwable rex) { + ex = rex; + v = null; + } + d.internalComplete(v, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AsyncAccept extends Async { + final T arg; + final Consumer fn; + final CompletableFuture dst; + AsyncAccept(T arg, Consumer fn, + CompletableFuture dst) { + this.arg = arg; this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture d; Throwable ex; + if ((d = this.dst) != null && d.result == null) { + try { + fn.accept(arg); + ex = null; + } catch (Throwable rex) { + ex = rex; + } + d.internalComplete(null, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AsyncAcceptBoth extends Async { + final T arg1; + final U arg2; + final BiConsumer fn; + final CompletableFuture dst; + AsyncAcceptBoth(T arg1, U arg2, + BiConsumer fn, + CompletableFuture dst) { + this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture d; Throwable ex; + if ((d = this.dst) != null && d.result == null) { + try { + fn.accept(arg1, arg2); + ex = null; + } catch (Throwable rex) { + ex = rex; + } + d.internalComplete(null, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AsyncCompose extends Async { + final T arg; + final Function> fn; + final CompletableFuture dst; + AsyncCompose(T arg, + Function> fn, + CompletableFuture dst) { + this.arg = arg; this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture d, fr; U u; Throwable ex; + if ((d = this.dst) != null && d.result == null) { + try { + fr = fn.apply(arg); + ex = (fr == null) ? new NullPointerException() : null; + } catch (Throwable rex) { + ex = rex; + fr = null; + } + if (ex != null) + u = null; + else { + Object r = fr.result; + if (r == null) + r = fr.waitingGet(false); + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + u = null; + } + else { + @SuppressWarnings("unchecked") U ur = (U) r; + u = ur; + } + } + d.internalComplete(u, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + + /* ------------- Completions -------------- */ + + /** + * Simple linked list nodes to record completions, used in + * basically the same way as WaitNodes. (We separate nodes from + * the Completions themselves mainly because for the And and Or + * methods, the same Completion object resides in two lists.) + */ + static final class CompletionNode { + final Completion completion; + volatile CompletionNode next; + CompletionNode(Completion completion) { this.completion = completion; } + } + + // Opportunistically subclass AtomicInteger to use compareAndSet to claim. + abstract static class Completion extends AtomicInteger implements Runnable { + } + + static final class ThenApply extends Completion { + final CompletableFuture src; + final Function fn; + final CompletableFuture dst; + final Executor executor; + ThenApply(CompletableFuture src, + Function fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final Function fn; + final CompletableFuture dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + Executor e = executor; + U u = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncApply(t, fn, dst)); + else + u = fn.apply(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(u, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ThenAccept extends Completion { + final CompletableFuture src; + final Consumer fn; + final CompletableFuture dst; + final Executor executor; + ThenAccept(CompletableFuture src, + Consumer fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final Consumer fn; + final CompletableFuture dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + Executor e = executor; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncAccept(t, fn, dst)); + else + fn.accept(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ThenRun extends Completion { + final CompletableFuture src; + final Runnable fn; + final CompletableFuture dst; + final Executor executor; + ThenRun(CompletableFuture src, + Runnable fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final Runnable fn; + final CompletableFuture dst; + Object r; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + Executor e = executor; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncRun(fn, dst)); + else + fn.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ThenCombine extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final BiFunction fn; + final CompletableFuture dst; + final Executor executor; + ThenCombine(CompletableFuture src, + CompletableFuture snd, + BiFunction fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.snd = snd; + this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final BiFunction fn; + final CompletableFuture dst; + Object r, s; T t; U u; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + (b = this.snd) != null && + (s = b.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex != null) + u = null; + else if (s instanceof AltResult) { + ex = ((AltResult)s).ex; + u = null; + } + else { + @SuppressWarnings("unchecked") U us = (U) s; + u = us; + } + Executor e = executor; + V v = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncCombine(t, u, fn, dst)); + else + v = fn.apply(t, u); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(v, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ThenAcceptBoth extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final BiConsumer fn; + final CompletableFuture dst; + final Executor executor; + ThenAcceptBoth(CompletableFuture src, + CompletableFuture snd, + BiConsumer fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.snd = snd; + this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final BiConsumer fn; + final CompletableFuture dst; + Object r, s; T t; U u; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + (b = this.snd) != null && + (s = b.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex != null) + u = null; + else if (s instanceof AltResult) { + ex = ((AltResult)s).ex; + u = null; + } + else { + @SuppressWarnings("unchecked") U us = (U) s; + u = us; + } + Executor e = executor; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncAcceptBoth(t, u, fn, dst)); + else + fn.accept(t, u); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class RunAfterBoth extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final Runnable fn; + final CompletableFuture dst; + final Executor executor; + RunAfterBoth(CompletableFuture src, + CompletableFuture snd, + Runnable fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.snd = snd; + this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final Runnable fn; + final CompletableFuture dst; + Object r, s; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + (b = this.snd) != null && + (s = b.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null && (s instanceof AltResult)) + ex = ((AltResult)s).ex; + Executor e = executor; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncRun(fn, dst)); + else + fn.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AndCompletion extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final CompletableFuture dst; + AndCompletion(CompletableFuture src, + CompletableFuture snd, + CompletableFuture dst) { + this.src = src; this.snd = snd; this.dst = dst; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final CompletableFuture dst; + Object r, s; Throwable ex; + if ((dst = this.dst) != null && + (a = this.src) != null && + (r = a.result) != null && + (b = this.snd) != null && + (s = b.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null && (s instanceof AltResult)) + ex = ((AltResult)s).ex; + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ApplyToEither extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final Function fn; + final CompletableFuture dst; + final Executor executor; + ApplyToEither(CompletableFuture src, + CompletableFuture snd, + Function fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.snd = snd; + this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final Function fn; + final CompletableFuture dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (((a = this.src) != null && (r = a.result) != null) || + ((b = this.snd) != null && (r = b.result) != null)) && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + Executor e = executor; + U u = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncApply(t, fn, dst)); + else + u = fn.apply(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(u, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class AcceptEither extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final Consumer fn; + final CompletableFuture dst; + final Executor executor; + AcceptEither(CompletableFuture src, + CompletableFuture snd, + Consumer fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.snd = snd; + this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final Consumer fn; + final CompletableFuture dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (((a = this.src) != null && (r = a.result) != null) || + ((b = this.snd) != null && (r = b.result) != null)) && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + Executor e = executor; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncAccept(t, fn, dst)); + else + fn.accept(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class RunAfterEither extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final Runnable fn; + final CompletableFuture dst; + final Executor executor; + RunAfterEither(CompletableFuture src, + CompletableFuture snd, + Runnable fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.snd = snd; + this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final Runnable fn; + final CompletableFuture dst; + Object r; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (((a = this.src) != null && (r = a.result) != null) || + ((b = this.snd) != null && (r = b.result) != null)) && + compareAndSet(0, 1)) { + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + Executor e = executor; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncRun(fn, dst)); + else + fn.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class OrCompletion extends Completion { + final CompletableFuture src; + final CompletableFuture snd; + final CompletableFuture dst; + OrCompletion(CompletableFuture src, + CompletableFuture snd, + CompletableFuture dst) { + this.src = src; this.snd = snd; this.dst = dst; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture b; + final CompletableFuture dst; + Object r, t; Throwable ex; + if ((dst = this.dst) != null && + (((a = this.src) != null && (r = a.result) != null) || + ((b = this.snd) != null && (r = b.result) != null)) && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + t = r; + } + dst.internalComplete(t, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ExceptionCompletion extends Completion { + final CompletableFuture src; + final Function fn; + final CompletableFuture dst; + ExceptionCompletion(CompletableFuture src, + Function fn, + CompletableFuture dst) { + this.src = src; this.fn = fn; this.dst = dst; + } + public final void run() { + final CompletableFuture a; + final Function fn; + final CompletableFuture dst; + Object r; T t = null; Throwable ex, dx = null; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if ((r instanceof AltResult) && + (ex = ((AltResult)r).ex) != null) { + try { + t = fn.apply(ex); + } catch (Throwable rex) { + dx = rex; + } + } + else { + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + dst.internalComplete(t, dx); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ThenCopy extends Completion { + final CompletableFuture src; + final CompletableFuture dst; + ThenCopy(CompletableFuture src, + CompletableFuture dst) { + this.src = src; this.dst = dst; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + dst.internalComplete(t, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + // version of ThenCopy for CompletableFuture dst + static final class ThenPropagate extends Completion { + final CompletableFuture src; + final CompletableFuture dst; + ThenPropagate(CompletableFuture src, + CompletableFuture dst) { + this.src = src; this.dst = dst; + } + public final void run() { + final CompletableFuture a; + final CompletableFuture dst; + Object r; Throwable ex; + if ((dst = this.dst) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + dst.internalComplete(null, ex); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class HandleCompletion extends Completion { + final CompletableFuture src; + final BiFunction fn; + final CompletableFuture dst; + HandleCompletion(CompletableFuture src, + BiFunction fn, + CompletableFuture dst) { + this.src = src; this.fn = fn; this.dst = dst; + } + public final void run() { + final CompletableFuture a; + final BiFunction fn; + final CompletableFuture dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + U u = null; Throwable dx = null; + try { + u = fn.apply(t, ex); + } catch (Throwable rex) { + dx = rex; + } + dst.internalComplete(u, dx); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + static final class ThenCompose extends Completion { + final CompletableFuture src; + final Function> fn; + final CompletableFuture dst; + final Executor executor; + ThenCompose(CompletableFuture src, + Function> fn, + CompletableFuture dst, + Executor executor) { + this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture a; + final Function> fn; + final CompletableFuture dst; + Object r; T t; Throwable ex; Executor e; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + CompletableFuture c = null; + U u = null; + boolean complete = false; + if (ex == null) { + if ((e = executor) != null) + e.execute(new AsyncCompose(t, fn, dst)); + else { + try { + if ((c = fn.apply(t)) == null) + ex = new NullPointerException(); + } catch (Throwable rex) { + ex = rex; + } + } + } + if (c != null) { + ThenCopy d = null; + Object s; + if ((s = c.result) == null) { + CompletionNode p = new CompletionNode + (d = new ThenCopy(c, dst)); + while ((s = c.result) == null) { + if (UNSAFE.compareAndSwapObject + (c, COMPLETIONS, p.next = c.completions, p)) + break; + } + } + if (s != null && (d == null || d.compareAndSet(0, 1))) { + complete = true; + if (s instanceof AltResult) { + ex = ((AltResult)s).ex; // no rewrap + u = null; + } + else { + @SuppressWarnings("unchecked") U us = (U) s; + u = us; + } + } + } + if (complete || ex != null) + dst.internalComplete(u, ex); + if (c != null) + c.helpPostComplete(); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + + // public methods + + /** + * Creates a new incomplete CompletableFuture. + */ + public CompletableFuture() { + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the {@link ForkJoinPool#commonPool()} with + * the value obtained by calling the given Supplier. + * + * @param supplier a function returning the value to be used + * to complete the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture supplyAsync(Supplier supplier) { + if (supplier == null) throw new NullPointerException(); + CompletableFuture f = new CompletableFuture(); + ForkJoinPool.commonPool(). + execute((ForkJoinTask)new AsyncSupply(supplier, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the given executor with the value obtained + * by calling the given Supplier. + * + * @param supplier a function returning the value to be used + * to complete the returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public static CompletableFuture supplyAsync(Supplier supplier, + Executor executor) { + if (executor == null || supplier == null) + throw new NullPointerException(); + CompletableFuture f = new CompletableFuture(); + executor.execute(new AsyncSupply(supplier, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the {@link ForkJoinPool#commonPool()} after + * it runs the given action. + * + * @param runnable the action to run before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture runAsync(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + CompletableFuture f = new CompletableFuture(); + ForkJoinPool.commonPool(). + execute((ForkJoinTask)new AsyncRun(runnable, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the given executor after it runs the given + * action. + * + * @param runnable the action to run before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public static CompletableFuture runAsync(Runnable runnable, + Executor executor) { + if (executor == null || runnable == null) + throw new NullPointerException(); + CompletableFuture f = new CompletableFuture(); + executor.execute(new AsyncRun(runnable, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is already completed with + * the given value. + * + * @param value the value + * @return the completed CompletableFuture + */ + public static CompletableFuture completedFuture(U value) { + CompletableFuture f = new CompletableFuture(); + f.result = (value == null) ? NIL : value; + return f; + } + + /** + * Returns {@code true} if completed in any fashion: normally, + * exceptionally, or via cancellation. + * + * @return {@code true} if completed + */ + public boolean isDone() { + return result != null; + } + + /** + * Waits if necessary for this future to complete, and then + * returns its result. + * + * @return the result value + * @throws CancellationException if this future was cancelled + * @throws ExecutionException if this future completed exceptionally + * @throws InterruptedException if the current thread was interrupted + * while waiting + */ + public T get() throws InterruptedException, ExecutionException { + Object r; Throwable ex, cause; + if ((r = result) == null && (r = waitingGet(true)) == null) + throw new InterruptedException(); + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if ((ex instanceof CompletionException) && + (cause = ex.getCause()) != null) + ex = cause; + throw new ExecutionException(ex); + } + + /** + * Waits if necessary for at most the given time for this future + * to complete, and then returns its result, if available. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return the result value + * @throws CancellationException if this future was cancelled + * @throws ExecutionException if this future completed exceptionally + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws TimeoutException if the wait timed out + */ + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + Object r; Throwable ex, cause; + long nanos = unit.toNanos(timeout); + if (Thread.interrupted()) + throw new InterruptedException(); + if ((r = result) == null) + r = timedAwaitDone(nanos); + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if ((ex instanceof CompletionException) && + (cause = ex.getCause()) != null) + ex = cause; + throw new ExecutionException(ex); + } + + /** + * Returns the result value when complete, or throws an + * (unchecked) exception if completed exceptionally. To better + * conform with the use of common functional forms, if a + * computation involved in the completion of this + * CompletableFuture threw an exception, this method throws an + * (unchecked) {@link CompletionException} with the underlying + * exception as its cause. + * + * @return the result value + * @throws CancellationException if the computation was cancelled + * @throws CompletionException if this future completed + * exceptionally or a completion computation threw an exception + */ + public T join() { + Object r; Throwable ex; + if ((r = result) == null) + r = waitingGet(false); + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if (ex instanceof CompletionException) + throw (CompletionException)ex; + throw new CompletionException(ex); + } + + /** + * Returns the result value (or throws any encountered exception) + * if completed, else returns the given valueIfAbsent. + * + * @param valueIfAbsent the value to return if not completed + * @return the result value, if completed, else the given valueIfAbsent + * @throws CancellationException if the computation was cancelled + * @throws CompletionException if this future completed + * exceptionally or a completion computation threw an exception + */ + public T getNow(T valueIfAbsent) { + Object r; Throwable ex; + if ((r = result) == null) + return valueIfAbsent; + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if (ex instanceof CompletionException) + throw (CompletionException)ex; + throw new CompletionException(ex); + } + + /** + * If not already completed, sets the value returned by {@link + * #get()} and related methods to the given value. + * + * @param value the result value + * @return {@code true} if this invocation caused this CompletableFuture + * to transition to a completed state, else {@code false} + */ + public boolean complete(T value) { + boolean triggered = result == null && + UNSAFE.compareAndSwapObject(this, RESULT, null, + value == null ? NIL : value); + postComplete(); + return triggered; + } + + /** + * If not already completed, causes invocations of {@link #get()} + * and related methods to throw the given exception. + * + * @param ex the exception + * @return {@code true} if this invocation caused this CompletableFuture + * to transition to a completed state, else {@code false} + */ + public boolean completeExceptionally(Throwable ex) { + if (ex == null) throw new NullPointerException(); + boolean triggered = result == null && + UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); + postComplete(); + return triggered; + } + + /** + * Returns a new CompletableFuture that is completed + * when this CompletableFuture completes, with the result of the + * given function of this CompletableFuture's result. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied function throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenApply(Function fn) { + return doThenApply(fn, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when this CompletableFuture completes, with the result of the + * given function of this CompletableFuture's result from a + * task running in the {@link ForkJoinPool#commonPool()}. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied function throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenApplyAsync + (Function fn) { + return doThenApply(fn, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when this CompletableFuture completes, with the result of the + * given function of this CompletableFuture's result from a + * task running in the given executor. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied function throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture thenApplyAsync + (Function fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenApply(fn, executor); + } + + private CompletableFuture doThenApply + (Function fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + ThenApply d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = new CompletionNode + (d = new ThenApply(this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + U u = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncApply(t, fn, dst)); + else + u = fn.apply(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(u, ex); + } + helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when this CompletableFuture completes, after performing the given + * action with this CompletableFuture's result. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied action throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param block the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenAccept(Consumer block) { + return doThenAccept(block, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when this CompletableFuture completes, after performing the given + * action with this CompletableFuture's result from a task running + * in the {@link ForkJoinPool#commonPool()}. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied action throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param block the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenAcceptAsync(Consumer block) { + return doThenAccept(block, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when this CompletableFuture completes, after performing the given + * action with this CompletableFuture's result from a task running + * in the given executor. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied action throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param block the action to perform before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture thenAcceptAsync(Consumer block, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenAccept(block, executor); + } + + private CompletableFuture doThenAccept(Consumer fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + ThenAccept d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = new CompletionNode + (d = new ThenAccept(this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncAccept(t, fn, dst)); + else + fn.accept(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when this CompletableFuture completes, after performing the given + * action. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied action throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param action the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenRun(Runnable action) { + return doThenRun(action, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when this CompletableFuture completes, after performing the given + * action from a task running in the {@link ForkJoinPool#commonPool()}. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied action throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param action the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenRunAsync(Runnable action) { + return doThenRun(action, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when this CompletableFuture completes, after performing the given + * action from a task running in the given executor. + * + *

If this CompletableFuture completes exceptionally, or the + * supplied action throws an exception, then the returned + * CompletableFuture completes exceptionally with a + * CompletionException holding the exception as its cause. + * + * @param action the action to perform before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture thenRunAsync(Runnable action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenRun(action, executor); + } + + private CompletableFuture doThenRun(Runnable action, + Executor e) { + if (action == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + ThenRun d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = new CompletionNode + (d = new ThenRun(this, action, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + Throwable ex; + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncRun(action, dst)); + else + action.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when both this and the other given CompletableFuture complete, + * with the result of the given function of the results of the two + * CompletableFutures. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied function throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenCombine + (CompletableFuture other, + BiFunction fn) { + return doThenCombine(other, fn, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when both this and the other given CompletableFuture complete, + * with the result of the given function of the results of the two + * CompletableFutures from a task running in the + * {@link ForkJoinPool#commonPool()}. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied function throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenCombineAsync + (CompletableFuture other, + BiFunction fn) { + return doThenCombine(other, fn, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when both this and the other given CompletableFuture complete, + * with the result of the given function of the results of the two + * CompletableFutures from a task running in the given executor. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied function throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture thenCombineAsync + (CompletableFuture other, + BiFunction fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenCombine(other, fn, executor); + } + + private CompletableFuture doThenCombine + (CompletableFuture other, + BiFunction fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + ThenCombine d = null; + Object r, s = null; + if ((r = result) == null || (s = other.result) == null) { + d = new ThenCombine(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r == null && (r = result) == null) || + (s == null && (s = other.result) == null)) { + if (q != null) { + if (s != null || + UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (r != null || + UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) { + if (s != null) + break; + q = new CompletionNode(d); + } + } + } + if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { + T t; U u; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex != null) + u = null; + else if (s instanceof AltResult) { + ex = ((AltResult)s).ex; + u = null; + } + else { + @SuppressWarnings("unchecked") U us = (U) s; + u = us; + } + V v = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncCombine(t, u, fn, dst)); + else + v = fn.apply(t, u); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(v, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when both this and the other given CompletableFuture complete, + * after performing the given action with the results of the two + * CompletableFutures. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied action throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param block the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenAcceptBoth + (CompletableFuture other, + BiConsumer block) { + return doThenAcceptBoth(other, block, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when both this and the other given CompletableFuture complete, + * after performing the given action with the results of the two + * CompletableFutures from a task running in the {@link + * ForkJoinPool#commonPool()}. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied action throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param block the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture thenAcceptBothAsync + (CompletableFuture other, + BiConsumer block) { + return doThenAcceptBoth(other, block, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when both this and the other given CompletableFuture complete, + * after performing the given action with the results of the two + * CompletableFutures from a task running in the given executor. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied action throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param block the action to perform before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture thenAcceptBothAsync + (CompletableFuture other, + BiConsumer block, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenAcceptBoth(other, block, executor); + } + + private CompletableFuture doThenAcceptBoth + (CompletableFuture other, + BiConsumer fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + ThenAcceptBoth d = null; + Object r, s = null; + if ((r = result) == null || (s = other.result) == null) { + d = new ThenAcceptBoth(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r == null && (r = result) == null) || + (s == null && (s = other.result) == null)) { + if (q != null) { + if (s != null || + UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (r != null || + UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) { + if (s != null) + break; + q = new CompletionNode(d); + } + } + } + if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { + T t; U u; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex != null) + u = null; + else if (s instanceof AltResult) { + ex = ((AltResult)s).ex; + u = null; + } + else { + @SuppressWarnings("unchecked") U us = (U) s; + u = us; + } + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncAcceptBoth(t, u, fn, dst)); + else + fn.accept(t, u); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when both this and the other given CompletableFuture complete, + * after performing the given action. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied action throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param action the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture runAfterBoth(CompletableFuture other, + Runnable action) { + return doRunAfterBoth(other, action, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when both this and the other given CompletableFuture complete, + * after performing the given action from a task running in the + * {@link ForkJoinPool#commonPool()}. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied action throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param action the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture runAfterBothAsync(CompletableFuture other, + Runnable action) { + return doRunAfterBoth(other, action, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when both this and the other given CompletableFuture complete, + * after performing the given action from a task running in the + * given executor. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, or the supplied action throws an exception, + * then the returned CompletableFuture completes exceptionally + * with a CompletionException holding the exception as its cause. + * + * @param other the other CompletableFuture + * @param action the action to perform before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture runAfterBothAsync(CompletableFuture other, + Runnable action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doRunAfterBoth(other, action, executor); + } + + private CompletableFuture doRunAfterBoth(CompletableFuture other, + Runnable action, + Executor e) { + if (other == null || action == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + RunAfterBoth d = null; + Object r, s = null; + if ((r = result) == null || (s = other.result) == null) { + d = new RunAfterBoth(this, other, action, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r == null && (r = result) == null) || + (s == null && (s = other.result) == null)) { + if (q != null) { + if (s != null || + UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (r != null || + UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) { + if (s != null) + break; + q = new CompletionNode(d); + } + } + } + if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { + Throwable ex; + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null && (s instanceof AltResult)) + ex = ((AltResult)s).ex; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncRun(action, dst)); + else + action.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when either this or the other given CompletableFuture completes, + * with the result of the given function of either this or the other + * CompletableFuture's result. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied function + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture applyToEither + (CompletableFuture other, + Function fn) { + return doApplyToEither(other, fn, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when either this or the other given CompletableFuture completes, + * with the result of the given function of either this or the other + * CompletableFuture's result from a task running in the + * {@link ForkJoinPool#commonPool()}. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied function + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture applyToEitherAsync + (CompletableFuture other, + Function fn) { + return doApplyToEither(other, fn, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when either this or the other given CompletableFuture completes, + * with the result of the given function of either this or the other + * CompletableFuture's result from a task running in the + * given executor. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied function + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param fn the function to use to compute the value of + * the returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture applyToEitherAsync + (CompletableFuture other, + Function fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doApplyToEither(other, fn, executor); + } + + private CompletableFuture doApplyToEither + (CompletableFuture other, + Function fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + ApplyToEither d = null; + Object r; + if ((r = result) == null && (r = other.result) == null) { + d = new ApplyToEither(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r = result) == null && (r = other.result) == null) { + if (q != null) { + if (UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + q = new CompletionNode(d); + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + U u = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncApply(t, fn, dst)); + else + u = fn.apply(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(u, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when either this or the other given CompletableFuture completes, + * after performing the given action with the result of either this + * or the other CompletableFuture's result. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied action + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param block the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture acceptEither + (CompletableFuture other, + Consumer block) { + return doAcceptEither(other, block, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when either this or the other given CompletableFuture completes, + * after performing the given action with the result of either this + * or the other CompletableFuture's result from a task running in + * the {@link ForkJoinPool#commonPool()}. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied action + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param block the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture acceptEitherAsync + (CompletableFuture other, + Consumer block) { + return doAcceptEither(other, block, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when either this or the other given CompletableFuture completes, + * after performing the given action with the result of either this + * or the other CompletableFuture's result from a task running in + * the given executor. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied action + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param block the action to perform before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture acceptEitherAsync + (CompletableFuture other, + Consumer block, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doAcceptEither(other, block, executor); + } + + private CompletableFuture doAcceptEither + (CompletableFuture other, + Consumer fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + AcceptEither d = null; + Object r; + if ((r = result) == null && (r = other.result) == null) { + d = new AcceptEither(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r = result) == null && (r = other.result) == null) { + if (q != null) { + if (UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + q = new CompletionNode(d); + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncAccept(t, fn, dst)); + else + fn.accept(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed + * when either this or the other given CompletableFuture completes, + * after performing the given action. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied action + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param action the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture runAfterEither(CompletableFuture other, + Runnable action) { + return doRunAfterEither(other, action, null); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when either this or the other given CompletableFuture completes, + * after performing the given action from a task running in the + * {@link ForkJoinPool#commonPool()}. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied action + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param action the action to perform before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture runAfterEitherAsync + (CompletableFuture other, + Runnable action) { + return doRunAfterEither(other, action, ForkJoinPool.commonPool()); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * when either this or the other given CompletableFuture completes, + * after performing the given action from a task running in the + * given executor. + * + *

If this and/or the other CompletableFuture complete + * exceptionally, then the returned CompletableFuture may also do so, + * with a CompletionException holding one of these exceptions as its + * cause. No guarantees are made about which result or exception is + * used in the returned CompletableFuture. If the supplied action + * throws an exception, then the returned CompletableFuture completes + * exceptionally with a CompletionException holding the exception as + * its cause. + * + * @param other the other CompletableFuture + * @param action the action to perform before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public CompletableFuture runAfterEitherAsync + (CompletableFuture other, + Runnable action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doRunAfterEither(other, action, executor); + } + + private CompletableFuture doRunAfterEither + (CompletableFuture other, + Runnable action, + Executor e) { + if (other == null || action == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + RunAfterEither d = null; + Object r; + if ((r = result) == null && (r = other.result) == null) { + d = new RunAfterEither(this, other, action, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r = result) == null && (r = other.result) == null) { + if (q != null) { + if (UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + q = new CompletionNode(d); + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + Throwable ex; + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null) { + try { + if (e != null) + e.execute(new AsyncRun(action, dst)); + else + action.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + /** + * Returns a CompletableFuture that upon completion, has the same + * value as produced by the given function of the result of this + * CompletableFuture. + * + *

If this CompletableFuture completes exceptionally, then the + * returned CompletableFuture also does so, with a + * CompletionException holding this exception as its cause. + * Similarly, if the computed CompletableFuture completes + * exceptionally, then so does the returned CompletableFuture. + * + * @param fn the function returning a new CompletableFuture + * @return the CompletableFuture + */ + public CompletableFuture thenCompose + (Function> fn) { + return doThenCompose(fn, null); + } + + /** + * Returns a CompletableFuture that upon completion, has the same + * value as that produced asynchronously using the {@link + * ForkJoinPool#commonPool()} by the given function of the result + * of this CompletableFuture. + * + *

If this CompletableFuture completes exceptionally, then the + * returned CompletableFuture also does so, with a + * CompletionException holding this exception as its cause. + * Similarly, if the computed CompletableFuture completes + * exceptionally, then so does the returned CompletableFuture. + * + * @param fn the function returning a new CompletableFuture + * @return the CompletableFuture + */ + public CompletableFuture thenComposeAsync + (Function> fn) { + return doThenCompose(fn, ForkJoinPool.commonPool()); + } + + /** + * Returns a CompletableFuture that upon completion, has the same + * value as that produced asynchronously using the given executor + * by the given function of this CompletableFuture. + * + *

If this CompletableFuture completes exceptionally, then the + * returned CompletableFuture also does so, with a + * CompletionException holding this exception as its cause. + * Similarly, if the computed CompletableFuture completes + * exceptionally, then so does the returned CompletableFuture. + * + * @param fn the function returning a new CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the CompletableFuture + */ + public CompletableFuture thenComposeAsync + (Function> fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenCompose(fn, executor); + } + + private CompletableFuture doThenCompose + (Function> fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture dst = null; + ThenCompose d = null; + Object r; + if ((r = result) == null) { + dst = new CompletableFuture(); + CompletionNode p = new CompletionNode + (d = new ThenCompose(this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex == null) { + if (e != null) { + if (dst == null) + dst = new CompletableFuture(); + e.execute(new AsyncCompose(t, fn, dst)); + } + else { + try { + if ((dst = fn.apply(t)) == null) + ex = new NullPointerException(); + } catch (Throwable rex) { + ex = rex; + } + } + } + if (dst == null) + dst = new CompletableFuture(); + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + dst.helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed when this + * CompletableFuture completes, with the result of the given + * function of the exception triggering this CompletableFuture's + * completion when it completes exceptionally; otherwise, if this + * CompletableFuture completes normally, then the returned + * CompletableFuture also completes normally with the same value. + * + * @param fn the function to use to compute the value of the + * returned CompletableFuture if this CompletableFuture completed + * exceptionally + * @return the new CompletableFuture + */ + public CompletableFuture exceptionally + (Function fn) { + if (fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + ExceptionCompletion d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = + new CompletionNode(d = new ExceptionCompletion(this, fn, dst)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, + p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t = null; Throwable ex, dx = null; + if (r instanceof AltResult) { + if ((ex = ((AltResult)r).ex) != null) { + try { + t = fn.apply(ex); + } catch (Throwable rex) { + dx = rex; + } + } + } + else { + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + dst.internalComplete(t, dx); + } + helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed when this + * CompletableFuture completes, with the result of the given + * function of the result and exception of this CompletableFuture's + * completion. The given function is invoked with the result (or + * {@code null} if none) and the exception (or {@code null} if none) + * of this CompletableFuture when complete. + * + * @param fn the function to use to compute the value of the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture handle + (BiFunction fn) { + if (fn == null) throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + HandleCompletion d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = + new CompletionNode(d = new HandleCompletion(this, fn, dst)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, + p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + U u; Throwable dx; + try { + u = fn.apply(t, ex); + dx = null; + } catch (Throwable rex) { + dx = rex; + u = null; + } + dst.internalComplete(u, dx); + } + helpPostComplete(); + return dst; + } + + + /* ------------- Arbitrary-arity constructions -------------- */ + + /* + * The basic plan of attack is to recursively form binary + * completion trees of elements. This can be overkill for small + * sets, but scales nicely. The And/All vs Or/Any forms use the + * same idea, but details differ. + */ + + /** + * Returns a new CompletableFuture that is completed when all of + * the given CompletableFutures complete. If any of the given + * CompletableFutures complete exceptionally, then the returned + * CompletableFuture also does so, with a CompletionException + * holding this exception as its cause. Otherwise, the results, + * if any, of the given CompletableFutures are not reflected in + * the returned CompletableFuture, but may be obtained by + * inspecting them individually. If no CompletableFutures are + * provided, returns a CompletableFuture completed with the value + * {@code null}. + * + *

Among the applications of this method is to await completion + * of a set of independent CompletableFutures before continuing a + * program, as in: {@code CompletableFuture.allOf(c1, c2, + * c3).join();}. + * + * @param cfs the CompletableFutures + * @return a new CompletableFuture that is completed when all of the + * given CompletableFutures complete + * @throws NullPointerException if the array or any of its elements are + * {@code null} + */ + public static CompletableFuture allOf(CompletableFuture... cfs) { + int len = cfs.length; // Directly handle empty and singleton cases + if (len > 1) + return allTree(cfs, 0, len - 1); + else { + CompletableFuture dst = new CompletableFuture(); + CompletableFuture f; + if (len == 0) + dst.result = NIL; + else if ((f = cfs[0]) == null) + throw new NullPointerException(); + else { + ThenPropagate d = null; + CompletionNode p = null; + Object r; + while ((r = f.result) == null) { + if (d == null) + d = new ThenPropagate(f, dst); + else if (p == null) + p = new CompletionNode(d); + else if (UNSAFE.compareAndSwapObject + (f, COMPLETIONS, p.next = f.completions, p)) + break; + } + if (r != null && (d == null || d.compareAndSet(0, 1))) + dst.internalComplete(null, (r instanceof AltResult) ? + ((AltResult)r).ex : null); + f.helpPostComplete(); + } + return dst; + } + } + + /** + * Recursively constructs an And'ed tree of CompletableFutures. + * Called only when array known to have at least two elements. + */ + private static CompletableFuture allTree(CompletableFuture[] cfs, + int lo, int hi) { + CompletableFuture fst, snd; + int mid = (lo + hi) >>> 1; + if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null || + (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + AndCompletion d = null; + CompletionNode p = null, q = null; + Object r = null, s = null; + while ((r = fst.result) == null || (s = snd.result) == null) { + if (d == null) + d = new AndCompletion(fst, snd, dst); + else if (p == null) + p = new CompletionNode(d); + else if (q == null) { + if (UNSAFE.compareAndSwapObject + (fst, COMPLETIONS, p.next = fst.completions, p)) + q = new CompletionNode(d); + } + else if (UNSAFE.compareAndSwapObject + (snd, COMPLETIONS, q.next = snd.completions, q)) + break; + } + if ((r != null || (r = fst.result) != null) && + (s != null || (s = snd.result) != null) && + (d == null || d.compareAndSet(0, 1))) { + Throwable ex; + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null && (s instanceof AltResult)) + ex = ((AltResult)s).ex; + dst.internalComplete(null, ex); + } + fst.helpPostComplete(); + snd.helpPostComplete(); + return dst; + } + + /** + * Returns a new CompletableFuture that is completed when any of + * the given CompletableFutures complete, with the same result. + * Otherwise, if it completed exceptionally, the returned + * CompletableFuture also does so, with a CompletionException + * holding this exception as its cause. If no CompletableFutures + * are provided, returns an incomplete CompletableFuture. + * + * @param cfs the CompletableFutures + * @return a new CompletableFuture that is completed with the + * result or exception of any of the given CompletableFutures when + * one completes + * @throws NullPointerException if the array or any of its elements are + * {@code null} + */ + public static CompletableFuture anyOf(CompletableFuture... cfs) { + int len = cfs.length; // Same idea as allOf + if (len > 1) + return anyTree(cfs, 0, len - 1); + else { + CompletableFuture dst = new CompletableFuture(); + CompletableFuture f; + if (len == 0) + ; // skip + else if ((f = cfs[0]) == null) + throw new NullPointerException(); + else { + ThenCopy d = null; + CompletionNode p = null; + Object r; + while ((r = f.result) == null) { + if (d == null) + d = new ThenCopy(f, dst); + else if (p == null) + p = new CompletionNode(d); + else if (UNSAFE.compareAndSwapObject + (f, COMPLETIONS, p.next = f.completions, p)) + break; + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + Throwable ex; Object t; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + t = r; + } + dst.internalComplete(t, ex); + } + f.helpPostComplete(); + } + return dst; + } + } + + /** + * Recursively constructs an Or'ed tree of CompletableFutures. + */ + private static CompletableFuture anyTree(CompletableFuture[] cfs, + int lo, int hi) { + CompletableFuture fst, snd; + int mid = (lo + hi) >>> 1; + if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null || + (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + CompletableFuture dst = new CompletableFuture(); + OrCompletion d = null; + CompletionNode p = null, q = null; + Object r; + while ((r = fst.result) == null && (r = snd.result) == null) { + if (d == null) + d = new OrCompletion(fst, snd, dst); + else if (p == null) + p = new CompletionNode(d); + else if (q == null) { + if (UNSAFE.compareAndSwapObject + (fst, COMPLETIONS, p.next = fst.completions, p)) + q = new CompletionNode(d); + } + else if (UNSAFE.compareAndSwapObject + (snd, COMPLETIONS, q.next = snd.completions, q)) + break; + } + if ((r != null || (r = fst.result) != null || + (r = snd.result) != null) && + (d == null || d.compareAndSet(0, 1))) { + Throwable ex; Object t; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + t = r; + } + dst.internalComplete(t, ex); + } + fst.helpPostComplete(); + snd.helpPostComplete(); + return dst; + } + + /* ------------- Control and status methods -------------- */ + + /** + * If not already completed, completes this CompletableFuture with + * a {@link CancellationException}. Dependent CompletableFutures + * that have not already completed will also complete + * exceptionally, with a {@link CompletionException} caused by + * this {@code CancellationException}. + * + * @param mayInterruptIfRunning this value has no effect in this + * implementation because interrupts are not used to control + * processing. + * + * @return {@code true} if this task is now cancelled + */ + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = (result == null) && + UNSAFE.compareAndSwapObject + (this, RESULT, null, new AltResult(new CancellationException())); + postComplete(); + return cancelled || isCancelled(); + } + + /** + * Returns {@code true} if this CompletableFuture was cancelled + * before it completed normally. + * + * @return {@code true} if this CompletableFuture was cancelled + * before it completed normally + */ + public boolean isCancelled() { + Object r; + return ((r = result) instanceof AltResult) && + (((AltResult)r).ex instanceof CancellationException); + } + + /** + * Forcibly sets or resets the value subsequently returned by + * method {@link #get()} and related methods, whether or not + * already completed. This method is designed for use only in + * error recovery actions, and even in such situations may result + * in ongoing dependent completions using established versus + * overwritten outcomes. + * + * @param value the completion value + */ + public void obtrudeValue(T value) { + result = (value == null) ? NIL : value; + postComplete(); + } + + /** + * Forcibly causes subsequent invocations of method {@link #get()} + * and related methods to throw the given exception, whether or + * not already completed. This method is designed for use only in + * recovery actions, and even in such situations may result in + * ongoing dependent completions using established versus + * overwritten outcomes. + * + * @param ex the exception + */ + public void obtrudeException(Throwable ex) { + if (ex == null) throw new NullPointerException(); + result = new AltResult(ex); + postComplete(); + } + + /** + * Returns the estimated number of CompletableFutures whose + * completions are awaiting completion of this CompletableFuture. + * This method is designed for use in monitoring system state, not + * for synchronization control. + * + * @return the number of dependent CompletableFutures + */ + public int getNumberOfDependents() { + int count = 0; + for (CompletionNode p = completions; p != null; p = p.next) + ++count; + return count; + } + + /** + * Returns a string identifying this CompletableFuture, as well as + * its completion state. The state, in brackets, contains the + * String {@code "Completed Normally"} or the String {@code + * "Completed Exceptionally"}, or the String {@code "Not + * completed"} followed by the number of CompletableFutures + * dependent upon its completion, if any. + * + * @return a string identifying this CompletableFuture, as well as its state + */ + public String toString() { + Object r = result; + int count; + return super.toString() + + ((r == null) ? + (((count = getNumberOfDependents()) == 0) ? + "[Not completed]" : + "[Not completed, " + count + " dependents]") : + (((r instanceof AltResult) && ((AltResult)r).ex != null) ? + "[Completed exceptionally]" : + "[Completed normally]")); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long RESULT; + private static final long WAITERS; + private static final long COMPLETIONS; + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = CompletableFuture.class; + RESULT = UNSAFE.objectFieldOffset + (k.getDeclaredField("result")); + WAITERS = UNSAFE.objectFieldOffset + (k.getDeclaredField("waiters")); + COMPLETIONS = UNSAFE.objectFieldOffset + (k.getDeclaredField("completions")); + } catch (Exception e) { + throw new Error(e); + } + } +} diff -r b3ca7ed8e44f -r fcce5e09e23b jdk/src/share/classes/java/util/concurrent/CompletionException.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/classes/java/util/concurrent/CompletionException.java Tue Apr 09 17:27:47 2013 +0100 @@ -0,0 +1,90 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package java.util.concurrent; + +/** + * Exception thrown when an error or other exception is encountered + * in the course of completing a result or task. + * + * @since 1.8 + * @author Doug Lea + */ +public class CompletionException extends RuntimeException { + private static final long serialVersionUID = 7830266012832686185L; + + /** + * Constructs a {@code CompletionException} with no detail message. + * The cause is not initialized, and may subsequently be + * initialized by a call to {@link #initCause(Throwable) initCause}. + */ + protected CompletionException() { } + + /** + * Constructs a {@code CompletionException} with the specified detail + * message. The cause is not initialized, and may subsequently be + * initialized by a call to {@link #initCause(Throwable) initCause}. + * + * @param message the detail message + */ + protected CompletionException(String message) { + super(message); + } + + /** + * Constructs a {@code CompletionException} with the specified detail + * message and cause. + * + * @param message the detail message + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public CompletionException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a {@code CompletionException} with the specified cause. + * The detail message is set to {@code (cause == null ? null : + * cause.toString())} (which typically contains the class and + * detail message of {@code cause}). + * + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public CompletionException(Throwable cause) { + super(cause); + } +} diff -r b3ca7ed8e44f -r fcce5e09e23b jdk/test/java/util/concurrent/CompletableFuture/Basic.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/concurrent/CompletableFuture/Basic.java Tue Apr 09 17:27:47 2013 +0100 @@ -0,0 +1,761 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +/* + * @test + * @bug 8005696 + * @summary Basic tests for CompletableFuture + * @author Chris Hegarty + */ + +import java.lang.reflect.Array; +import java.util.concurrent.Phaser; +import static java.util.concurrent.TimeUnit.*; +import java.util.concurrent.CompletableFuture; +import static java.util.concurrent.CompletableFuture.*; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import static java.util.concurrent.ForkJoinPool.*; +import java.util.concurrent.atomic.AtomicInteger; + + +public class Basic { + + static void checkCompletedNormally(CompletableFuture cf, Object value) { + checkCompletedNormally(cf, value == null ? null : new Object[] { value }); + } + + static void checkCompletedNormally(CompletableFuture cf, Object[] values) { + try { equalAnyOf(cf.join(), values); } catch (Throwable x) { unexpected(x); } + try { equalAnyOf(cf.getNow(null), values); } catch (Throwable x) { unexpected(x); } + try { equalAnyOf(cf.get(), values); } catch (Throwable x) { unexpected(x); } + try { equalAnyOf(cf.get(0L, SECONDS), values); } catch (Throwable x) { unexpected(x); } + check(cf.isDone(), "Expected isDone to be true, got:" + cf); + check(!cf.isCancelled(), "Expected isCancelled to be false"); + check(!cf.cancel(true), "Expected cancel to return false"); + check(cf.toString().contains("[Completed normally]")); + check(cf.complete(null) == false, "Expected complete() to fail"); + check(cf.completeExceptionally(new Throwable()) == false, + "Expected completeExceptionally() to fail"); + } + + static void checkCompletedExceptionally(CompletableFuture cf) + throws Exception + { + checkCompletedExceptionally(cf, false); + } + + @SuppressWarnings("unchecked") + static void checkCompletedExceptionally(CompletableFuture cf, boolean cancelled) + throws Exception + { + try { cf.join(); fail("Excepted exception to be thrown"); } + catch (CompletionException x) { if (cancelled) fail(); else pass(); } + catch (CancellationException x) { if (cancelled) pass(); else fail(); } + try { cf.getNow(null); fail("Excepted exception to be thrown"); } + catch (CompletionException x) { if (cancelled) fail(); else pass(); } + catch (CancellationException x) { if (cancelled) pass(); else fail(); } + try { cf.get(); fail("Excepted exception to be thrown");} + catch (CancellationException x) { if (cancelled) pass(); else fail(); } + catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); } + try { cf.get(0L, SECONDS); fail("Excepted exception to be thrown");} + catch (CancellationException x) { if (cancelled) pass(); else fail(); } + catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); } + check(cf.isDone(), "Expected isDone to be true, got:" + cf); + check(cf.isCancelled() == cancelled, "Expected isCancelled: " + cancelled + ", got:" + cf.isCancelled()); + check(cf.cancel(true) == cancelled, "Expected cancel: " + cancelled + ", got:" + cf.cancel(true)); + check(cf.toString().contains("[Completed exceptionally]")); // ## TODO: 'E'xceptionally + check(cf.complete((T)new Object()) == false, "Expected complete() to fail"); + check(cf.completeExceptionally(new Throwable()) == false, + "Expected completeExceptionally() to fail, already completed"); + } + + private static void realMain(String[] args) throws Throwable { + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + test(executor); + } finally { + executor.shutdown(); + executor.awaitTermination(30, SECONDS); + } + } + + static AtomicInteger atomicInt = new AtomicInteger(0); + + private static void test(ExecutorService executor) throws Throwable { + + Thread.currentThread().setName("mainThread"); + + //---------------------------------------------------------------- + // supplyAsync tests + //---------------------------------------------------------------- + try { + CompletableFuture cf = supplyAsync(() -> "a test string"); + checkCompletedNormally(cf, cf.join()); + cf = supplyAsync(() -> "a test string", commonPool()); + checkCompletedNormally(cf, cf.join()); + cf = supplyAsync(() -> "a test string", executor); + checkCompletedNormally(cf, cf.join()); + cf = supplyAsync(() -> { throw new RuntimeException(); }); + checkCompletedExceptionally(cf); + cf = supplyAsync(() -> { throw new RuntimeException(); }, commonPool()); + checkCompletedExceptionally(cf); + cf = supplyAsync(() -> { throw new RuntimeException(); }, executor); + checkCompletedExceptionally(cf); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // runAsync tests + //---------------------------------------------------------------- + try { + CompletableFuture cf = runAsync(() -> { }); + checkCompletedNormally(cf, cf.join()); + cf = runAsync(() -> { }, commonPool()); + checkCompletedNormally(cf, cf.join()); + cf = runAsync(() -> { }, executor); + checkCompletedNormally(cf, cf.join()); + cf = runAsync(() -> { throw new RuntimeException(); }); + checkCompletedExceptionally(cf); + cf = runAsync(() -> { throw new RuntimeException(); }, commonPool()); + checkCompletedExceptionally(cf); + cf = runAsync(() -> { throw new RuntimeException(); }, executor); + checkCompletedExceptionally(cf); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // explicit completion + //---------------------------------------------------------------- + try { + final Phaser phaser = new Phaser(1); + final int phase = phaser.getPhase(); + CompletableFuture cf; + cf = supplyAsync(() -> { phaser.awaitAdvance(phase); return 1; }); + cf.complete(2); + phaser.arrive(); + checkCompletedNormally(cf, 2); + + cf = supplyAsync(() -> { phaser.awaitAdvance(phase+1); return 1; }); + cf.completeExceptionally(new Throwable()); + phaser.arrive(); + checkCompletedExceptionally(cf); + + cf = supplyAsync(() -> { phaser.awaitAdvance(phase+2); return 1; }); + cf.cancel(true); + phaser.arrive(); + checkCompletedExceptionally(cf, true); + + cf = supplyAsync(() -> { phaser.awaitAdvance(phase+3); return 1; }); + check(cf.getNow(2) == 2); + phaser.arrive(); + checkCompletedNormally(cf, 1); + check(cf.getNow(2) == 1); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // thenApplyXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf2; + CompletableFuture cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenApply((x) -> { if (x.equals("a test string")) return 1; else return 0; }); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, 1); + + cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenApplyAsync((x) -> { if (x.equals("a test string")) return 1; else return 0; }); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, 1); + + cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenApplyAsync((x) -> { if (x.equals("a test string")) return 1; else return 0; }, executor); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, 1); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenApply((x) -> { return 0; } ); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenApplyAsync((x) -> { return 0; } ); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenApplyAsync((x) -> { return 0; }, executor); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // thenAcceptXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf2; + int before = atomicInt.get(); + CompletableFuture cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenAccept((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenAcceptAsync((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenAcceptAsync((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }, executor); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenAccept((x) -> { atomicInt.incrementAndGet(); } ); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenAcceptAsync((x) -> { atomicInt.incrementAndGet(); } ); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenAcceptAsync((x) -> { atomicInt.incrementAndGet(); }, executor ); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // thenRunXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf2; + int before = atomicInt.get(); + CompletableFuture cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenRun(() -> { atomicInt.incrementAndGet(); }); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> "a test string"); + cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }, executor); + checkCompletedNormally(cf1, "a test string"); + checkCompletedNormally(cf2, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenRun(() -> { atomicInt.incrementAndGet(); }); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }, executor); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // thenCombineXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf3; + CompletableFuture cf1 = supplyAsync(() -> 1); + CompletableFuture cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenCombine(cf2, (x, y) -> { return x + y; }); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + checkCompletedNormally(cf3, 2); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return x + y; }); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + checkCompletedNormally(cf3, 2); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return x + y; }, executor); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + checkCompletedNormally(cf3, 2); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenCombine(cf2, (x, y) -> { return 0; }); + checkCompletedExceptionally(cf1); + checkCompletedNormally(cf2, 1); + checkCompletedExceptionally(cf3); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return 0; }); + checkCompletedNormally(cf1, 1); + checkCompletedExceptionally(cf2); + checkCompletedExceptionally(cf3); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return 0; }, executor); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + checkCompletedExceptionally(cf3); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // thenAcceptBothXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf3; + int before = atomicInt.get(); + CompletableFuture cf1 = supplyAsync(() -> 1); + CompletableFuture cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }, executor); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> 1); + cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { atomicInt.incrementAndGet(); }); + checkCompletedExceptionally(cf1); + checkCompletedNormally(cf2, 1); + checkCompletedExceptionally(cf3); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { atomicInt.incrementAndGet(); }); + checkCompletedNormally(cf1, 1); + checkCompletedExceptionally(cf2); + checkCompletedExceptionally(cf3); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { atomicInt.incrementAndGet(); }, executor); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + checkCompletedExceptionally(cf3); + check(atomicInt.get() == before); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // runAfterBothXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf3; + int before = atomicInt.get(); + CompletableFuture cf1 = supplyAsync(() -> 1); + CompletableFuture cf2 = supplyAsync(() -> 1); + cf3 = cf1.runAfterBoth(cf2, () -> { check(cf1.isDone()); check(cf2.isDone()); atomicInt.incrementAndGet(); }); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + CompletableFuture cfa = supplyAsync(() -> 1); + CompletableFuture cfb = supplyAsync(() -> 1); + cf3 = cfa.runAfterBothAsync(cfb, () -> { check(cfa.isDone()); check(cfb.isDone()); atomicInt.incrementAndGet(); }); + checkCompletedNormally(cfa, 1); + checkCompletedNormally(cfb, 1); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + CompletableFuture cfx = supplyAsync(() -> 1); + CompletableFuture cfy = supplyAsync(() -> 1); + cf3 = cfy.runAfterBothAsync(cfx, () -> { check(cfx.isDone()); check(cfy.isDone()); atomicInt.incrementAndGet(); }, executor); + checkCompletedNormally(cfx, 1); + checkCompletedNormally(cfy, 1); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + CompletableFuture cf4 = supplyAsync(() -> { throw new RuntimeException(); }); + CompletableFuture cf5 = supplyAsync(() -> 1); + cf3 = cf5.runAfterBothAsync(cf4, () -> { atomicInt.incrementAndGet(); }, executor); + checkCompletedExceptionally(cf4); + checkCompletedNormally(cf5, 1); + checkCompletedExceptionally(cf3); + check(atomicInt.get() == before); + + before = atomicInt.get(); + cf4 = supplyAsync(() -> 1); + cf5 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf5.runAfterBothAsync(cf4, () -> { atomicInt.incrementAndGet(); }); + checkCompletedNormally(cf4, 1); + checkCompletedExceptionally(cf5); + checkCompletedExceptionally(cf3); + check(atomicInt.get() == before); + + before = atomicInt.get(); + cf4 = supplyAsync(() -> { throw new RuntimeException(); }); + cf5 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf5.runAfterBoth(cf4, () -> { atomicInt.incrementAndGet(); }); + checkCompletedExceptionally(cf4); + checkCompletedExceptionally(cf5); + checkCompletedExceptionally(cf3); + check(atomicInt.get() == before); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // applyToEitherXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf3; + CompletableFuture cf1 = supplyAsync(() -> 1); + CompletableFuture cf2 = supplyAsync(() -> 2); + cf3 = cf1.applyToEither(cf2, (x) -> { check(x == 1 || x == 2); return x; }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, new Object[] {1, 2}); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 2); + cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); return x; }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, new Object[] {1, 2}); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 2); + cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); return x; }, executor); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, new Object[] {1, 2}); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> 2); + cf3 = cf1.applyToEither(cf2, (x) -> { check(x == 2); return x; }); + check(cf1.isDone() || cf2.isDone()); + try { check(cf3.join() == 1); } catch (CompletionException x) { pass(); } + check(cf3.isDone()); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1); return x; }); + check(cf1.isDone() || cf2.isDone()); + try { check(cf3.join() == 1); } catch (CompletionException x) { pass(); } + check(cf3.isDone()); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf1.applyToEitherAsync(cf2, (x) -> { fail(); return x; }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedExceptionally(cf3); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // acceptEitherXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf3; + int before = atomicInt.get(); + CompletableFuture cf1 = supplyAsync(() -> 1); + CompletableFuture cf2 = supplyAsync(() -> 2); + cf3 = cf1.acceptEither(cf2, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 2); + cf3 = cf1.acceptEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> 2); + cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }, executor); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> 2); + cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 2); }, executor); + check(cf1.isDone() || cf2.isDone()); + try { check(cf3.join() == null); } catch (CompletionException x) { pass(); } + check(cf3.isDone()); + + cf1 = supplyAsync(() -> 1); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 1); }); + check(cf1.isDone() || cf2.isDone()); + try { check(cf3.join() == null); } catch (CompletionException x) { pass(); } + check(cf3.isDone()); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = supplyAsync(() -> { throw new RuntimeException(); }); + cf3 = cf2.acceptEitherAsync(cf1, (x) -> { fail(); }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedExceptionally(cf3); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // runAfterEitherXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf3; + int before = atomicInt.get(); + CompletableFuture cf1 = runAsync(() -> { }); + CompletableFuture cf2 = runAsync(() -> { }); + cf3 = cf1.runAfterEither(cf2, () -> { atomicInt.incrementAndGet(); }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = runAsync(() -> { }); + cf2 = runAsync(() -> { }); + cf3 = cf1.runAfterEitherAsync(cf2, () -> { atomicInt.incrementAndGet(); }); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = runAsync(() -> { }); + cf2 = runAsync(() -> { }); + cf3 = cf2.runAfterEitherAsync(cf1, () -> { atomicInt.incrementAndGet(); }, executor); + check(cf1.isDone() || cf2.isDone()); + checkCompletedNormally(cf3, null); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = runAsync(() -> { throw new RuntimeException(); }); + cf2 = runAsync(() -> { }); + cf3 = cf2.runAfterEither(cf1, () -> { atomicInt.incrementAndGet(); }); + check(cf1.isDone() || cf2.isDone()); + try { check(cf3.join() == null); } catch (CompletionException x) { pass(); } + check(cf3.isDone()); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = runAsync(() -> { }); + cf2 = runAsync(() -> { throw new RuntimeException(); }); + cf3 = cf1.runAfterEitherAsync(cf2, () -> { atomicInt.incrementAndGet(); }); + check(cf1.isDone() || cf2.isDone()); + try { check(cf3.join() == null); } catch (CompletionException x) { pass(); } + check(cf3.isDone()); + check(atomicInt.get() == (before + 1)); + + before = atomicInt.get(); + cf1 = runAsync(() -> { throw new RuntimeException(); }); + cf2 = runAsync(() -> { throw new RuntimeException(); }); + cf3 = cf2.runAfterEitherAsync(cf1, () -> { atomicInt.incrementAndGet(); }, executor); + check(cf1.isDone() || cf2.isDone()); + checkCompletedExceptionally(cf3); + check(atomicInt.get() == before); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // thenComposeXXX tests + //---------------------------------------------------------------- + try { + CompletableFuture cf2; + CompletableFuture cf1 = supplyAsync(() -> 1); + cf2 = cf1.thenCompose((x) -> { check(x ==1); return CompletableFuture.completedFuture(2); }); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 2); + + cf1 = supplyAsync(() -> 1); + cf2 = cf1.thenComposeAsync((x) -> { check(x ==1); return CompletableFuture.completedFuture(2); }); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 2); + + cf1 = supplyAsync(() -> 1); + cf2 = cf1.thenComposeAsync((x) -> { check(x ==1); return CompletableFuture.completedFuture(2); }, executor); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 2); + + int before = atomicInt.get(); + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenCompose((x) -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); }); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> { throw new RuntimeException(); }); + cf2 = cf1.thenComposeAsync((x) -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); }); + checkCompletedExceptionally(cf1); + checkCompletedExceptionally(cf2); + check(atomicInt.get() == before); + + cf1 = supplyAsync(() -> 1); + cf2 = cf1.thenComposeAsync((x) -> { throw new RuntimeException(); }, executor); + checkCompletedNormally(cf1, 1); + checkCompletedExceptionally(cf2); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // anyOf tests + //---------------------------------------------------------------- + //try { + // CompletableFuture cf3; + // for (int k=0; k < 10; k++){ + // CompletableFuture cf1 = supplyAsync(() -> 1); + // CompletableFuture cf2 = supplyAsync(() -> 2); + // cf3 = CompletableFuture.anyOf(cf1, cf2); + // check(cf1.isDone() || cf2.isDone()); + // checkCompletedNormally(cf3, new Object[] {1, 2}); + // } + //} catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // allOf tests + //---------------------------------------------------------------- + try { + CompletableFuture cf3; + for (int k=0; k < 10; k++){ + CompletableFuture[] cfs = (CompletableFuture[]) + Array.newInstance(CompletableFuture.class, 10); + for (int j=0; j < 10; j++) { + final int v = j; + cfs[j] = supplyAsync(() -> v); + } + cf3 = CompletableFuture.allOf(cfs); + for (int j=0; j < 10; j++) + checkCompletedNormally(cfs[j], j); + checkCompletedNormally(cf3, null); + } + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // exceptionally tests + //---------------------------------------------------------------- + try { + CompletableFuture cf2; + CompletableFuture cf1 = supplyAsync(() -> 1); + cf2 = cf1.exceptionally((t) -> { fail("function should never be called"); return 2;}); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 1); + + final RuntimeException t = new RuntimeException(); + cf1 = supplyAsync(() -> { throw t; }); + cf2 = cf1.exceptionally((x) -> { check(x.getCause() == t); return 2;}); + checkCompletedExceptionally(cf1); + checkCompletedNormally(cf2, 2); + } catch (Throwable t) { unexpected(t); } + + //---------------------------------------------------------------- + // handle tests + //---------------------------------------------------------------- + try { + CompletableFuture cf2; + CompletableFuture cf1 = supplyAsync(() -> 1); + cf2 = cf1.handle((x,t) -> x+1); + checkCompletedNormally(cf1, 1); + checkCompletedNormally(cf2, 2); + + final RuntimeException ex = new RuntimeException(); + cf1 = supplyAsync(() -> { throw ex; }); + cf2 = cf1.handle((x,t) -> { check(t.getCause() == ex); return 2;}); + checkCompletedExceptionally(cf1); + checkCompletedNormally(cf2, 2); + } catch (Throwable t) { unexpected(t); } + + } + + //--------------------- Infrastructure --------------------------- + static volatile int passed = 0, failed = 0; + static void pass() {passed++;} + static void fail() {failed++; Thread.dumpStack();} + static void fail(String msg) {System.out.println(msg); fail();} + static void unexpected(Throwable t) {failed++; t.printStackTrace();} + static void check(boolean cond) {if (cond) pass(); else fail();} + static void check(boolean cond, String msg) {if (cond) pass(); else fail(msg);} + static void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + static void equalAnyOf(Object x, Object[] y) { + if (x == null && y == null) { pass(); return; } + for (Object z : y) { if (x.equals(z)) { pass(); return; } } + StringBuilder sb = new StringBuilder(); + for (Object o : y) + sb.append(o).append(" "); + fail(x + " not equal to one of [" + sb + "]");} + public static void main(String[] args) throws Throwable { + try {realMain(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} +}