http-client-branch: review comment BodySubscriber::asInputStream http-client-branch
authordfuchs
Wed, 22 Nov 2017 16:27:08 +0000
branchhttp-client-branch
changeset 55858 cd5eeec735fb
parent 55857 89c904d57ebe
child 55859 4ca3e578b9c4
http-client-branch: review comment BodySubscriber::asInputStream
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java	Wed Nov 22 15:32:17 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java	Wed Nov 22 16:27:08 2017 +0000
@@ -310,6 +310,10 @@
             if (delegate != null) unsubscribe(delegate);
             Runnable cancel = () -> {
                 debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
+                // The connection should be closed, as some data may
+                // be left over in the stream.
+                setRetryOnError(false);
+                onReadError(new IOException("subscription cancelled"));
                 unsubscribe(pending);
             };
             // The subscription created by a delegate is only loosely
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Wed Nov 22 15:32:17 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Wed Nov 22 16:27:08 2017 +0000
@@ -468,6 +468,8 @@
          * been fully received yet. The {@link #body()} method returns an
          * {@link InputStream} from which the body can be read as it is received.
          *
+         * @apiNote See {@link BodySubscriber#asInputStream()} for more information.
+         *
          * @return a response body handler
          */
         public static BodyHandler<InputStream> asInputStream() {
@@ -702,6 +704,14 @@
          * requiring to wait for the entire body to be processed. The response
          * body can then be read directly from the {@link InputStream}.
          *
+         * @apiNote To ensure that all resources associated with the
+         * corresponding exchange are properly released the caller must
+         * ensure to either read all bytes until EOF is reached, or call
+         * {@link InputStream#close} if it is unable or unwilling to do so.
+         * Calling {@code close} before exhausting the stream may cause
+         * the underlying HTTP connection to be closed and prevent it
+         * from being reused for subsequent operations.
+         *
          * @return a body subscriber that streams the response body as an
          *         {@link InputStream}.
          */
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Wed Nov 22 15:32:17 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Wed Nov 22 16:27:08 2017 +0000
@@ -362,7 +362,20 @@
             if (!subscribed.compareAndSet(false, true)) {
                 s.cancel();
             } else {
-                this.subscription = s;
+                // check whether the stream is already closed.
+                // if so, we should cancel the subscription
+                // immediately.
+                boolean closed;
+                synchronized(this) {
+                    closed = this.closed;
+                    if (!closed) {
+                        this.subscription = s;
+                    }
+                }
+                if (closed) {
+                    s.cancel();
+                    return;
+                }
                 assert buffers.remainingCapacity() > 1; // should contain at least 2
                 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
                         + Math.max(1, buffers.remainingCapacity() - 1));
@@ -411,12 +424,14 @@
 
         @Override
         public void close() throws IOException {
+            Flow.Subscription s;
             synchronized (this) {
                 if (closed) return;
                 closed = true;
+                s = subscription;
+                subscription = null;
             }
-            Flow.Subscription s = subscription;
-            subscription = null;
+            // s will be null if already completed
             if (s != null) {
                  s.cancel();
             }
--- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java	Wed Nov 22 15:32:17 2017 +0000
+++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java	Wed Nov 22 16:27:08 2017 +0000
@@ -26,11 +26,18 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.testng.annotations.Test;
 
 /*
@@ -48,7 +55,8 @@
     }
 
     /**
-     * Tests that a client will be unblocked and will
+     * Tests that a client will be unblocked and will throw an IOException
+     * if an error occurs while the client is waiting for more data.
      * @throws InterruptedException
      * @throws ExecutionException
      */
@@ -136,4 +144,102 @@
             return is == null ? new NullPointerException() : null;
         }
     }
+
+    static InputStream close(InputStream is) {
+        try {
+            is.close();
+        } catch (IOException io) {
+            throw new CompletionException(io);
+        }
+        return is;
+    }
+
+    @Test
+    public static void testCloseAndSubscribe()
+            throws InterruptedException, ExecutionException
+    {
+        BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+        TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+        InputStream is = isb.getBody()
+                .thenApply(HttpResponseInputStreamTest::close)
+                .toCompletableFuture()
+                .get();
+        isb.onSubscribe(s);
+        System.out.println(s);
+        if (!s.cancelled.get()) {
+            throw new RuntimeException("subscription not cancelled");
+        }
+        if (s.request.get() > 0) {
+            throw new RuntimeException("subscription has demand");
+        }
+    }
+
+    static byte[] readAllBytes(InputStream is) {
+        try {
+            return is.readAllBytes();
+        } catch (IOException io) {
+            throw new CompletionException(io);
+        }
+    }
+
+    @Test
+    public static void testSubscribeAndClose()
+            throws InterruptedException, ExecutionException
+    {
+        BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+        TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+        InputStream is = isb.getBody().toCompletableFuture().get();
+        isb.onSubscribe(s);
+        if (s.cancelled.get()) {
+            throw new RuntimeException("subscription cancelled");
+        }
+        CompletableFuture<String> cf = CompletableFuture.supplyAsync(
+                () -> HttpResponseInputStreamTest.readAllBytes(is))
+                .thenApply(String::new);
+        while (s.request.get() == 0) {
+            Thread.sleep(100);
+        }
+        isb.onNext(List.of(ByteBuffer.wrap("coucou".getBytes())));
+        close(is);
+        System.out.println(s);
+        if (!s.cancelled.get()) {
+            throw new RuntimeException("subscription not cancelled");
+        }
+        if (s.request.get() == 0) {
+            throw new RuntimeException("subscription has no demand");
+        }
+        try {
+            System.out.println("read " + cf.get() + "!");
+            throw new RuntimeException("expected IOException not raised");
+        } catch (ExecutionException | CompletionException x) {
+            if (x.getCause() instanceof IOException) {
+                System.out.println("Got expected IOException: " + x.getCause());
+            } else {
+                throw x;
+            }
+        }
+    }
+
+    static class TestCancelOnCloseSubscription implements Flow.Subscription {
+        final AtomicLong request = new AtomicLong();
+        final AtomicBoolean cancelled = new AtomicBoolean();
+
+        @Override
+        public void request(long n) {
+            request.addAndGet(n);
+        }
+
+        @Override
+        public void cancel() {
+            cancelled.set(true);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s(request=%d, cancelled=%s)",
+                    this.getClass().getSimpleName(),
+                    request.get(),
+                    cancelled.get());
+        }
+    }
 }