http-client-branch: review comments BodySubscriber should specify how an implementation is expected to behave.
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Fri Nov 24 18:30:37 2017 +0000
@@ -29,6 +29,7 @@
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -158,7 +159,8 @@
Executor executor) {
this.return2Cache = return2Cache;
final HttpResponse.BodySubscriber<U> pusher = p;
- final CompletableFuture<U> cf = p.getBody().toCompletableFuture();
+ final CompletionStage<U> bodyCF = p.getBody();
+ final CompletableFuture<U> cf = MinimalFuture.of(bodyCF);
int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Fri Nov 24 18:30:37 2017 +0000
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import jdk.incubator.http.HttpClient.Version;
@@ -75,22 +76,22 @@
}
private static final class TrailingOperations {
- private final Map<CompletableFuture<?>, Boolean> operations =
+ private final Map<CompletionStage<?>, Boolean> operations =
new IdentityHashMap<>();
- void add(CompletableFuture<?> cf) {
+ void add(CompletionStage<?> cf) {
synchronized(operations) {
cf.whenComplete((r,t)-> remove(cf));
operations.put(cf, Boolean.TRUE);
}
}
- boolean remove(CompletableFuture<?> cf) {
+ boolean remove(CompletionStage<?> cf) {
synchronized(operations) {
return operations.remove(cf);
}
}
}
- final void addTrailingOperation(CompletableFuture<?> cf) {
+ final void addTrailingOperation(CompletionStage<?> cf) {
trailingOperations.add(cf);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Nov 24 18:30:37 2017 +0000
@@ -577,6 +577,18 @@
* completes before the body has been read, because the calling code uses it
* to consume the data.
*
+ * @apiNote To ensure that all resources associated with the
+ * corresponding exchange are properly released, an implementation
+ * of {@code BodySubscriber} must ensure to {@linkplain
+ * Flow.Subscription#request request} more data until {@link
+ * #onComplete() onComplete} or {@link #onError(Throwable) onError}
+ * are signalled, or {@linkplain Flow.Subscription#request cancel} its
+ * {@linkplain #onSubscribe(Flow.Subscription) subscription}
+ * if unable or unwilling to do so.
+ * Calling {@code cancel} before exhausting the data may cause
+ * the underlying HTTP connection to be closed and prevent it
+ * from being reused for subsequent operations.
+ *
* @param <T> the response body type
*/
public interface BodySubscriber<T>
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Nov 24 18:30:37 2017 +0000
@@ -259,9 +259,7 @@
// pushes entire response body into response subscriber
// blocking when required by local or remote flow control
CompletableFuture<T> receiveData() {
- responseBodyCF = responseSubscriber
- .getBody()
- .toCompletableFuture();
+ responseBodyCF = MinimalFuture.of(responseSubscriber.getBody());
if (isCanceled()) {
Throwable t = getCancelCause();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Fri Nov 24 18:30:37 2017 +0000
@@ -157,4 +157,18 @@
public String toString() {
return super.toString() + " (id=" + id +")";
}
+
+ public static <U> MinimalFuture<U> of(CompletionStage<U> stage) {
+ MinimalFuture<U> cf = newMinimalFuture();
+ stage.whenComplete((r,t) -> complete(cf, r, t));
+ return cf;
+ }
+
+ private static <U> void complete(CompletableFuture<U> cf, U result, Throwable t) {
+ if (t == null) {
+ cf.complete(result);
+ } else {
+ cf.completeExceptionally(t);
+ }
+ }
}