8160402: Garbage retention with CompletableFuture.anyOf
authordl
Tue, 26 Jul 2016 09:53:38 -0700
changeset 39778 5cda06c52cdd
parent 39777 a9b80432c521
child 39779 4666307d3155
8160402: Garbage retention with CompletableFuture.anyOf Reviewed-by: martin, psandoz, plevart
jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java
jdk/test/java/util/concurrent/tck/CompletableFutureTest.java
--- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java	Tue Jul 26 09:49:25 2016 -0700
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java	Tue Jul 26 09:53:38 2016 -0700
@@ -221,7 +221,10 @@
      *   across both while pushing actions.  The second completion is
      *   a CoCompletion pointing to the first, shared so that at most
      *   one performs the action.  The multiple-arity methods allOf
-     *   and anyOf do this pairwise to form trees of completions.
+     *   does this pairwise to form trees of completions.  Method
+     *   anyOf is handled differently from allOf because completion of
+     *   any source should trigger a cleanStack of other sources.
+     *   Each AnyOf completion can reach others via a shared array.
      *
      * Note that the generic type parameters of methods vary according
      * to whether "this" is a source, dependent, or completion.
@@ -588,9 +591,9 @@
     }
 
     /**
-     * Post-processing by dependent after successful UniCompletion
-     * tryFire.  Tries to clean stack of source a, and then either runs
-     * postComplete or returns this to caller, depending on mode.
+     * Post-processing by dependent after successful UniCompletion tryFire.
+     * Tries to clean stack of source a, and then either runs postComplete
+     * or returns this to caller, depending on mode.
      */
     final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
         if (a != null && a.stack != null) {
@@ -1003,12 +1006,12 @@
     }
 
     @SuppressWarnings("serial")
-    static final class UniRelay<T> extends UniCompletion<T,T> {
-        UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) {
+    static final class UniRelay<U, T extends U> extends UniCompletion<T,U> {
+        UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) {
             super(null, dep, src);
         }
-        final CompletableFuture<T> tryFire(int mode) {
-            CompletableFuture<T> d; CompletableFuture<T> a; Object r;
+        final CompletableFuture<U> tryFire(int mode) {
+            CompletableFuture<U> d; CompletableFuture<T> a; Object r;
             if ((d = dep) == null
                 || (a = src) == null || (r = a.result) == null)
                 return null;
@@ -1019,13 +1022,14 @@
         }
     }
 
-    private CompletableFuture<T> uniCopyStage() {
+    private static <U, T extends U> CompletableFuture<U> uniCopyStage(
+        CompletableFuture<T> src) {
         Object r;
-        CompletableFuture<T> d = newIncompleteFuture();
-        if ((r = result) != null)
+        CompletableFuture<U> d = src.newIncompleteFuture();
+        if ((r = src.result) != null)
             d.result = encodeRelay(r);
         else
-            unipush(new UniRelay<T>(d, this));
+            src.unipush(new UniRelay<U,T>(d, src));
         return d;
     }
 
@@ -1034,7 +1038,7 @@
         if ((r = result) != null)
             return new MinimalStage<T>(encodeRelay(r));
         MinimalStage<T> d = new MinimalStage<T>();
-        unipush(new UniRelay<T>(d, this));
+        unipush(new UniRelay<T,T>(d, this));
         return d;
     }
 
@@ -1069,7 +1073,7 @@
                     if ((r = g.result) != null)
                         d.completeRelay(r);
                     else {
-                        g.unipush(new UniRelay<V>(d, g));
+                        g.unipush(new UniRelay<V,V>(d, g));
                         if (d.result == null)
                             return null;
                     }
@@ -1103,7 +1107,7 @@
                 if ((s = g.result) != null)
                     d.result = encodeRelay(s);
                 else {
-                    g.unipush(new UniRelay<V>(d, g));
+                    g.unipush(new UniRelay<V,V>(d, g));
                 }
             } catch (Throwable ex) {
                 d.result = encodeThrowable(ex);
@@ -1637,45 +1641,40 @@
         return d;
     }
 
+    /** Completion for an anyOf input future. */
     @SuppressWarnings("serial")
-    static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
-        OrRelay(CompletableFuture<Object> dep,
-                CompletableFuture<T> src, CompletableFuture<U> snd) {
-            super(null, dep, src, snd);
+    static class AnyOf extends Completion {
+        CompletableFuture<Object> dep; CompletableFuture<?> src;
+        CompletableFuture<?>[] srcs;
+        AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src,
+              CompletableFuture<?>[] srcs) {
+            this.dep = dep; this.src = src; this.srcs = srcs;
         }
         final CompletableFuture<Object> tryFire(int mode) {
-            CompletableFuture<Object> d;
-            CompletableFuture<T> a;
-            CompletableFuture<U> b;
+            // assert mode != ASYNC;
+            CompletableFuture<Object> d; CompletableFuture<?> a;
+            CompletableFuture<?>[] as;
             Object r;
             if ((d = dep) == null
-                || (a = src) == null || (b = snd) == null
-                || ((r = a.result) == null && (r = b.result) == null))
+                || (a = src) == null || (r = a.result) == null
+                || (as = srcs) == null)
                 return null;
-            d.completeRelay(r);
-            src = null; snd = null; dep = null;
-            return d.postFire(a, b, mode);
+            dep = null; src = null; srcs = null;
+            if (d.completeRelay(r)) {
+                for (CompletableFuture<?> b : as)
+                    if (b != a)
+                        b.cleanStack();
+                if (mode < 0)
+                    return d;
+                else
+                    d.postComplete();
+            }
+            return null;
         }
