8166465: CompletableFuture.minimalCompletionStage().toCompletableFuture() should be non-minimal
authordl
Fri, 23 Sep 2016 13:14:14 -0700
changeset 41128 0dfc0bc2196c
parent 41127 186dce575463
child 41129 e54fb9880260
8166465: CompletableFuture.minimalCompletionStage().toCompletableFuture() should be non-minimal Reviewed-by: martin, chegar, shade
jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java
jdk/test/java/util/concurrent/tck/CompletableFutureTest.java
--- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java	Fri Sep 23 12:08:38 2016 +0300
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java	Fri Sep 23 13:14:14 2016 -0700
@@ -2559,6 +2559,13 @@
      * exceptionally with a CompletionException with this exception as
      * cause.
      *
+     * <p>Unless overridden by a subclass, a new non-minimal
+     * CompletableFuture with all methods available can be obtained from
+     * a minimal CompletionStage via {@link #toCompletableFuture()}.
+     * For example, completion of a minimal stage can be awaited by
+     *
+     * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre>
+     *
      * @return the new CompletionStage
      * @since 9
      */
@@ -2853,6 +2860,16 @@
         @Override public CompletableFuture<T> completeOnTimeout
             (T value, long timeout, TimeUnit unit) {
             throw new UnsupportedOperationException(); }
+        @Override public CompletableFuture<T> toCompletableFuture() {
+            Object r;
+            if ((r = result) != null)
+                return new CompletableFuture<T>(encodeRelay(r));
+            else {
+                CompletableFuture<T> d = new CompletableFuture<>();
+                unipush(new UniRelay<T,T>(d, this));
+                return d;
+            }
+        }
     }
 
     // VarHandle mechanics
--- a/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java	Fri Sep 23 12:08:38 2016 +0300
+++ b/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java	Fri Sep 23 13:14:14 2016 -0700
@@ -388,7 +388,7 @@
         checkCompletedNormally(f, "test");
     }
 
-    abstract class CheckedAction {
+    abstract static class CheckedAction {
         int invocationCount = 0;
         final ExecutionMode m;
         CheckedAction(ExecutionMode m) { this.m = m; }
@@ -400,7 +400,7 @@
         void assertInvoked() { assertEquals(1, invocationCount); }
     }
 
-    abstract class CheckedIntegerAction extends CheckedAction {
+    abstract static class CheckedIntegerAction extends CheckedAction {
         Integer value;
         CheckedIntegerAction(ExecutionMode m) { super(m); }
         void assertValue(Integer expected) {
@@ -409,7 +409,7 @@
         }
     }
 
-    class IntegerSupplier extends CheckedAction
+    static class IntegerSupplier extends CheckedAction
         implements Supplier<Integer>
     {
         final Integer value;
@@ -428,7 +428,7 @@
         return (x == null) ? null : x + 1;
     }
 
-    class NoopConsumer extends CheckedIntegerAction
+    static class NoopConsumer extends CheckedIntegerAction
         implements Consumer<Integer>
     {
         NoopConsumer(ExecutionMode m) { super(m); }
@@ -438,7 +438,7 @@
         }
     }
 
-    class IncFunction extends CheckedIntegerAction
+    static class IncFunction extends CheckedIntegerAction
         implements Function<Integer,Integer>
     {
         IncFunction(ExecutionMode m) { super(m); }
@@ -456,7 +456,7 @@
             - ((y == null) ? 99 : y.intValue());
     }
 
-    class SubtractAction extends CheckedIntegerAction
+    static class SubtractAction extends CheckedIntegerAction
         implements BiConsumer<Integer, Integer>
     {
         SubtractAction(ExecutionMode m) { super(m); }
@@ -466,7 +466,7 @@
         }
     }
 
-    class SubtractFunction extends CheckedIntegerAction
+    static class SubtractFunction extends CheckedIntegerAction
         implements BiFunction<Integer, Integer, Integer>
     {
         SubtractFunction(ExecutionMode m) { super(m); }
@@ -476,14 +476,14 @@
         }
     }
 
