# HG changeset patch # User dfuchs # Date 1511548237 0 # Node ID 54f89370f26a848127f3681beaf0ca496294e77b # Parent 5899aa5e18371ce70ec2fba51892f394351e2f06 http-client-branch: review comments BodySubscriber should specify how an implementation is expected to behave. diff -r 5899aa5e1837 -r 54f89370f26a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Fri Nov 24 17:34:57 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Fri Nov 24 18:30:37 2017 +0000 @@ -29,6 +29,7 @@ import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -158,7 +159,8 @@ Executor executor) { this.return2Cache = return2Cache; final HttpResponse.BodySubscriber pusher = p; - final CompletableFuture cf = p.getBody().toCompletableFuture(); + final CompletionStage bodyCF = p.getBody(); + final CompletableFuture cf = MinimalFuture.of(bodyCF); int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1); diff -r 5899aa5e1837 -r 54f89370f26a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Fri Nov 24 17:34:57 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Fri Nov 24 18:30:37 2017 +0000 @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Flow; import jdk.incubator.http.HttpClient.Version; @@ -75,22 +76,22 @@ } private static final class TrailingOperations { - private final Map, Boolean> operations = + private final Map, Boolean> operations = new IdentityHashMap<>(); - void add(CompletableFuture cf) { + void add(CompletionStage cf) { synchronized(operations) { cf.whenComplete((r,t)-> remove(cf)); operations.put(cf, Boolean.TRUE); } } - boolean remove(CompletableFuture cf) { + boolean remove(CompletionStage cf) { synchronized(operations) { return operations.remove(cf); } } } - final void addTrailingOperation(CompletableFuture cf) { + final void addTrailingOperation(CompletionStage cf) { trailingOperations.add(cf); } diff -r 5899aa5e1837 -r 54f89370f26a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Nov 24 17:34:57 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Nov 24 18:30:37 2017 +0000 @@ -577,6 +577,18 @@ * completes before the body has been read, because the calling code uses it * to consume the data. * + * @apiNote To ensure that all resources associated with the + * corresponding exchange are properly released, an implementation + * of {@code BodySubscriber} must ensure to {@linkplain + * Flow.Subscription#request request} more data until {@link + * #onComplete() onComplete} or {@link #onError(Throwable) onError} + * are signalled, or {@linkplain Flow.Subscription#request cancel} its + * {@linkplain #onSubscribe(Flow.Subscription) subscription} + * if unable or unwilling to do so. + * Calling {@code cancel} before exhausting the data may cause + * the underlying HTTP connection to be closed and prevent it + * from being reused for subsequent operations. + * * @param the response body type */ public interface BodySubscriber diff -r 5899aa5e1837 -r 54f89370f26a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Nov 24 17:34:57 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Nov 24 18:30:37 2017 +0000 @@ -259,9 +259,7 @@ // pushes entire response body into response subscriber // blocking when required by local or remote flow control CompletableFuture receiveData() { - responseBodyCF = responseSubscriber - .getBody() - .toCompletableFuture(); + responseBodyCF = MinimalFuture.of(responseSubscriber.getBody()); if (isCanceled()) { Throwable t = getCancelCause(); diff -r 5899aa5e1837 -r 54f89370f26a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Fri Nov 24 17:34:57 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Fri Nov 24 18:30:37 2017 +0000 @@ -157,4 +157,18 @@ public String toString() { return super.toString() + " (id=" + id +")"; } + + public static MinimalFuture of(CompletionStage stage) { + MinimalFuture cf = newMinimalFuture(); + stage.whenComplete((r,t) -> complete(cf, r, t)); + return cf; + } + + private static void complete(CompletableFuture cf, U result, Throwable t) { + if (t == null) { + cf.complete(result); + } else { + cf.completeExceptionally(t); + } + } }