-    }
-
-    /** Recursively constructs a tree of completions. */
-    static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
-                                            int lo, int hi) {
-        CompletableFuture<Object> d = new CompletableFuture<Object>();
-        if (lo <= hi) {
-            CompletableFuture<?> a, b; Object r;
-            int mid = (lo + hi) >>> 1;
-            if ((a = (lo == mid ? cfs[lo] :
-                      orTree(cfs, lo, mid))) == null ||
-                (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
-                      orTree(cfs, mid+1, hi))) == null)
-                throw new NullPointerException();
-            if ((r = a.result) != null && (r = b.result) != null)
-                d.result = encodeRelay(r);
-            else
-                a.orpush(b, new OrRelay<>(d, a, b));
+        final boolean isLive() {
+            CompletableFuture<Object> d;
+            return (d = dep) != null && d.result == null;
         }
-        return d;
     }
 
     /* ------------- Zero-input Async forms -------------- */
@@ -2354,7 +2353,28 @@
      * {@code null}
      */
     public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
-        return orTree(cfs, 0, cfs.length - 1);
+        int n; Object r;
+        if ((n = cfs.length) <= 1)
+            return (n == 0)
+                ? new CompletableFuture<Object>()
+                : uniCopyStage(cfs[0]);
+        for (CompletableFuture<?> cf : cfs)
+            if ((r = cf.result) != null)
+                return new CompletableFuture<Object>(encodeRelay(r));
+        cfs = cfs.clone();
+        CompletableFuture<Object> d = new CompletableFuture<>();
+        for (CompletableFuture<?> cf : cfs)
+            cf.unipush(new AnyOf(d, cf, cfs));
+        // If d was completed while we were adding completions, we should
+        // clean the stack of any sources that may have had completions
+        // pushed on their stack after d was completed.
+        if (d.result != null)
+            for (int i = 0, len = cfs.length; i < len; i++)
+                if (cfs[i].result != null)
+                    for (i++; i < len; i++)
+                        if (cfs[i].result == null)
+                            cfs[i].cleanStack();
+        return d;
     }
 
     /* ------------- Control and status methods -------------- */
@@ -2526,7 +2546,7 @@
      * @since 9
      */
     public CompletableFuture<T> copy() {
-        return uniCopyStage();
+        return uniCopyStage(this);
     }
 
     /**
--- a/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java	Tue Jul 26 09:49:25 2016 -0700
+++ b/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java	Tue Jul 26 09:53:38 2016 -0700
@@ -4269,12 +4269,11 @@
         }
     }
 
-    /*
-     * Tests below currently fail in stress mode due to memory retention.
-     * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest tck
+    /**
+     * Reproduction recipe for:
+     * 8160402: Garbage retention with CompletableFuture.anyOf
+     * cvs update -D '2016-05-01' ./src/main/java/util/concurrent/CompletableFuture.java && ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testAnyOfGarbageRetention tck; cvs update -A
      */
-
-    /** Checks for garbage retention with anyOf. */
     public void testAnyOfGarbageRetention() throws Throwable {
         for (Integer v : new Integer[] { 1, null })
     {
@@ -4288,7 +4287,12 @@
             checkCompletedNormally(CompletableFuture.anyOf(fs), v);
     }}
 
-    /** Checks for garbage retention with allOf. */
+    /**
+     * Checks for garbage retention with allOf.
+     *
+     * As of 2016-07, fails with OOME:
+     * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledAllOfGarbageRetention tck
+     */
     public void testCancelledAllOfGarbageRetention() throws Throwable {
         final int n = expensiveTests ? 100_000 : 10;
         CompletableFuture<Integer>[] fs
@@ -4299,6 +4303,21 @@
             assertTrue(CompletableFuture.allOf(fs).cancel(false));
     }
 
+    /**
+     * Checks for garbage retention when a dependent future is
+     * cancelled and garbage-collected.
+     * 8161600: Garbage retention when source CompletableFutures are never completed
+     *
+     * As of 2016-07, fails with OOME:
+     * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledGarbageRetention tck
+     */
+    public void testCancelledGarbageRetention() throws Throwable {
+        final int n = expensiveTests ? 100_000 : 10;
+        CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
+        for (int i = 0; i < n; i++)
+            assertTrue(neverCompleted.thenRun(() -> {}).cancel(true));
+    }
+
 //     static <U> U join(CompletionStage<U> stage) {
 //         CompletableFuture<U> f = new CompletableFuture<>();
 //         stage.whenComplete((v, ex) -> {