jdk/test/java/util/concurrent/CompletableFuture/Basic.java
changeset 16745 fcce5e09e23b
child 16861 9dd215f8cc5a
equal deleted inserted replaced
16744:b3ca7ed8e44f 16745:fcce5e09e23b
       
     1 /*
       
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     3  *
       
     4  * This code is free software; you can redistribute it and/or modify it
       
     5  * under the terms of the GNU General Public License version 2 only, as
       
     6  * published by the Free Software Foundation.
       
     7  *
       
     8  * This code is distributed in the hope that it will be useful, but WITHOUT
       
     9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    11  * version 2 for more details (a copy is included in the LICENSE file that
       
    12  * accompanied this code).
       
    13  *
       
    14  * You should have received a copy of the GNU General Public License version
       
    15  * 2 along with this work; if not, write to the Free Software Foundation,
       
    16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    17  *
       
    18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    19  * or visit www.oracle.com if you need additional information or have any
       
    20  * questions.
       
    21  */
       
    22 
       
    23 /*
       
    24  * This file is available under and governed by the GNU General Public
       
    25  * License version 2 only, as published by the Free Software Foundation.
       
    26  * However, the following notice accompanied the original version of this
       
    27  * file:
       
    28  *
       
    29  * Written by Doug Lea with assistance from members of JCP JSR-166
       
    30  * Expert Group and released to the public domain, as explained at
       
    31  * http://creativecommons.org/publicdomain/zero/1.0/
       
    32  */
       
    33 
       
    34 /*
       
    35  * @test
       
    36  * @bug 8005696
       
    37  * @summary Basic tests for CompletableFuture
       
    38  * @author Chris Hegarty
       
    39  */
       
    40 
       
    41 import java.lang.reflect.Array;
       
    42 import java.util.concurrent.Phaser;
       
    43 import static java.util.concurrent.TimeUnit.*;
       
    44 import java.util.concurrent.CompletableFuture;
       
    45 import static java.util.concurrent.CompletableFuture.*;
       
    46 import java.util.concurrent.CompletionException;
       
    47 import java.util.concurrent.CancellationException;
       
    48 import java.util.concurrent.ExecutionException;
       
    49 import java.util.concurrent.ExecutorService;
       
    50 import java.util.concurrent.Executors;
       
    51 import static java.util.concurrent.ForkJoinPool.*;
       
    52 import java.util.concurrent.atomic.AtomicInteger;
       
    53 
       
    54 
       
    55 public class Basic {
       
    56 
       
    57     static void checkCompletedNormally(CompletableFuture<?> cf, Object value) {
       
    58         checkCompletedNormally(cf, value == null ? null : new Object[] { value });
       
    59     }
       
    60 
       
    61     static void checkCompletedNormally(CompletableFuture<?> cf, Object[] values) {
       
    62         try { equalAnyOf(cf.join(), values); } catch (Throwable x) { unexpected(x); }
       
    63         try { equalAnyOf(cf.getNow(null), values); } catch (Throwable x) { unexpected(x); }
       
    64         try { equalAnyOf(cf.get(), values); } catch (Throwable x) { unexpected(x); }
       
    65         try { equalAnyOf(cf.get(0L, SECONDS), values); } catch (Throwable x) { unexpected(x); }
       
    66         check(cf.isDone(), "Expected isDone to be true, got:" + cf);
       
    67         check(!cf.isCancelled(), "Expected isCancelled to be false");
       
    68         check(!cf.cancel(true), "Expected cancel to return false");
       
    69         check(cf.toString().contains("[Completed normally]"));
       
    70         check(cf.complete(null) == false, "Expected complete() to fail");
       
    71         check(cf.completeExceptionally(new Throwable()) == false,
       
    72               "Expected completeExceptionally() to fail");
       
    73     }
       
    74 
       
    75     static <T> void checkCompletedExceptionally(CompletableFuture<T> cf)
       
    76         throws Exception
       
    77     {
       
    78         checkCompletedExceptionally(cf, false);
       
    79     }
       
    80 
       
    81     @SuppressWarnings("unchecked")
       
    82     static <T> void checkCompletedExceptionally(CompletableFuture<T> cf, boolean cancelled)
       
    83         throws Exception
       
    84     {
       
    85         try { cf.join(); fail("Excepted exception to be thrown"); }
       
    86         catch (CompletionException x) { if (cancelled) fail(); else pass(); }
       
    87         catch (CancellationException x) { if (cancelled) pass(); else fail(); }
       
    88         try { cf.getNow(null); fail("Excepted exception to be thrown"); }
       
    89         catch (CompletionException x) { if (cancelled) fail(); else pass(); }
       
    90         catch (CancellationException x) { if (cancelled) pass(); else fail(); }
       
    91         try { cf.get(); fail("Excepted exception to be thrown");}
       
    92         catch (CancellationException x) { if (cancelled) pass(); else fail(); }
       
    93         catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); }
       
    94         try { cf.get(0L, SECONDS); fail("Excepted exception to be thrown");}
       
    95         catch (CancellationException x) { if (cancelled) pass(); else fail(); }
       
    96         catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); }
       
    97         check(cf.isDone(), "Expected isDone to be true, got:" + cf);
       
    98         check(cf.isCancelled() == cancelled, "Expected isCancelled: " + cancelled + ", got:"  + cf.isCancelled());
       
    99         check(cf.cancel(true) == cancelled, "Expected cancel: " + cancelled + ", got:"  + cf.cancel(true));
       
   100         check(cf.toString().contains("[Completed exceptionally]"));  // ## TODO: 'E'xceptionally
       
   101         check(cf.complete((T)new Object()) == false, "Expected complete() to fail");
       
   102         check(cf.completeExceptionally(new Throwable()) == false,
       
   103               "Expected completeExceptionally() to fail, already completed");
       
   104     }
       
   105 
       
   106     private static void realMain(String[] args) throws Throwable {
       
   107         ExecutorService executor = Executors.newFixedThreadPool(2);
       
   108         try {
       
   109             test(executor);
       
   110         } finally {
       
   111             executor.shutdown();
       
   112             executor.awaitTermination(30, SECONDS);
       
   113         }
       
   114     }
       
   115 
       
   116     static AtomicInteger atomicInt = new AtomicInteger(0);
       
   117 
       
   118     private static void test(ExecutorService executor) throws Throwable {
       
   119 
       
   120         Thread.currentThread().setName("mainThread");
       
   121 
       
   122         //----------------------------------------------------------------
       
   123         // supplyAsync tests
       
   124         //----------------------------------------------------------------
       
   125         try {
       
   126             CompletableFuture<String> cf = supplyAsync(() -> "a test string");
       
   127             checkCompletedNormally(cf, cf.join());
       
   128             cf = supplyAsync(() -> "a test string", commonPool());
       
   129             checkCompletedNormally(cf, cf.join());
       
   130             cf = supplyAsync(() -> "a test string", executor);
       
   131             checkCompletedNormally(cf, cf.join());
       
   132             cf = supplyAsync(() -> { throw new RuntimeException(); });
       
   133             checkCompletedExceptionally(cf);
       
   134             cf = supplyAsync(() -> { throw new RuntimeException(); }, commonPool());
       
   135             checkCompletedExceptionally(cf);
       
   136             cf = supplyAsync(() -> { throw new RuntimeException(); }, executor);
       
   137             checkCompletedExceptionally(cf);
       
   138         } catch (Throwable t) { unexpected(t); }
       
   139 
       
   140         //----------------------------------------------------------------
       
   141         // runAsync tests
       
   142         //----------------------------------------------------------------
       
   143         try {
       
   144             CompletableFuture<Void> cf = runAsync(() -> { });
       
   145             checkCompletedNormally(cf, cf.join());
       
   146             cf = runAsync(() -> { }, commonPool());
       
   147             checkCompletedNormally(cf, cf.join());
       
   148             cf = runAsync(() -> { }, executor);
       
   149             checkCompletedNormally(cf, cf.join());
       
   150             cf = runAsync(() -> { throw new RuntimeException(); });
       
   151             checkCompletedExceptionally(cf);
       
   152             cf = runAsync(() -> { throw new RuntimeException(); }, commonPool());
       
   153             checkCompletedExceptionally(cf);
       
   154             cf = runAsync(() -> { throw new RuntimeException(); }, executor);
       
   155             checkCompletedExceptionally(cf);
       
   156         } catch (Throwable t) { unexpected(t); }
       
   157 
       
   158         //----------------------------------------------------------------
       
   159         // explicit completion
       
   160         //----------------------------------------------------------------
       
   161         try {
       
   162             final Phaser phaser = new Phaser(1);
       
   163             final int phase = phaser.getPhase();
       
   164             CompletableFuture<Integer> cf;
       
   165             cf = supplyAsync(() -> { phaser.awaitAdvance(phase); return 1; });
       
   166             cf.complete(2);
       
   167             phaser.arrive();
       
   168             checkCompletedNormally(cf, 2);
       
   169 
       
   170             cf = supplyAsync(() -> { phaser.awaitAdvance(phase+1); return 1; });
       
   171             cf.completeExceptionally(new Throwable());
       
   172             phaser.arrive();
       
   173             checkCompletedExceptionally(cf);
       
   174 
       
   175             cf = supplyAsync(() -> { phaser.awaitAdvance(phase+2); return 1; });
       
   176             cf.cancel(true);
       
   177             phaser.arrive();
       
   178             checkCompletedExceptionally(cf, true);
       
   179 
       
   180             cf = supplyAsync(() -> { phaser.awaitAdvance(phase+3); return 1; });
       
   181             check(cf.getNow(2) == 2);
       
   182             phaser.arrive();
       
   183             checkCompletedNormally(cf, 1);
       
   184             check(cf.getNow(2) == 1);
       
   185         } catch (Throwable t) { unexpected(t); }
       
   186 
       
   187         //----------------------------------------------------------------
       
   188         // thenApplyXXX tests
       
   189         //----------------------------------------------------------------
       
   190         try {
       
   191             CompletableFuture<Integer> cf2;
       
   192             CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
       
   193             cf2 = cf1.thenApply((x) -> { if (x.equals("a test string")) return 1; else return 0; });
       
   194             checkCompletedNormally(cf1, "a test string");
       
   195             checkCompletedNormally(cf2, 1);
       
   196 
       
   197             cf1 = supplyAsync(() -> "a test string");
       
   198             cf2 = cf1.thenApplyAsync((x) -> { if (x.equals("a test string")) return 1; else return 0; });
       
   199             checkCompletedNormally(cf1, "a test string");
       
   200             checkCompletedNormally(cf2, 1);
       
   201 
       
   202             cf1 = supplyAsync(() -> "a test string");
       
   203             cf2 = cf1.thenApplyAsync((x) -> { if (x.equals("a test string")) return 1; else return 0; }, executor);
       
   204             checkCompletedNormally(cf1, "a test string");
       
   205             checkCompletedNormally(cf2, 1);
       
   206 
       
   207             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   208             cf2 = cf1.thenApply((x) -> { return 0; } );
       
   209             checkCompletedExceptionally(cf1);
       
   210             checkCompletedExceptionally(cf2);
       
   211 
       
   212             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   213             cf2 = cf1.thenApplyAsync((x) -> { return 0; } );
       
   214             checkCompletedExceptionally(cf1);
       
   215             checkCompletedExceptionally(cf2);
       
   216 
       
   217             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   218             cf2 = cf1.thenApplyAsync((x) -> { return 0; }, executor);
       
   219             checkCompletedExceptionally(cf1);
       
   220             checkCompletedExceptionally(cf2);
       
   221         } catch (Throwable t) { unexpected(t); }
       
   222 
       
   223         //----------------------------------------------------------------
       
   224         // thenAcceptXXX tests
       
   225         //----------------------------------------------------------------
       
   226         try {
       
   227             CompletableFuture<Void> cf2;
       
   228             int before = atomicInt.get();
       
   229             CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
       
   230             cf2 = cf1.thenAccept((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); });
       
   231             checkCompletedNormally(cf1, "a test string");
       
   232             checkCompletedNormally(cf2, null);
       
   233             check(atomicInt.get() == (before + 1));
       
   234 
       
   235             before = atomicInt.get();
       
   236             cf1 = supplyAsync(() -> "a test string");
       
   237             cf2 = cf1.thenAcceptAsync((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); });
       
   238             checkCompletedNormally(cf1, "a test string");
       
   239             checkCompletedNormally(cf2, null);
       
   240             check(atomicInt.get() == (before + 1));
       
   241 
       
   242             before = atomicInt.get();
       
   243             cf1 = supplyAsync(() -> "a test string");
       
   244             cf2 = cf1.thenAcceptAsync((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }, executor);
       
   245             checkCompletedNormally(cf1, "a test string");
       
   246             checkCompletedNormally(cf2, null);
       
   247             check(atomicInt.get() == (before + 1));
       
   248 
       
   249             before = atomicInt.get();
       
   250             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   251             cf2 = cf1.thenAccept((x) -> { atomicInt.incrementAndGet(); } );
       
   252             checkCompletedExceptionally(cf1);
       
   253             checkCompletedExceptionally(cf2);
       
   254             check(atomicInt.get() == before);
       
   255 
       
   256             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   257             cf2 = cf1.thenAcceptAsync((x) -> { atomicInt.incrementAndGet(); } );
       
   258             checkCompletedExceptionally(cf1);
       
   259             checkCompletedExceptionally(cf2);
       
   260             check(atomicInt.get() == before);
       
   261 
       
   262             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   263             cf2 = cf1.thenAcceptAsync((x) -> { atomicInt.incrementAndGet(); }, executor );
       
   264             checkCompletedExceptionally(cf1);
       
   265             checkCompletedExceptionally(cf2);
       
   266             check(atomicInt.get() == before);
       
   267         } catch (Throwable t) { unexpected(t); }
       
   268 
       
   269         //----------------------------------------------------------------
       
   270         // thenRunXXX tests
       
   271         //----------------------------------------------------------------
       
   272         try {
       
   273             CompletableFuture<Void> cf2;
       
   274             int before = atomicInt.get();
       
   275             CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
       
   276             cf2 = cf1.thenRun(() -> { atomicInt.incrementAndGet(); });
       
   277             checkCompletedNormally(cf1, "a test string");
       
   278             checkCompletedNormally(cf2, null);
       
   279             check(atomicInt.get() == (before + 1));
       
   280 
       
   281             before = atomicInt.get();
       
   282             cf1 = supplyAsync(() -> "a test string");
       
   283             cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); });
       
   284             checkCompletedNormally(cf1, "a test string");
       
   285             checkCompletedNormally(cf2, null);
       
   286             check(atomicInt.get() == (before + 1));
       
   287 
       
   288             before = atomicInt.get();
       
   289             cf1 = supplyAsync(() -> "a test string");
       
   290             cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }, executor);
       
   291             checkCompletedNormally(cf1, "a test string");
       
   292             checkCompletedNormally(cf2, null);
       
   293             check(atomicInt.get() == (before + 1));
       
   294 
       
   295             before = atomicInt.get();
       
   296             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   297             cf2 = cf1.thenRun(() -> { atomicInt.incrementAndGet(); });
       
   298             checkCompletedExceptionally(cf1);
       
   299             checkCompletedExceptionally(cf2);
       
   300             check(atomicInt.get() == before);
       
   301 
       
   302             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   303             cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); });
       
   304             checkCompletedExceptionally(cf1);
       
   305             checkCompletedExceptionally(cf2);
       
   306             check(atomicInt.get() == before);
       
   307 
       
   308             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   309             cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }, executor);
       
   310             checkCompletedExceptionally(cf1);
       
   311             checkCompletedExceptionally(cf2);
       
   312             check(atomicInt.get() == before);
       
   313         } catch (Throwable t) { unexpected(t); }
       
   314 
       
   315         //----------------------------------------------------------------
       
   316         // thenCombineXXX tests
       
   317         //----------------------------------------------------------------
       
   318         try {
       
   319             CompletableFuture<Integer> cf3;
       
   320             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   321             CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
       
   322             cf3 = cf1.thenCombine(cf2, (x, y) -> { return x + y; });
       
   323             checkCompletedNormally(cf1, 1);
       
   324             checkCompletedNormally(cf2, 1);
       
   325             checkCompletedNormally(cf3, 2);
       
   326 
       
   327             cf1 = supplyAsync(() -> 1);
       
   328             cf2 = supplyAsync(() -> 1);
       
   329             cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return x + y; });
       
   330             checkCompletedNormally(cf1, 1);
       
   331             checkCompletedNormally(cf2, 1);
       
   332             checkCompletedNormally(cf3, 2);
       
   333 
       
   334             cf1 = supplyAsync(() -> 1);
       
   335             cf2 = supplyAsync(() -> 1);
       
   336             cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return x + y; }, executor);
       
   337             checkCompletedNormally(cf1, 1);
       
   338             checkCompletedNormally(cf2, 1);
       
   339             checkCompletedNormally(cf3, 2);
       
   340 
       
   341             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   342             cf2 = supplyAsync(() -> 1);
       
   343             cf3 = cf1.thenCombine(cf2, (x, y) -> { return 0; });
       
   344             checkCompletedExceptionally(cf1);
       
   345             checkCompletedNormally(cf2, 1);
       
   346             checkCompletedExceptionally(cf3);
       
   347 
       
   348             cf1 = supplyAsync(() -> 1);
       
   349             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   350             cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return 0; });
       
   351             checkCompletedNormally(cf1, 1);
       
   352             checkCompletedExceptionally(cf2);
       
   353             checkCompletedExceptionally(cf3);
       
   354 
       
   355             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   356             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   357             cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return 0; }, executor);
       
   358             checkCompletedExceptionally(cf1);
       
   359             checkCompletedExceptionally(cf2);
       
   360             checkCompletedExceptionally(cf3);
       
   361         } catch (Throwable t) { unexpected(t); }
       
   362 
       
   363         //----------------------------------------------------------------
       
   364         // thenAcceptBothXXX tests
       
   365         //----------------------------------------------------------------
       
   366         try {
       
   367             CompletableFuture<Void> cf3;
       
   368             int before = atomicInt.get();
       
   369             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   370             CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
       
   371             cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); });
       
   372             checkCompletedNormally(cf1, 1);
       
   373             checkCompletedNormally(cf2, 1);
       
   374             checkCompletedNormally(cf3, null);
       
   375             check(atomicInt.get() == (before + 1));
       
   376 
       
   377             before = atomicInt.get();
       
   378             cf1 = supplyAsync(() -> 1);
       
   379             cf2 = supplyAsync(() -> 1);
       
   380             cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); });
       
   381             checkCompletedNormally(cf1, 1);
       
   382             checkCompletedNormally(cf2, 1);
       
   383             checkCompletedNormally(cf3, null);
       
   384             check(atomicInt.get() == (before + 1));
       
   385 
       
   386             before = atomicInt.get();
       
   387             cf1 = supplyAsync(() -> 1);
       
   388             cf2 = supplyAsync(() -> 1);
       
   389             cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }, executor);
       
   390             checkCompletedNormally(cf1, 1);
       
   391             checkCompletedNormally(cf2, 1);
       
   392             checkCompletedNormally(cf3, null);
       
   393             check(atomicInt.get() == (before + 1));
       
   394 
       
   395             before = atomicInt.get();
       
   396             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   397             cf2 = supplyAsync(() -> 1);
       
   398             cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { atomicInt.incrementAndGet(); });
       
   399             checkCompletedExceptionally(cf1);
       
   400             checkCompletedNormally(cf2, 1);
       
   401             checkCompletedExceptionally(cf3);
       
   402             check(atomicInt.get() == before);
       
   403 
       
   404             cf1 = supplyAsync(() -> 1);
       
   405             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   406             cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { atomicInt.incrementAndGet(); });
       
   407             checkCompletedNormally(cf1, 1);
       
   408             checkCompletedExceptionally(cf2);
       
   409             checkCompletedExceptionally(cf3);
       
   410             check(atomicInt.get() == before);
       
   411 
       
   412             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   413             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   414             cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { atomicInt.incrementAndGet(); }, executor);
       
   415             checkCompletedExceptionally(cf1);
       
   416             checkCompletedExceptionally(cf2);
       
   417             checkCompletedExceptionally(cf3);
       
   418             check(atomicInt.get() == before);
       
   419         } catch (Throwable t) { unexpected(t); }
       
   420 
       
   421         //----------------------------------------------------------------
       
   422         // runAfterBothXXX tests
       
   423         //----------------------------------------------------------------
       
   424         try {
       
   425             CompletableFuture<Void> cf3;
       
   426             int before = atomicInt.get();
       
   427             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   428             CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
       
   429             cf3 = cf1.runAfterBoth(cf2, () -> { check(cf1.isDone()); check(cf2.isDone()); atomicInt.incrementAndGet(); });
       
   430             checkCompletedNormally(cf1, 1);
       
   431             checkCompletedNormally(cf2, 1);
       
   432             checkCompletedNormally(cf3, null);
       
   433             check(atomicInt.get() == (before + 1));
       
   434 
       
   435             before = atomicInt.get();
       
   436             CompletableFuture<Integer> cfa = supplyAsync(() -> 1);
       
   437             CompletableFuture<Integer> cfb = supplyAsync(() -> 1);
       
   438             cf3 = cfa.runAfterBothAsync(cfb, () -> { check(cfa.isDone()); check(cfb.isDone()); atomicInt.incrementAndGet(); });
       
   439             checkCompletedNormally(cfa, 1);
       
   440             checkCompletedNormally(cfb, 1);
       
   441             checkCompletedNormally(cf3, null);
       
   442             check(atomicInt.get() == (before + 1));
       
   443 
       
   444             before = atomicInt.get();
       
   445             CompletableFuture<Integer> cfx = supplyAsync(() -> 1);
       
   446             CompletableFuture<Integer> cfy = supplyAsync(() -> 1);
       
   447             cf3 = cfy.runAfterBothAsync(cfx, () -> { check(cfx.isDone()); check(cfy.isDone()); atomicInt.incrementAndGet(); }, executor);
       
   448             checkCompletedNormally(cfx, 1);
       
   449             checkCompletedNormally(cfy, 1);
       
   450             checkCompletedNormally(cf3, null);
       
   451             check(atomicInt.get() == (before + 1));
       
   452 
       
   453             before = atomicInt.get();
       
   454             CompletableFuture<Integer> cf4 = supplyAsync(() -> { throw new RuntimeException(); });
       
   455             CompletableFuture<Integer> cf5 = supplyAsync(() -> 1);
       
   456             cf3 = cf5.runAfterBothAsync(cf4, () -> { atomicInt.incrementAndGet(); }, executor);
       
   457             checkCompletedExceptionally(cf4);
       
   458             checkCompletedNormally(cf5, 1);
       
   459             checkCompletedExceptionally(cf3);
       
   460             check(atomicInt.get() == before);
       
   461 
       
   462             before = atomicInt.get();
       
   463             cf4 = supplyAsync(() -> 1);
       
   464             cf5 = supplyAsync(() -> { throw new RuntimeException(); });
       
   465             cf3 = cf5.runAfterBothAsync(cf4, () -> { atomicInt.incrementAndGet(); });
       
   466             checkCompletedNormally(cf4, 1);
       
   467             checkCompletedExceptionally(cf5);
       
   468             checkCompletedExceptionally(cf3);
       
   469             check(atomicInt.get() == before);
       
   470 
       
   471             before = atomicInt.get();
       
   472             cf4 = supplyAsync(() -> { throw new RuntimeException(); });
       
   473             cf5 = supplyAsync(() -> { throw new RuntimeException(); });
       
   474             cf3 = cf5.runAfterBoth(cf4, () -> { atomicInt.incrementAndGet(); });
       
   475             checkCompletedExceptionally(cf4);
       
   476             checkCompletedExceptionally(cf5);
       
   477             checkCompletedExceptionally(cf3);
       
   478             check(atomicInt.get() == before);
       
   479         } catch (Throwable t) { unexpected(t); }
       
   480 
       
   481         //----------------------------------------------------------------
       
   482         // applyToEitherXXX tests
       
   483         //----------------------------------------------------------------
       
   484         try {
       
   485             CompletableFuture<Integer> cf3;
       
   486             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   487             CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
       
   488             cf3 = cf1.applyToEither(cf2, (x) -> { check(x == 1 || x == 2); return x; });
       
   489             check(cf1.isDone() || cf2.isDone());
       
   490             checkCompletedNormally(cf3, new Object[] {1, 2});
       
   491 
       
   492             cf1 = supplyAsync(() -> 1);
       
   493             cf2 = supplyAsync(() -> 2);
       
   494             cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); return x; });
       
   495             check(cf1.isDone() || cf2.isDone());
       
   496             checkCompletedNormally(cf3, new Object[] {1, 2});
       
   497 
       
   498             cf1 = supplyAsync(() -> 1);
       
   499             cf2 = supplyAsync(() -> 2);
       
   500             cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); return x; }, executor);
       
   501             check(cf1.isDone() || cf2.isDone());
       
   502             checkCompletedNormally(cf3, new Object[] {1, 2});
       
   503 
       
   504             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   505             cf2 = supplyAsync(() -> 2);
       
   506             cf3 = cf1.applyToEither(cf2, (x) -> { check(x == 2); return x; });
       
   507             check(cf1.isDone() || cf2.isDone());
       
   508             try { check(cf3.join() == 1); } catch (CompletionException x) { pass(); }
       
   509             check(cf3.isDone());
       
   510 
       
   511             cf1 = supplyAsync(() -> 1);
       
   512             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   513             cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1); return x; });
       
   514             check(cf1.isDone() || cf2.isDone());
       
   515             try { check(cf3.join() == 1); } catch (CompletionException x) { pass(); }
       
   516             check(cf3.isDone());
       
   517 
       
   518             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   519             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   520             cf3 = cf1.applyToEitherAsync(cf2, (x) -> { fail(); return x; });
       
   521             check(cf1.isDone() || cf2.isDone());
       
   522             checkCompletedExceptionally(cf3);
       
   523         } catch (Throwable t) { unexpected(t); }
       
   524 
       
   525         //----------------------------------------------------------------
       
   526         // acceptEitherXXX tests
       
   527         //----------------------------------------------------------------
       
   528         try {
       
   529             CompletableFuture<Void> cf3;
       
   530             int before = atomicInt.get();
       
   531             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   532             CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
       
   533             cf3 = cf1.acceptEither(cf2, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); });
       
   534             check(cf1.isDone() || cf2.isDone());
       
   535             checkCompletedNormally(cf3, null);
       
   536             check(atomicInt.get() == (before + 1));
       
   537 
       
   538             before = atomicInt.get();
       
   539             cf1 = supplyAsync(() -> 1);
       
   540             cf2 = supplyAsync(() -> 2);
       
   541             cf3 = cf1.acceptEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); });
       
   542             check(cf1.isDone() || cf2.isDone());
       
   543             checkCompletedNormally(cf3, null);
       
   544             check(atomicInt.get() == (before + 1));
       
   545 
       
   546             before = atomicInt.get();
       
   547             cf1 = supplyAsync(() -> 1);
       
   548             cf2 = supplyAsync(() -> 2);
       
   549             cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }, executor);
       
   550             check(cf1.isDone() || cf2.isDone());
       
   551             checkCompletedNormally(cf3, null);
       
   552             check(atomicInt.get() == (before + 1));
       
   553 
       
   554             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   555             cf2 = supplyAsync(() -> 2);
       
   556             cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 2); }, executor);
       
   557             check(cf1.isDone() || cf2.isDone());
       
   558             try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
       
   559             check(cf3.isDone());
       
   560 
       
   561             cf1 = supplyAsync(() -> 1);
       
   562             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   563             cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 1); });
       
   564             check(cf1.isDone() || cf2.isDone());
       
   565             try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
       
   566             check(cf3.isDone());
       
   567 
       
   568             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   569             cf2 = supplyAsync(() -> { throw new RuntimeException(); });
       
   570             cf3 = cf2.acceptEitherAsync(cf1, (x) -> { fail(); });
       
   571             check(cf1.isDone() || cf2.isDone());
       
   572             checkCompletedExceptionally(cf3);
       
   573         } catch (Throwable t) { unexpected(t); }
       
   574 
       
   575         //----------------------------------------------------------------
       
   576         // runAfterEitherXXX tests
       
   577         //----------------------------------------------------------------
       
   578         try {
       
   579             CompletableFuture<Void> cf3;
       
   580             int before = atomicInt.get();
       
   581             CompletableFuture<Void> cf1 = runAsync(() -> { });
       
   582             CompletableFuture<Void> cf2 = runAsync(() -> { });
       
   583             cf3 = cf1.runAfterEither(cf2, () -> { atomicInt.incrementAndGet(); });
       
   584             check(cf1.isDone() || cf2.isDone());
       
   585             checkCompletedNormally(cf3, null);
       
   586             check(atomicInt.get() == (before + 1));
       
   587 
       
   588             before = atomicInt.get();
       
   589             cf1 = runAsync(() -> { });
       
   590             cf2 = runAsync(() -> { });
       
   591             cf3 = cf1.runAfterEitherAsync(cf2, () -> { atomicInt.incrementAndGet(); });
       
   592             check(cf1.isDone() || cf2.isDone());
       
   593             checkCompletedNormally(cf3, null);
       
   594             check(atomicInt.get() == (before + 1));
       
   595 
       
   596             before = atomicInt.get();
       
   597             cf1 = runAsync(() -> { });
       
   598             cf2 = runAsync(() -> { });
       
   599             cf3 = cf2.runAfterEitherAsync(cf1, () -> { atomicInt.incrementAndGet(); }, executor);
       
   600             check(cf1.isDone() || cf2.isDone());
       
   601             checkCompletedNormally(cf3, null);
       
   602             check(atomicInt.get() == (before + 1));
       
   603 
       
   604             before = atomicInt.get();
       
   605             cf1 = runAsync(() -> { throw new RuntimeException(); });
       
   606             cf2 = runAsync(() -> { });
       
   607             cf3 = cf2.runAfterEither(cf1, () -> { atomicInt.incrementAndGet(); });
       
   608             check(cf1.isDone() || cf2.isDone());
       
   609             try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
       
   610             check(cf3.isDone());
       
   611             check(atomicInt.get() == (before + 1));
       
   612 
       
   613             before = atomicInt.get();
       
   614             cf1 = runAsync(() -> { });
       
   615             cf2 = runAsync(() -> { throw new RuntimeException(); });
       
   616             cf3 = cf1.runAfterEitherAsync(cf2, () -> { atomicInt.incrementAndGet(); });
       
   617             check(cf1.isDone() || cf2.isDone());
       
   618             try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
       
   619             check(cf3.isDone());
       
   620             check(atomicInt.get() == (before + 1));
       
   621 
       
   622             before = atomicInt.get();
       
   623             cf1 = runAsync(() -> { throw new RuntimeException(); });
       
   624             cf2 = runAsync(() -> { throw new RuntimeException(); });
       
   625             cf3 = cf2.runAfterEitherAsync(cf1, () -> { atomicInt.incrementAndGet(); }, executor);
       
   626             check(cf1.isDone() || cf2.isDone());
       
   627             checkCompletedExceptionally(cf3);
       
   628             check(atomicInt.get() == before);
       
   629         } catch (Throwable t) { unexpected(t); }
       
   630 
       
   631         //----------------------------------------------------------------
       
   632         // thenComposeXXX tests
       
   633         //----------------------------------------------------------------
       
   634         try {
       
   635             CompletableFuture<Integer> cf2;
       
   636             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   637             cf2 = cf1.thenCompose((x) -> { check(x ==1); return CompletableFuture.completedFuture(2); });
       
   638             checkCompletedNormally(cf1, 1);
       
   639             checkCompletedNormally(cf2, 2);
       
   640 
       
   641             cf1 = supplyAsync(() -> 1);
       
   642             cf2 = cf1.thenComposeAsync((x) -> { check(x ==1); return CompletableFuture.completedFuture(2); });
       
   643             checkCompletedNormally(cf1, 1);
       
   644             checkCompletedNormally(cf2, 2);
       
   645 
       
   646             cf1 = supplyAsync(() -> 1);
       
   647             cf2 = cf1.thenComposeAsync((x) -> { check(x ==1); return CompletableFuture.completedFuture(2); }, executor);
       
   648             checkCompletedNormally(cf1, 1);
       
   649             checkCompletedNormally(cf2, 2);
       
   650 
       
   651             int before = atomicInt.get();
       
   652             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   653             cf2 = cf1.thenCompose((x) -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); });
       
   654             checkCompletedExceptionally(cf1);
       
   655             checkCompletedExceptionally(cf2);
       
   656             check(atomicInt.get() == before);
       
   657 
       
   658             cf1 = supplyAsync(() -> { throw new RuntimeException(); });
       
   659             cf2 = cf1.thenComposeAsync((x) -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); });
       
   660             checkCompletedExceptionally(cf1);
       
   661             checkCompletedExceptionally(cf2);
       
   662             check(atomicInt.get() == before);
       
   663 
       
   664             cf1 = supplyAsync(() -> 1);
       
   665             cf2 = cf1.thenComposeAsync((x) -> { throw new RuntimeException(); }, executor);
       
   666             checkCompletedNormally(cf1, 1);
       
   667             checkCompletedExceptionally(cf2);
       
   668         } catch (Throwable t) { unexpected(t); }
       
   669 
       
   670         //----------------------------------------------------------------
       
   671         // anyOf tests
       
   672         //----------------------------------------------------------------
       
   673         //try {
       
   674         //    CompletableFuture<Object> cf3;
       
   675         //    for (int k=0; k < 10; k++){
       
   676         //        CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   677         //        CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
       
   678         //        cf3 = CompletableFuture.anyOf(cf1, cf2);
       
   679         //        check(cf1.isDone() || cf2.isDone());
       
   680         //        checkCompletedNormally(cf3, new Object[] {1, 2});
       
   681         //    }
       
   682         //} catch (Throwable t) { unexpected(t); }
       
   683 
       
   684         //----------------------------------------------------------------
       
   685         // allOf tests
       
   686         //----------------------------------------------------------------
       
   687         try {
       
   688             CompletableFuture<?> cf3;
       
   689             for (int k=0; k < 10; k++){
       
   690                 CompletableFuture<Integer>[] cfs = (CompletableFuture<Integer>[])
       
   691                         Array.newInstance(CompletableFuture.class, 10);
       
   692                 for (int j=0; j < 10; j++) {
       
   693                     final int v = j;
       
   694                     cfs[j] = supplyAsync(() -> v);
       
   695                 }
       
   696                 cf3 = CompletableFuture.allOf(cfs);
       
   697                 for (int j=0; j < 10; j++)
       
   698                     checkCompletedNormally(cfs[j], j);
       
   699                 checkCompletedNormally(cf3, null);
       
   700             }
       
   701         } catch (Throwable t) { unexpected(t); }
       
   702 
       
   703         //----------------------------------------------------------------
       
   704         // exceptionally tests
       
   705         //----------------------------------------------------------------
       
   706         try {
       
   707             CompletableFuture<Integer> cf2;
       
   708             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   709             cf2 = cf1.exceptionally((t) -> { fail("function should never be called"); return 2;});
       
   710             checkCompletedNormally(cf1, 1);
       
   711             checkCompletedNormally(cf2, 1);
       
   712 
       
   713             final RuntimeException t = new RuntimeException();
       
   714             cf1 = supplyAsync(() -> { throw t; });
       
   715             cf2 = cf1.exceptionally((x) -> { check(x.getCause() == t); return 2;});
       
   716             checkCompletedExceptionally(cf1);
       
   717             checkCompletedNormally(cf2, 2);
       
   718         } catch (Throwable t) { unexpected(t); }
       
   719 
       
   720         //----------------------------------------------------------------
       
   721         // handle tests
       
   722         //----------------------------------------------------------------
       
   723         try {
       
   724             CompletableFuture<Integer> cf2;
       
   725             CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
       
   726             cf2 = cf1.handle((x,t) -> x+1);
       
   727             checkCompletedNormally(cf1, 1);
       
   728             checkCompletedNormally(cf2, 2);
       
   729 
       
   730             final RuntimeException ex = new RuntimeException();
       
   731             cf1 = supplyAsync(() -> { throw ex; });
       
   732             cf2 = cf1.handle((x,t) -> { check(t.getCause() == ex); return 2;});
       
   733             checkCompletedExceptionally(cf1);
       
   734             checkCompletedNormally(cf2, 2);
       
   735         } catch (Throwable t) { unexpected(t); }
       
   736 
       
   737     }
       
   738 
       
   739     //--------------------- Infrastructure ---------------------------
       
   740     static volatile int passed = 0, failed = 0;
       
   741     static void pass() {passed++;}
       
   742     static void fail() {failed++; Thread.dumpStack();}
       
   743     static void fail(String msg) {System.out.println(msg); fail();}
       
   744     static void unexpected(Throwable t) {failed++; t.printStackTrace();}
       
   745     static void check(boolean cond) {if (cond) pass(); else fail();}
       
   746     static void check(boolean cond, String msg) {if (cond) pass(); else fail(msg);}
       
   747     static void equal(Object x, Object y) {
       
   748         if (x == null ? y == null : x.equals(y)) pass();
       
   749         else fail(x + " not equal to " + y);}
       
   750     static void equalAnyOf(Object x, Object[] y) {
       
   751         if (x == null && y == null) { pass(); return; }
       
   752         for (Object z : y) { if (x.equals(z)) { pass(); return; } }
       
   753         StringBuilder sb = new StringBuilder();
       
   754         for (Object o : y)
       
   755             sb.append(o).append(" ");
       
   756         fail(x + " not equal to one of [" + sb + "]");}
       
   757     public static void main(String[] args) throws Throwable {
       
   758         try {realMain(args);} catch (Throwable t) {unexpected(t);}
       
   759         System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
       
   760         if (failed > 0) throw new AssertionError("Some tests failed");}
       
   761 }