--- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Tue Jul 26 09:49:25 2016 -0700
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Tue Jul 26 09:53:38 2016 -0700
@@ -221,7 +221,10 @@
* across both while pushing actions. The second completion is
* a CoCompletion pointing to the first, shared so that at most
* one performs the action. The multiple-arity methods allOf
- * and anyOf do this pairwise to form trees of completions.
+ * does this pairwise to form trees of completions. Method
+ * anyOf is handled differently from allOf because completion of
+ * any source should trigger a cleanStack of other sources.
+ * Each AnyOf completion can reach others via a shared array.
*
* Note that the generic type parameters of methods vary according
* to whether "this" is a source, dependent, or completion.
@@ -588,9 +591,9 @@
}
/**
- * Post-processing by dependent after successful UniCompletion
- * tryFire. Tries to clean stack of source a, and then either runs
- * postComplete or returns this to caller, depending on mode.
+ * Post-processing by dependent after successful UniCompletion tryFire.
+ * Tries to clean stack of source a, and then either runs postComplete
+ * or returns this to caller, depending on mode.
*/
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
@@ -1003,12 +1006,12 @@
}
@SuppressWarnings("serial")
- static final class UniRelay<T> extends UniCompletion<T,T> {
- UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) {
+ static final class UniRelay<U, T extends U> extends UniCompletion<T,U> {
+ UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) {
super(null, dep, src);
}
- final CompletableFuture<T> tryFire(int mode) {
- CompletableFuture<T> d; CompletableFuture<T> a; Object r;
+ final CompletableFuture<U> tryFire(int mode) {
+ CompletableFuture<U> d; CompletableFuture<T> a; Object r;
if ((d = dep) == null
|| (a = src) == null || (r = a.result) == null)
return null;
@@ -1019,13 +1022,14 @@
}
}
- private CompletableFuture<T> uniCopyStage() {
+ private static <U, T extends U> CompletableFuture<U> uniCopyStage(
+ CompletableFuture<T> src) {
Object r;
- CompletableFuture<T> d = newIncompleteFuture();
- if ((r = result) != null)
+ CompletableFuture<U> d = src.newIncompleteFuture();
+ if ((r = src.result) != null)
d.result = encodeRelay(r);
else
- unipush(new UniRelay<T>(d, this));
+ src.unipush(new UniRelay<U,T>(d, src));
return d;
}
@@ -1034,7 +1038,7 @@
if ((r = result) != null)
return new MinimalStage<T>(encodeRelay(r));
MinimalStage<T> d = new MinimalStage<T>();
- unipush(new UniRelay<T>(d, this));
+ unipush(new UniRelay<T,T>(d, this));
return d;
}
@@ -1069,7 +1073,7 @@
if ((r = g.result) != null)
d.completeRelay(r);
else {
- g.unipush(new UniRelay<V>(d, g));
+ g.unipush(new UniRelay<V,V>(d, g));
if (d.result == null)
return null;
}
@@ -1103,7 +1107,7 @@
if ((s = g.result) != null)
d.result = encodeRelay(s);
else {
- g.unipush(new UniRelay<V>(d, g));
+ g.unipush(new UniRelay<V,V>(d, g));
}
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
@@ -1637,45 +1641,40 @@
return d;
}
+ /** Completion for an anyOf input future. */
@SuppressWarnings("serial")
- static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
- OrRelay(CompletableFuture<Object> dep,
- CompletableFuture<T> src, CompletableFuture<U> snd) {
- super(null, dep, src, snd);
+ static class AnyOf extends Completion {
+ CompletableFuture<Object> dep; CompletableFuture<?> src;
+ CompletableFuture<?>[] srcs;
+ AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src,
+ CompletableFuture<?>[] srcs) {
+ this.dep = dep; this.src = src; this.srcs = srcs;
}
final CompletableFuture<Object> tryFire(int mode) {
- CompletableFuture<Object> d;
- CompletableFuture<T> a;
- CompletableFuture<U> b;
+ // assert mode != ASYNC;
+ CompletableFuture<Object> d; CompletableFuture<?> a;
+ CompletableFuture<?>[] as;
Object r;
if ((d = dep) == null
- || (a = src) == null || (b = snd) == null
- || ((r = a.result) == null && (r = b.result) == null))
+ || (a = src) == null || (r = a.result) == null
+ || (as = srcs) == null)
return null;
- d.completeRelay(r);
- src = null; snd = null; dep = null;
- return d.postFire(a, b, mode);
+ dep = null; src = null; srcs = null;
+ if (d.completeRelay(r)) {
+ for (CompletableFuture<?> b : as)
+ if (b != a)
+ b.cleanStack();
+ if (mode < 0)
+ return d;
+ else
+ d.postComplete();
+ }
+ return null;
}
- }
-
- /** Recursively constructs a tree of completions. */
- static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
- int lo, int hi) {
- CompletableFuture<Object> d = new CompletableFuture<Object>();
- if (lo <= hi) {
- CompletableFuture<?> a, b; Object r;
- int mid = (lo + hi) >>> 1;
- if ((a = (lo == mid ? cfs[lo] :
- orTree(cfs, lo, mid))) == null ||
- (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
- orTree(cfs, mid+1, hi))) == null)
- throw new NullPointerException();
- if ((r = a.result) != null && (r = b.result) != null)
- d.result = encodeRelay(r);
- else
- a.orpush(b, new OrRelay<>(d, a, b));
+ final boolean isLive() {
+ CompletableFuture<Object> d;
+ return (d = dep) != null && d.result == null;
}
- return d;
}
/* ------------- Zero-input Async forms -------------- */
@@ -2354,7 +2353,28 @@
* {@code null}
*/
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
- return orTree(cfs, 0, cfs.length - 1);
+ int n; Object r;
+ if ((n = cfs.length) <= 1)
+ return (n == 0)
+ ? new CompletableFuture<Object>()
+ : uniCopyStage(cfs[0]);
+ for (CompletableFuture<?> cf : cfs)
+ if ((r = cf.result) != null)
+ return new CompletableFuture<Object>(encodeRelay(r));
+ cfs = cfs.clone();
+ CompletableFuture<Object> d = new CompletableFuture<>();
+ for (CompletableFuture<?> cf : cfs)
+ cf.unipush(new AnyOf(d, cf, cfs));
+ // If d was completed while we were adding completions, we should
+ // clean the stack of any sources that may have had completions
+ // pushed on their stack after d was completed.
+ if (d.result != null)
+ for (int i = 0, len = cfs.length; i < len; i++)
+ if (cfs[i].result != null)
+ for (i++; i < len; i++)
+ if (cfs[i].result == null)
+ cfs[i].cleanStack();
+ return d;
}
/* ------------- Control and status methods -------------- */
@@ -2526,7 +2546,7 @@
* @since 9
*/
public CompletableFuture<T> copy() {
- return uniCopyStage();
+ return uniCopyStage(this);
}
/**
--- a/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java Tue Jul 26 09:49:25 2016 -0700
+++ b/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java Tue Jul 26 09:53:38 2016 -0700
@@ -4269,12 +4269,11 @@
}
}
- /*
- * Tests below currently fail in stress mode due to memory retention.
- * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest tck
+ /**
+ * Reproduction recipe for:
+ * 8160402: Garbage retention with CompletableFuture.anyOf
+ * cvs update -D '2016-05-01' ./src/main/java/util/concurrent/CompletableFuture.java && ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testAnyOfGarbageRetention tck; cvs update -A
*/
-
- /** Checks for garbage retention with anyOf. */
public void testAnyOfGarbageRetention() throws Throwable {
for (Integer v : new Integer[] { 1, null })
{
@@ -4288,7 +4287,12 @@
checkCompletedNormally(CompletableFuture.anyOf(fs), v);
}}
- /** Checks for garbage retention with allOf. */
+ /**
+ * Checks for garbage retention with allOf.
+ *
+ * As of 2016-07, fails with OOME:
+ * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledAllOfGarbageRetention tck
+ */
public void testCancelledAllOfGarbageRetention() throws Throwable {
final int n = expensiveTests ? 100_000 : 10;
CompletableFuture<Integer>[] fs
@@ -4299,6 +4303,21 @@
assertTrue(CompletableFuture.allOf(fs).cancel(false));
}
+ /**
+ * Checks for garbage retention when a dependent future is
+ * cancelled and garbage-collected.
+ * 8161600: Garbage retention when source CompletableFutures are never completed
+ *
+ * As of 2016-07, fails with OOME:
+ * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledGarbageRetention tck
+ */
+ public void testCancelledGarbageRetention() throws Throwable {
+ final int n = expensiveTests ? 100_000 : 10;
+ CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
+ for (int i = 0; i < n; i++)
+ assertTrue(neverCompleted.thenRun(() -> {}).cancel(true));
+ }
+
// static <U> U join(CompletionStage<U> stage) {
// CompletableFuture<U> f = new CompletableFuture<>();
// stage.whenComplete((v, ex) -> {