--- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Fri Sep 23 12:08:38 2016 +0300
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Fri Sep 23 13:14:14 2016 -0700
@@ -2559,6 +2559,13 @@
* exceptionally with a CompletionException with this exception as
* cause.
*
+ * <p>Unless overridden by a subclass, a new non-minimal
+ * CompletableFuture with all methods available can be obtained from
+ * a minimal CompletionStage via {@link #toCompletableFuture()}.
+ * For example, completion of a minimal stage can be awaited by
+ *
+ * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre>
+ *
* @return the new CompletionStage
* @since 9
*/
@@ -2853,6 +2860,16 @@
@Override public CompletableFuture<T> completeOnTimeout
(T value, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException(); }
+ @Override public CompletableFuture<T> toCompletableFuture() {
+ Object r;
+ if ((r = result) != null)
+ return new CompletableFuture<T>(encodeRelay(r));
+ else {
+ CompletableFuture<T> d = new CompletableFuture<>();
+ unipush(new UniRelay<T,T>(d, this));
+ return d;
+ }
+ }
}
// VarHandle mechanics
--- a/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java Fri Sep 23 12:08:38 2016 +0300
+++ b/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java Fri Sep 23 13:14:14 2016 -0700
@@ -388,7 +388,7 @@
checkCompletedNormally(f, "test");
}
- abstract class CheckedAction {
+ abstract static class CheckedAction {
int invocationCount = 0;
final ExecutionMode m;
CheckedAction(ExecutionMode m) { this.m = m; }
@@ -400,7 +400,7 @@
void assertInvoked() { assertEquals(1, invocationCount); }
}
- abstract class CheckedIntegerAction extends CheckedAction {
+ abstract static class CheckedIntegerAction extends CheckedAction {
Integer value;
CheckedIntegerAction(ExecutionMode m) { super(m); }
void assertValue(Integer expected) {
@@ -409,7 +409,7 @@
}
}
- class IntegerSupplier extends CheckedAction
+ static class IntegerSupplier extends CheckedAction
implements Supplier<Integer>
{
final Integer value;
@@ -428,7 +428,7 @@
return (x == null) ? null : x + 1;
}
- class NoopConsumer extends CheckedIntegerAction
+ static class NoopConsumer extends CheckedIntegerAction
implements Consumer<Integer>
{
NoopConsumer(ExecutionMode m) { super(m); }
@@ -438,7 +438,7 @@
}
}
- class IncFunction extends CheckedIntegerAction
+ static class IncFunction extends CheckedIntegerAction
implements Function<Integer,Integer>
{
IncFunction(ExecutionMode m) { super(m); }
@@ -456,7 +456,7 @@
- ((y == null) ? 99 : y.intValue());
}
- class SubtractAction extends CheckedIntegerAction
+ static class SubtractAction extends CheckedIntegerAction
implements BiConsumer<Integer, Integer>
{
SubtractAction(ExecutionMode m) { super(m); }
@@ -466,7 +466,7 @@
}
}
- class SubtractFunction extends CheckedIntegerAction
+ static class SubtractFunction extends CheckedIntegerAction
implements BiFunction<Integer, Integer, Integer>
{
SubtractFunction(ExecutionMode m) { super(m); }
@@ -476,14 +476,14 @@
}
}
- class Noop extends CheckedAction implements Runnable {
+ static class Noop extends CheckedAction implements Runnable {
Noop(ExecutionMode m) { super(m); }
public void run() {
invoked();
}
}
- class FailingSupplier extends CheckedAction
+ static class FailingSupplier extends CheckedAction
implements Supplier<Integer>
{
final CFException ex;
@@ -494,7 +494,7 @@
}
}
- class FailingConsumer extends CheckedIntegerAction
+ static class FailingConsumer extends CheckedIntegerAction
implements Consumer<Integer>
{
final CFException ex;
@@ -506,7 +506,7 @@
}
}
- class FailingBiConsumer extends CheckedIntegerAction
+ static class FailingBiConsumer extends CheckedIntegerAction
implements BiConsumer<Integer, Integer>
{
final CFException ex;
@@ -518,7 +518,7 @@
}
}
- class FailingFunction extends CheckedIntegerAction
+ static class FailingFunction extends CheckedIntegerAction
implements Function<Integer, Integer>
{
final CFException ex;
@@ -530,7 +530,7 @@
}
}
- class FailingBiFunction extends CheckedIntegerAction
+ static class FailingBiFunction extends CheckedIntegerAction
implements BiFunction<Integer, Integer, Integer>
{
final CFException ex;
@@ -542,7 +542,7 @@
}
}
- class FailingRunnable extends CheckedAction implements Runnable {
+ static class FailingRunnable extends CheckedAction implements Runnable {
final CFException ex;
FailingRunnable(ExecutionMode m) { super(m); ex = new CFException(); }
public void run() {
@@ -551,7 +551,7 @@
}
}
- class CompletableFutureInc extends CheckedIntegerAction
+ static class CompletableFutureInc extends CheckedIntegerAction
implements Function<Integer, CompletableFuture<Integer>>
{
CompletableFutureInc(ExecutionMode m) { super(m); }
@@ -564,7 +564,7 @@
}
}
- class FailingCompletableFutureFunction extends CheckedIntegerAction
+ static class FailingCompletableFutureFunction extends CheckedIntegerAction
implements Function<Integer, CompletableFuture<Integer>>
{
final CFException ex;
@@ -3604,29 +3604,53 @@
* copy returns a CompletableFuture that is completed normally,
* with the same value, when source is.
*/
- public void testCopy() {
+ public void testCopy_normalCompletion() {
+ for (boolean createIncomplete : new boolean[] { true, false })
+ for (Integer v1 : new Integer[] { 1, null })
+ {
CompletableFuture<Integer> f = new CompletableFuture<>();
+ if (!createIncomplete) assertTrue(f.complete(v1));
CompletableFuture<Integer> g = f.copy();
- checkIncomplete(f);
- checkIncomplete(g);
- f.complete(1);
- checkCompletedNormally(f, 1);
- checkCompletedNormally(g, 1);
- }
+ if (createIncomplete) {
+ checkIncomplete(f);
+ checkIncomplete(g);
+ assertTrue(f.complete(v1));
+ }
+ checkCompletedNormally(f, v1);
+ checkCompletedNormally(g, v1);
+ }}
/**
* copy returns a CompletableFuture that is completed exceptionally
* when source is.
*/
- public void testCopy2() {
+ public void testCopy_exceptionalCompletion() {
+ for (boolean createIncomplete : new boolean[] { true, false })
+ {
+ CFException ex = new CFException();
CompletableFuture<Integer> f = new CompletableFuture<>();
+ if (!createIncomplete) f.completeExceptionally(ex);
CompletableFuture<Integer> g = f.copy();
- checkIncomplete(f);
- checkIncomplete(g);
- CFException ex = new CFException();
- f.completeExceptionally(ex);
+ if (createIncomplete) {
+ checkIncomplete(f);
+ checkIncomplete(g);
+ f.completeExceptionally(ex);
+ }
checkCompletedExceptionally(f, ex);
checkCompletedWithWrappedException(g, ex);
+ }}
+
+ /**
+ * Completion of a copy does not complete its source.
+ */
+ public void testCopy_oneWayPropagation() {
+ CompletableFuture<Integer> f = new CompletableFuture<>();
+ assertTrue(f.copy().complete(1));
+ assertTrue(f.copy().complete(null));
+ assertTrue(f.copy().cancel(true));
+ assertTrue(f.copy().cancel(false));
+ assertTrue(f.copy().completeExceptionally(new CFException()));
+ checkIncomplete(f);
}
/**
@@ -3991,7 +4015,10 @@
.collect(Collectors.toList());
List<CompletionStage<Integer>> stages = new ArrayList<>();
- stages.add(new CompletableFuture<Integer>().minimalCompletionStage());
+ CompletionStage<Integer> min =
+ new CompletableFuture<Integer>().minimalCompletionStage();
+ stages.add(min);
+ stages.add(min.thenApply(x -> x));
stages.add(CompletableFuture.completedStage(1));
stages.add(CompletableFuture.failedStage(new CFException()));
@@ -4027,6 +4054,131 @@
throw new Error("Methods did not throw UOE: " + bugs);
}
+ /**
+ * minimalStage.toCompletableFuture() returns a CompletableFuture that
+ * is completed normally, with the same value, when source is.
+ */
+ public void testMinimalCompletionStage_toCompletableFuture_normalCompletion() {
+ for (boolean createIncomplete : new boolean[] { true, false })
+ for (Integer v1 : new Integer[] { 1, null })
+ {
+ CompletableFuture<Integer> f = new CompletableFuture<>();
+ CompletionStage<Integer> minimal = f.minimalCompletionStage();
+ if (!createIncomplete) assertTrue(f.complete(v1));
+ CompletableFuture<Integer> g = minimal.toCompletableFuture();
+ if (createIncomplete) {
+ checkIncomplete(f);
+ checkIncomplete(g);
+ assertTrue(f.complete(v1));
+ }
+ checkCompletedNormally(f, v1);
+ checkCompletedNormally(g, v1);
+ }}
+
+ /**
+ * minimalStage.toCompletableFuture() returns a CompletableFuture that
+ * is completed exceptionally when source is.
+ */
+ public void testMinimalCompletionStage_toCompletableFuture_exceptionalCompletion() {
+ for (boolean createIncomplete : new boolean[] { true, false })
+ {
+ CFException ex = new CFException();
+ CompletableFuture<Integer> f = new CompletableFuture<>();
+ CompletionStage<Integer> minimal = f.minimalCompletionStage();
+ if (!createIncomplete) f.completeExceptionally(ex);
+ CompletableFuture<Integer> g = minimal.toCompletableFuture();
+ if (createIncomplete) {
+ checkIncomplete(f);
+ checkIncomplete(g);
+ f.completeExceptionally(ex);
+ }
+ checkCompletedExceptionally(f, ex);
+ checkCompletedWithWrappedException(g, ex);
+ }}
+
+ /**
+ * minimalStage.toCompletableFuture() gives mutable CompletableFuture
+ */
+ public void testMinimalCompletionStage_toCompletableFuture_mutable() {
+ for (Integer v1 : new Integer[] { 1, null })
+ {
+ CompletableFuture<Integer> f = new CompletableFuture<>();
+ CompletionStage minimal = f.minimalCompletionStage();
+ CompletableFuture<Integer> g = minimal.toCompletableFuture();
+ assertTrue(g.complete(v1));
+ checkCompletedNormally(g, v1);
+ checkIncomplete(f);
+ checkIncomplete(minimal.toCompletableFuture());
+ }}
+
+ /**
+ * minimalStage.toCompletableFuture().join() awaits completion
+ */
+ public void testMinimalCompletionStage_toCompletableFuture_join() throws Exception {
+ for (boolean createIncomplete : new boolean[] { true, false })
+ for (Integer v1 : new Integer[] { 1, null })
+ {
+ CompletableFuture<Integer> f = new CompletableFuture<>();
+ if (!createIncomplete) assertTrue(f.complete(v1));
+ CompletionStage<Integer> minimal = f.minimalCompletionStage();
+ if (createIncomplete) assertTrue(f.complete(v1));
+ assertEquals(v1, minimal.toCompletableFuture().join());
+ assertEquals(v1, minimal.toCompletableFuture().get());
+ checkCompletedNormally(minimal.toCompletableFuture(), v1);
+ }}
+
+ /**
+ * Completion of a toCompletableFuture copy of a minimal stage
+ * does not complete its source.
+ */
+ public void testMinimalCompletionStage_toCompletableFuture_oneWayPropagation() {
+ CompletableFuture<Integer> f = new CompletableFuture<>();
+ CompletionStage<Integer> g = f.minimalCompletionStage();
+ assertTrue(g.toCompletableFuture().complete(1));
+ assertTrue(g.toCompletableFuture().complete(null));
+ assertTrue(g.toCompletableFuture().cancel(true));
+ assertTrue(g.toCompletableFuture().cancel(false));
+ assertTrue(g.toCompletableFuture().completeExceptionally(new CFException()));
+ checkIncomplete(g.toCompletableFuture());
+ f.complete(1);
+ checkCompletedNormally(g.toCompletableFuture(), 1);
+ }
+
+ /** Demo utility method for external reliable toCompletableFuture */
+ static <T> CompletableFuture<T> toCompletableFuture(CompletionStage<T> stage) {
+ CompletableFuture<T> f = new CompletableFuture<>();
+ stage.handle((T t, Throwable ex) -> {
+ if (ex != null) f.completeExceptionally(ex);
+ else f.complete(t);
+ return null;
+ });
+ return f;
+ }
+
+ /** Demo utility method to join a CompletionStage */
+ static <T> T join(CompletionStage<T> stage) {
+ return toCompletableFuture(stage).join();
+ }
+
+ /**
+ * Joining a minimal stage "by hand" works
+ */
+ public void testMinimalCompletionStage_join_by_hand() {
+ for (boolean createIncomplete : new boolean[] { true, false })
+ for (Integer v1 : new Integer[] { 1, null })
+ {
+ CompletableFuture<Integer> f = new CompletableFuture<>();
+ CompletionStage<Integer> minimal = f.minimalCompletionStage();
+ CompletableFuture<Integer> g = new CompletableFuture<>();
+ if (!createIncomplete) assertTrue(f.complete(v1));
+ minimal.thenAccept((x) -> g.complete(x));
+ if (createIncomplete) assertTrue(f.complete(v1));
+ g.join();
+ checkCompletedNormally(g, v1);
+ checkCompletedNormally(f, v1);
+ assertEquals(v1, join(minimal));
+ }}
+
static class Monad {
static class ZeroException extends RuntimeException {
public ZeroException() { super("monadic zero"); }
@@ -4317,6 +4469,22 @@
assertTrue(neverCompleted.thenRun(() -> {}).cancel(true));
}
+ /**
+ * Checks for garbage retention when MinimalStage.toCompletableFuture()
+ * is invoked many times.
+ * 8161600: Garbage retention when source CompletableFutures are never completed
+ *
+ * As of 2016-07, fails with OOME:
+ * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testToCompletableFutureGarbageRetention tck
+ */
+ public void testToCompletableFutureGarbageRetention() throws Throwable {
+ final int n = expensiveTests ? 900_000 : 10;
+ CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
+ CompletionStage minimal = neverCompleted.minimalCompletionStage();
+ for (int i = 0; i < n; i++)
+ assertTrue(minimal.toCompletableFuture().cancel(true));
+ }
+
// static <U> U join(CompletionStage<U> stage) {
// CompletableFuture<U> f = new CompletableFuture<>();
// stage.whenComplete((v, ex) -> {