--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java Wed Nov 22 15:32:17 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java Wed Nov 22 16:27:08 2017 +0000
@@ -310,6 +310,10 @@
if (delegate != null) unsubscribe(delegate);
Runnable cancel = () -> {
debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
+ // The connection should be closed, as some data may
+ // be left over in the stream.
+ setRetryOnError(false);
+ onReadError(new IOException("subscription cancelled"));
unsubscribe(pending);
};
// The subscription created by a delegate is only loosely
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 15:32:17 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 16:27:08 2017 +0000
@@ -468,6 +468,8 @@
* been fully received yet. The {@link #body()} method returns an
* {@link InputStream} from which the body can be read as it is received.
*
+ * @apiNote See {@link BodySubscriber#asInputStream()} for more information.
+ *
* @return a response body handler
*/
public static BodyHandler<InputStream> asInputStream() {
@@ -702,6 +704,14 @@
* requiring to wait for the entire body to be processed. The response
* body can then be read directly from the {@link InputStream}.
*
+ * @apiNote To ensure that all resources associated with the
+ * corresponding exchange are properly released the caller must
+ * ensure to either read all bytes until EOF is reached, or call
+ * {@link InputStream#close} if it is unable or unwilling to do so.
+ * Calling {@code close} before exhausting the stream may cause
+ * the underlying HTTP connection to be closed and prevent it
+ * from being reused for subsequent operations.
+ *
* @return a body subscriber that streams the response body as an
* {@link InputStream}.
*/
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Wed Nov 22 15:32:17 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Wed Nov 22 16:27:08 2017 +0000
@@ -362,7 +362,20 @@
if (!subscribed.compareAndSet(false, true)) {
s.cancel();
} else {
- this.subscription = s;
+ // check whether the stream is already closed.
+ // if so, we should cancel the subscription
+ // immediately.
+ boolean closed;
+ synchronized(this) {
+ closed = this.closed;
+ if (!closed) {
+ this.subscription = s;
+ }
+ }
+ if (closed) {
+ s.cancel();
+ return;
+ }
assert buffers.remainingCapacity() > 1; // should contain at least 2
DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
+ Math.max(1, buffers.remainingCapacity() - 1));
@@ -411,12 +424,14 @@
@Override
public void close() throws IOException {
+ Flow.Subscription s;
synchronized (this) {
if (closed) return;
closed = true;
+ s = subscription;
+ subscription = null;
}
- Flow.Subscription s = subscription;
- subscription = null;
+ // s will be null if already completed
if (s != null) {
s.cancel();
}
--- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Wed Nov 22 15:32:17 2017 +0000
+++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Wed Nov 22 16:27:08 2017 +0000
@@ -26,11 +26,18 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.testng.annotations.Test;
/*
@@ -48,7 +55,8 @@
}
/**
- * Tests that a client will be unblocked and will
+ * Tests that a client will be unblocked and will throw an IOException
+ * if an error occurs while the client is waiting for more data.
* @throws InterruptedException
* @throws ExecutionException
*/
@@ -136,4 +144,102 @@
return is == null ? new NullPointerException() : null;
}
}
+
+ static InputStream close(InputStream is) {
+ try {
+ is.close();
+ } catch (IOException io) {
+ throw new CompletionException(io);
+ }
+ return is;
+ }
+
+ @Test
+ public static void testCloseAndSubscribe()
+ throws InterruptedException, ExecutionException
+ {
+ BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+ TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+ InputStream is = isb.getBody()
+ .thenApply(HttpResponseInputStreamTest::close)
+ .toCompletableFuture()
+ .get();
+ isb.onSubscribe(s);
+ System.out.println(s);
+ if (!s.cancelled.get()) {
+ throw new RuntimeException("subscription not cancelled");
+ }
+ if (s.request.get() > 0) {
+ throw new RuntimeException("subscription has demand");
+ }
+ }
+
+ static byte[] readAllBytes(InputStream is) {
+ try {
+ return is.readAllBytes();
+ } catch (IOException io) {
+ throw new CompletionException(io);
+ }
+ }
+
+ @Test
+ public static void testSubscribeAndClose()
+ throws InterruptedException, ExecutionException
+ {
+ BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+ TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription();
+ InputStream is = isb.getBody().toCompletableFuture().get();
+ isb.onSubscribe(s);
+ if (s.cancelled.get()) {
+ throw new RuntimeException("subscription cancelled");
+ }
+ CompletableFuture<String> cf = CompletableFuture.supplyAsync(
+ () -> HttpResponseInputStreamTest.readAllBytes(is))
+ .thenApply(String::new);
+ while (s.request.get() == 0) {
+ Thread.sleep(100);
+ }
+ isb.onNext(List.of(ByteBuffer.wrap("coucou".getBytes())));
+ close(is);
+ System.out.println(s);
+ if (!s.cancelled.get()) {
+ throw new RuntimeException("subscription not cancelled");
+ }
+ if (s.request.get() == 0) {
+ throw new RuntimeException("subscription has no demand");
+ }
+ try {
+ System.out.println("read " + cf.get() + "!");
+ throw new RuntimeException("expected IOException not raised");
+ } catch (ExecutionException | CompletionException x) {
+ if (x.getCause() instanceof IOException) {
+ System.out.println("Got expected IOException: " + x.getCause());
+ } else {
+ throw x;
+ }
+ }
+ }
+
+ static class TestCancelOnCloseSubscription implements Flow.Subscription {
+ final AtomicLong request = new AtomicLong();
+ final AtomicBoolean cancelled = new AtomicBoolean();
+
+ @Override
+ public void request(long n) {
+ request.addAndGet(n);
+ }
+
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(request=%d, cancelled=%s)",
+ this.getClass().getSimpleName(),
+ request.get(),
+ cancelled.get());
+ }
+ }
}