-    class Noop extends CheckedAction implements Runnable {
+    static class Noop extends CheckedAction implements Runnable {
         Noop(ExecutionMode m) { super(m); }
         public void run() {
             invoked();
         }
     }
 
-    class FailingSupplier extends CheckedAction
+    static class FailingSupplier extends CheckedAction
         implements Supplier<Integer>
     {
         final CFException ex;
@@ -494,7 +494,7 @@
         }
     }
 
-    class FailingConsumer extends CheckedIntegerAction
+    static class FailingConsumer extends CheckedIntegerAction
         implements Consumer<Integer>
     {
         final CFException ex;
@@ -506,7 +506,7 @@
         }
     }
 
-    class FailingBiConsumer extends CheckedIntegerAction
+    static class FailingBiConsumer extends CheckedIntegerAction
         implements BiConsumer<Integer, Integer>
     {
         final CFException ex;
@@ -518,7 +518,7 @@
         }
     }
 
-    class FailingFunction extends CheckedIntegerAction
+    static class FailingFunction extends CheckedIntegerAction
         implements Function<Integer, Integer>
     {
         final CFException ex;
@@ -530,7 +530,7 @@
         }
     }
 
-    class FailingBiFunction extends CheckedIntegerAction
+    static class FailingBiFunction extends CheckedIntegerAction
         implements BiFunction<Integer, Integer, Integer>
     {
         final CFException ex;
@@ -542,7 +542,7 @@
         }
     }
 
-    class FailingRunnable extends CheckedAction implements Runnable {
+    static class FailingRunnable extends CheckedAction implements Runnable {
         final CFException ex;
         FailingRunnable(ExecutionMode m) { super(m); ex = new CFException(); }
         public void run() {
@@ -551,7 +551,7 @@
         }
     }
 
-    class CompletableFutureInc extends CheckedIntegerAction
+    static class CompletableFutureInc extends CheckedIntegerAction
         implements Function<Integer, CompletableFuture<Integer>>
     {
         CompletableFutureInc(ExecutionMode m) { super(m); }
@@ -564,7 +564,7 @@
         }
     }
 
