test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java
branchhttp-client-branch
changeset 55858 cd5eeec735fb
parent 55776 9950bc2ee874
child 55942 8d4770c22b63
--- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java	Wed Nov 22 15:32:17 2017 +0000
+++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java	Wed Nov 22 16:27:08 2017 +0000
@@ -26,11 +26,18 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.testng.annotations.Test;
 
 /*
@@ -48,7 +55,8 @@
     }
 
     /**
-     * Tests that a client will be unblocked and will
+     * Tests that a client will be unblocked and will throw an IOException
+     * if an error occurs while the client is waiting for more data.
      * @throws InterruptedException
      * @throws ExecutionException
      */
@@ -136,4 +144,102 @@
             return is == null ? new NullPointerException() : null;
         }
     }
+
+    static InputStream close(InputStream is) {
+        try {
+            is.close();
+        } catch (IOException io) {
+            throw new CompletionException(io);
+        }
+        return is;
+    }
+
+    @Test
+    public static void testCloseAndSubscribe()
+            throws InterruptedException, ExecutionException
+    {
+        BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+        TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+        InputStream is = isb.getBody()
+                .thenApply(HttpResponseInputStreamTest::close)
+                .toCompletableFuture()
+                .get();
+        isb.onSubscribe(s);
+        System.out.println(s);
+        if (!s.cancelled.get()) {
+            throw new RuntimeException("subscription not cancelled");
+        }
+        if (s.request.get() > 0) {
+            throw new RuntimeException("subscription has demand");
+        }
+    }
+
+    static byte[] readAllBytes(InputStream is) {
+        try {
+            return is.readAllBytes();
+        } catch (IOException io) {
+            throw new CompletionException(io);
+        }
+    }
+
+    @Test
+    public static void testSubscribeAndClose()
+            throws InterruptedException, ExecutionException
+    {
+        BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+        TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+        InputStream is = isb.getBody().toCompletableFuture().get();
+        isb.onSubscribe(s);
+        if (s.cancelled.get()) {
+            throw new RuntimeException("subscription cancelled");
+        }
+        CompletableFuture<String> cf = CompletableFuture.supplyAsync(
+                () -> HttpResponseInputStreamTest.readAllBytes(is))
+                .thenApply(String::new);
+        while (s.request.get() == 0) {
+            Thread.sleep(100);
+        }
+        isb.onNext(List.of(ByteBuffer.wrap("coucou".getBytes())));
+        close(is);
+        System.out.println(s);
+        if (!s.cancelled.get()) {
+            throw new RuntimeException("subscription not cancelled");
+        }
+        if (s.request.get() == 0) {
+            throw new RuntimeException("subscription has no demand");
+        }
+        try {
+            System.out.println("read " + cf.get() + "!");
+            throw new RuntimeException("expected IOException not raised");
+        } catch (ExecutionException | CompletionException x) {
+            if (x.getCause() instanceof IOException) {
+                System.out.println("Got expected IOException: " + x.getCause());
+            } else {
+                throw x;
+            }
+        }
+    }
+
+    static class TestCancelOnCloseSubscription implements Flow.Subscription {
+        final AtomicLong request = new AtomicLong();
+        final AtomicBoolean cancelled = new AtomicBoolean();
+
+        @Override
+        public void request(long n) {
+            request.addAndGet(n);
+        }
+
+        @Override
+        public void cancel() {
+            cancelled.set(true);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s(request=%d, cancelled=%s)",
+                    this.getClass().getSimpleName(),
+                    request.get(),
+                    cancelled.get());
+        }
+    }
 }