http-client-branch: review comments BodySubscriber should specify how an implementation is expected to behave. http-client-branch
authordfuchs
Fri, 24 Nov 2017 18:30:37 +0000
branchhttp-client-branch
changeset 55869 54f89370f26a
parent 55868 5899aa5e1837
child 55870 1a13210ce726
http-client-branch: review comments BodySubscriber should specify how an implementation is expected to behave.
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Fri Nov 24 18:30:37 2017 +0000
@@ -29,6 +29,7 @@
 import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -158,7 +159,8 @@
                                          Executor executor) {
         this.return2Cache = return2Cache;
         final HttpResponse.BodySubscriber<U> pusher = p;
-        final CompletableFuture<U> cf = p.getBody().toCompletableFuture();
+        final CompletionStage<U> bodyCF = p.getBody();
+        final CompletableFuture<U> cf = MinimalFuture.of(bodyCF);
 
         int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java	Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java	Fri Nov 24 18:30:37 2017 +0000
@@ -35,6 +35,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Flow;
 import jdk.incubator.http.HttpClient.Version;
@@ -75,22 +76,22 @@
     }
 
     private static final class TrailingOperations {
-        private final Map<CompletableFuture<?>, Boolean> operations =
+        private final Map<CompletionStage<?>, Boolean> operations =
                 new IdentityHashMap<>();
-        void add(CompletableFuture<?> cf) {
+        void add(CompletionStage<?> cf) {
             synchronized(operations) {
                 cf.whenComplete((r,t)-> remove(cf));
                 operations.put(cf, Boolean.TRUE);
             }
         }
-        boolean remove(CompletableFuture<?> cf) {
+        boolean remove(CompletionStage<?> cf) {
             synchronized(operations) {
                 return operations.remove(cf);
             }
         }
     }
 
-    final void addTrailingOperation(CompletableFuture<?> cf) {
+    final void addTrailingOperation(CompletionStage<?> cf) {
         trailingOperations.add(cf);
     }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Fri Nov 24 18:30:37 2017 +0000
@@ -577,6 +577,18 @@
      * completes before the body has been read, because the calling code uses it
      * to consume the data.
      *
+     * @apiNote To ensure that all resources associated with the
+     * corresponding exchange are properly released, an implementation
+     * of {@code BodySubscriber} must ensure to {@linkplain
+     * Flow.Subscription#request request} more data until {@link
+     * #onComplete() onComplete} or {@link #onError(Throwable) onError}
+     * are signalled, or {@linkplain Flow.Subscription#request cancel} its
+     * {@linkplain #onSubscribe(Flow.Subscription) subscription}
+     * if unable or unwilling to do so.
+     * Calling {@code cancel} before exhausting the data may cause
+     * the underlying HTTP connection to be closed and prevent it
+     * from being reused for subsequent operations.
+     *
      * @param <T> the response body type
      */
     public interface BodySubscriber<T>
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Fri Nov 24 18:30:37 2017 +0000
@@ -259,9 +259,7 @@
     // pushes entire response body into response subscriber
     // blocking when required by local or remote flow control
     CompletableFuture<T> receiveData() {
-        responseBodyCF = responseSubscriber
-                .getBody()
-                .toCompletableFuture();
+        responseBodyCF = MinimalFuture.of(responseSubscriber.getBody());
 
         if (isCanceled()) {
             Throwable t = getCancelCause();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java	Fri Nov 24 17:34:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java	Fri Nov 24 18:30:37 2017 +0000
@@ -157,4 +157,18 @@
     public String toString() {
         return super.toString() + " (id=" + id +")";
     }
+
+    public static <U> MinimalFuture<U> of(CompletionStage<U> stage) {
+        MinimalFuture<U> cf = newMinimalFuture();
+        stage.whenComplete((r,t) -> complete(cf, r, t));
+        return cf;
+    }
+
+    private static <U> void complete(CompletableFuture<U> cf, U result, Throwable t) {
+        if (t == null) {
+            cf.complete(result);
+        } else {
+            cf.completeExceptionally(t);
+        }
+    }
 }