--- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Wed Nov 22 15:32:17 2017 +0000
+++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Wed Nov 22 16:27:08 2017 +0000
@@ -26,11 +26,18 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.testng.annotations.Test;
/*
@@ -48,7 +55,8 @@
}
/**
- * Tests that a client will be unblocked and will
+ * Tests that a client will be unblocked and will throw an IOException
+ * if an error occurs while the client is waiting for more data.
* @throws InterruptedException
* @throws ExecutionException
*/
@@ -136,4 +144,102 @@
return is == null ? new NullPointerException() : null;
}
}
+
+ static InputStream close(InputStream is) {
+ try {
+ is.close();
+ } catch (IOException io) {
+ throw new CompletionException(io);
+ }
+ return is;
+ }
+
+ @Test
+ public static void testCloseAndSubscribe()
+ throws InterruptedException, ExecutionException
+ {
+ BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+ TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+ InputStream is = isb.getBody()
+ .thenApply(HttpResponseInputStreamTest::close)
+ .toCompletableFuture()
+ .get();
+ isb.onSubscribe(s);
+ System.out.println(s);
+ if (!s.cancelled.get()) {
+ throw new RuntimeException("subscription not cancelled");
+ }
+ if (s.request.get() > 0) {
+ throw new RuntimeException("subscription has demand");
+ }
+ }
+
+ static byte[] readAllBytes(InputStream is) {
+ try {
+ return is.readAllBytes();
+ } catch (IOException io) {
+ throw new CompletionException(io);
+ }
+ }
+
+ @Test
+ public static void testSubscribeAndClose()
+ throws InterruptedException, ExecutionException
+ {
+ BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+ TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+ InputStream is = isb.getBody().toCompletableFuture().get();
+ isb.onSubscribe(s);
+ if (s.cancelled.get()) {
+ throw new RuntimeException("subscription cancelled");
+ }
+ CompletableFuture<String> cf = CompletableFuture.supplyAsync(
+ () -> HttpResponseInputStreamTest.readAllBytes(is))
+ .thenApply(String::new);
+ while (s.request.get() == 0) {
+ Thread.sleep(100);
+ }
+ isb.onNext(List.of(ByteBuffer.wrap("coucou".getBytes())));
+ close(is);
+ System.out.println(s);
+ if (!s.cancelled.get()) {
+ throw new RuntimeException("subscription not cancelled");
+ }
+ if (s.request.get() == 0) {
+ throw new RuntimeException("subscription has no demand");
+ }
+ try {
+ System.out.println("read " + cf.get() + "!");
+ throw new RuntimeException("expected IOException not raised");
+ } catch (ExecutionException | CompletionException x) {
+ if (x.getCause() instanceof IOException) {
+ System.out.println("Got expected IOException: " + x.getCause());
+ } else {
+ throw x;
+ }
+ }
+ }
+
+ static class TestCancelOnCloseSubscription implements Flow.Subscription {
+ final AtomicLong request = new AtomicLong();
+ final AtomicBoolean cancelled = new AtomicBoolean();
+
+ @Override
+ public void request(long n) {
+ request.addAndGet(n);
+ }
+
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(request=%d, cancelled=%s)",
+ this.getClass().getSimpleName(),
+ request.get(),
+ cancelled.get());
+ }
+ }
}