-    class FailingCompletableFutureFunction extends CheckedIntegerAction
+    static class FailingCompletableFutureFunction extends CheckedIntegerAction
         implements Function<Integer, CompletableFuture<Integer>>
     {
         final CFException ex;
@@ -3604,29 +3604,53 @@
      * copy returns a CompletableFuture that is completed normally,
      * with the same value, when source is.
      */
-    public void testCopy() {
+    public void testCopy_normalCompletion() {
+        for (boolean createIncomplete : new boolean[] { true, false })
+        for (Integer v1 : new Integer[] { 1, null })
+    {
         CompletableFuture<Integer> f = new CompletableFuture<>();
+        if (!createIncomplete) assertTrue(f.complete(v1));
         CompletableFuture<Integer> g = f.copy();
-        checkIncomplete(f);
-        checkIncomplete(g);
-        f.complete(1);
-        checkCompletedNormally(f, 1);
-        checkCompletedNormally(g, 1);
-    }
+        if (createIncomplete) {
+            checkIncomplete(f);
+            checkIncomplete(g);
+            assertTrue(f.complete(v1));
+        }
+        checkCompletedNormally(f, v1);
+        checkCompletedNormally(g, v1);
+    }}
 
     /**
      * copy returns a CompletableFuture that is completed exceptionally
      * when source is.
      */
-    public void testCopy2() {
+    public void testCopy_exceptionalCompletion() {
+        for (boolean createIncomplete : new boolean[] { true, false })
+    {
+        CFException ex = new CFException();
         CompletableFuture<Integer> f = new CompletableFuture<>();
+        if (!createIncomplete) f.completeExceptionally(ex);
         CompletableFuture<Integer> g = f.copy();
-        checkIncomplete(f);
-        checkIncomplete(g);
-        CFException ex = new CFException();
-        f.completeExceptionally(ex);
+        if (createIncomplete) {
+            checkIncomplete(f);
+            checkIncomplete(g);
+            f.completeExceptionally(ex);
+        }
         checkCompletedExceptionally(f, ex);
         checkCompletedWithWrappedException(g, ex);
+    }}
+
+    /**
+     * Completion of a copy does not complete its source.
+     */
+    public void testCopy_oneWayPropagation() {
+        CompletableFuture<Integer> f = new CompletableFuture<>();
+        assertTrue(f.copy().complete(1));
+        assertTrue(f.copy().complete(null));
+        assertTrue(f.copy().cancel(true));
+        assertTrue(f.copy().cancel(false));
+        assertTrue(f.copy().completeExceptionally(new CFException()));
+        checkIncomplete(f);
     }
 
     /**
@@ -3991,7 +4015,10 @@
             .collect(Collectors.toList());
 
         List<CompletionStage<Integer>> stages = new ArrayList<>();
-        stages.add(new CompletableFuture<Integer>().minimalCompletionStage());
+        CompletionStage<Integer> min =
+            new CompletableFuture<Integer>().minimalCompletionStage();
+        stages.add(min);
+        stages.add(min.thenApply(x -> x));
         stages.add(CompletableFuture.completedStage(1));
         stages.add(CompletableFuture.failedStage(new CFException()));
 
@@ -4027,6 +4054,131 @@
             throw new Error("Methods did not throw UOE: " + bugs);
     }
 
+    /**
+     * minimalStage.toCompletableFuture() returns a CompletableFuture that
+     * is completed normally, with the same value, when source is.
+     */
+    public void testMinimalCompletionStage_toCompletableFuture_normalCompletion() {
+        for (boolean createIncomplete : new boolean[] { true, false })
+        for (Integer v1 : new Integer[] { 1, null })
+    {
+        CompletableFuture<Integer> f = new CompletableFuture<>();
+        CompletionStage<Integer> minimal = f.minimalCompletionStage();
+        if (!createIncomplete) assertTrue(f.complete(v1));
+        CompletableFuture<Integer> g = minimal.toCompletableFuture();
+        if (createIncomplete) {
+            checkIncomplete(f);
+            checkIncomplete(g);
+            assertTrue(f.complete(v1));
+        }
+        checkCompletedNormally(f, v1);
+        checkCompletedNormally(g, v1);
+    }}
+
+    /**
+     * minimalStage.toCompletableFuture() returns a CompletableFuture that
+     * is completed exceptionally when source is.
+     */
+    public void testMinimalCompletionStage_toCompletableFuture_exceptionalCompletion() {
+        for (boolean createIncomplete : new boolean[] { true, false })
+    {
+        CFException ex = new CFException();
+        CompletableFuture<Integer> f = new CompletableFuture<>();
+        CompletionStage<Integer> minimal = f.minimalCompletionStage();
+        if (!createIncomplete) f.completeExceptionally(ex);
+        CompletableFuture<Integer> g = minimal.toCompletableFuture();
+        if (createIncomplete) {
+            checkIncomplete(f);
+            checkIncomplete(g);
+            f.completeExceptionally(ex);
+        }
+        checkCompletedExceptionally(f, ex);
+        checkCompletedWithWrappedException(g, ex);
+    }}
+
+    /**
+     * minimalStage.toCompletableFuture() gives mutable CompletableFuture
+     */
+    public void testMinimalCompletionStage_toCompletableFuture_mutable() {
+        for (Integer v1 : new Integer[] { 1, null })
+    {
+        CompletableFuture<Integer> f = new CompletableFuture<>();
+        CompletionStage minimal = f.minimalCompletionStage();
+        CompletableFuture<Integer> g = minimal.toCompletableFuture();
+        assertTrue(g.complete(v1));
+        checkCompletedNormally(g, v1);
+        checkIncomplete(f);
+        checkIncomplete(minimal.toCompletableFuture());
+    }}
+
+    /**
+     * minimalStage.toCompletableFuture().join() awaits completion
+     */
+    public void testMinimalCompletionStage_toCompletableFuture_join() throws Exception {
+        for (boolean createIncomplete : new boolean[] { true, false })
+        for (Integer v1 : new Integer[] { 1, null })
+    {
+        CompletableFuture<Integer> f = new CompletableFuture<>();
+        if (!createIncomplete) assertTrue(f.complete(v1));
+        CompletionStage<Integer> minimal = f.minimalCompletionStage();
+        if (createIncomplete) assertTrue(f.complete(v1));
+        assertEquals(v1, minimal.toCompletableFuture().join());
+        assertEquals(v1, minimal.toCompletableFuture().get());
+        checkCompletedNormally(minimal.toCompletableFuture(), v1);
+    }}
+
+    /**
+     * Completion of a toCompletableFuture copy of a minimal stage
+     * does not complete its source.
+     */
+    public void testMinimalCompletionStage_toCompletableFuture_oneWayPropagation() {
+        CompletableFuture<Integer> f = new CompletableFuture<>();
+        CompletionStage<Integer> g = f.minimalCompletionStage();
+        assertTrue(g.toCompletableFuture().complete(1));
+        assertTrue(g.toCompletableFuture().complete(null));
+        assertTrue(g.toCompletableFuture().cancel(true));
+        assertTrue(g.toCompletableFuture().cancel(false));
+        assertTrue(g.toCompletableFuture().completeExceptionally(new CFException()));
+        checkIncomplete(g.toCompletableFuture());
+        f.complete(1);
+        checkCompletedNormally(g.toCompletableFuture(), 1);
+    }
+
+    /** Demo utility method for external reliable toCompletableFuture */
+    static <T> CompletableFuture<T> toCompletableFuture(CompletionStage<T> stage) {
+        CompletableFuture<T> f = new CompletableFuture<>();
+        stage.handle((T t, Throwable ex) -> {
+                         if (ex != null) f.completeExceptionally(ex);
+                         else f.complete(t);
+                         return null;
+                     });
+        return f;
+    }
+
+    /** Demo utility method to join a CompletionStage */
+    static <T> T join(CompletionStage<T> stage) {
+        return toCompletableFuture(stage).join();
+    }
+
+    /**
+     * Joining a minimal stage "by hand" works
+     */
+    public void testMinimalCompletionStage_join_by_hand() {
+        for (boolean createIncomplete : new boolean[] { true, false })
+        for (Integer v1 : new Integer[] { 1, null })
+    {
+        CompletableFuture<Integer> f = new CompletableFuture<>();
+        CompletionStage<Integer> minimal = f.minimalCompletionStage();
+        CompletableFuture<Integer> g = new CompletableFuture<>();
+        if (!createIncomplete) assertTrue(f.complete(v1));
+        minimal.thenAccept((x) -> g.complete(x));
+        if (createIncomplete) assertTrue(f.complete(v1));
+        g.join();
+        checkCompletedNormally(g, v1);
+        checkCompletedNormally(f, v1);
+        assertEquals(v1, join(minimal));
+    }}
+
     static class Monad {
         static class ZeroException extends RuntimeException {
             public ZeroException() { super("monadic zero"); }
@@ -4317,6 +4469,22 @@
             assertTrue(neverCompleted.thenRun(() -> {}).cancel(true));
     }
 
+    /**
+     * Checks for garbage retention when MinimalStage.toCompletableFuture()
+     * is invoked many times.
+     * 8161600: Garbage retention when source CompletableFutures are never completed
+     *
+     * As of 2016-07, fails with OOME:
+     * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testToCompletableFutureGarbageRetention tck
+     */
+    public void testToCompletableFutureGarbageRetention() throws Throwable {
+        final int n = expensiveTests ? 900_000 : 10;
+        CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
+        CompletionStage minimal = neverCompleted.minimalCompletionStage();
+        for (int i = 0; i < n; i++)
+            assertTrue(minimal.toCompletableFuture().cancel(true));
+    }
+
 //     static <U> U join(CompletionStage<U> stage) {
 //         CompletableFuture<U> f = new CompletableFuture<>();
 //         stage.whenComplete((v, ex) -> {