# HG changeset patch # User dfuchs # Date 1511368028 0 # Node ID cd5eeec735fbef6c926c5b554de5aff3330f6302 # Parent 89c904d57ebef21ea7270cfdc7dded2e68e90c2a http-client-branch: review comment BodySubscriber::asInputStream diff -r 89c904d57ebe -r cd5eeec735fb src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java Wed Nov 22 15:32:17 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java Wed Nov 22 16:27:08 2017 +0000 @@ -310,6 +310,10 @@ if (delegate != null) unsubscribe(delegate); Runnable cancel = () -> { debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending); + // The connection should be closed, as some data may + // be left over in the stream. + setRetryOnError(false); + onReadError(new IOException("subscription cancelled")); unsubscribe(pending); }; // The subscription created by a delegate is only loosely diff -r 89c904d57ebe -r cd5eeec735fb src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 15:32:17 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 16:27:08 2017 +0000 @@ -468,6 +468,8 @@ * been fully received yet. The {@link #body()} method returns an * {@link InputStream} from which the body can be read as it is received. * + * @apiNote See {@link BodySubscriber#asInputStream()} for more information. + * * @return a response body handler */ public static BodyHandler asInputStream() { @@ -702,6 +704,14 @@ * requiring to wait for the entire body to be processed. The response * body can then be read directly from the {@link InputStream}. * + * @apiNote To ensure that all resources associated with the + * corresponding exchange are properly released the caller must + * ensure to either read all bytes until EOF is reached, or call + * {@link InputStream#close} if it is unable or unwilling to do so. + * Calling {@code close} before exhausting the stream may cause + * the underlying HTTP connection to be closed and prevent it + * from being reused for subsequent operations. + * * @return a body subscriber that streams the response body as an * {@link InputStream}. */ diff -r 89c904d57ebe -r cd5eeec735fb src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Wed Nov 22 15:32:17 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Wed Nov 22 16:27:08 2017 +0000 @@ -362,7 +362,20 @@ if (!subscribed.compareAndSet(false, true)) { s.cancel(); } else { - this.subscription = s; + // check whether the stream is already closed. + // if so, we should cancel the subscription + // immediately. + boolean closed; + synchronized(this) { + closed = this.closed; + if (!closed) { + this.subscription = s; + } + } + if (closed) { + s.cancel(); + return; + } assert buffers.remainingCapacity() > 1; // should contain at least 2 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " + Math.max(1, buffers.remainingCapacity() - 1)); @@ -411,12 +424,14 @@ @Override public void close() throws IOException { + Flow.Subscription s; synchronized (this) { if (closed) return; closed = true; + s = subscription; + subscription = null; } - Flow.Subscription s = subscription; - subscription = null; + // s will be null if already completed if (s != null) { s.cancel(); } diff -r 89c904d57ebe -r cd5eeec735fb test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java --- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Wed Nov 22 15:32:17 2017 +0000 +++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Wed Nov 22 16:27:08 2017 +0000 @@ -26,11 +26,18 @@ import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.testng.annotations.Test; /* @@ -48,7 +55,8 @@ } /** - * Tests that a client will be unblocked and will + * Tests that a client will be unblocked and will throw an IOException + * if an error occurs while the client is waiting for more data. * @throws InterruptedException * @throws ExecutionException */ @@ -136,4 +144,102 @@ return is == null ? new NullPointerException() : null; } } + + static InputStream close(InputStream is) { + try { + is.close(); + } catch (IOException io) { + throw new CompletionException(io); + } + return is; + } + + @Test + public static void testCloseAndSubscribe() + throws InterruptedException, ExecutionException + { + BodySubscriber isb = BodySubscriber.asInputStream(); + TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription(); + InputStream is = isb.getBody() + .thenApply(HttpResponseInputStreamTest::close) + .toCompletableFuture() + .get(); + isb.onSubscribe(s); + System.out.println(s); + if (!s.cancelled.get()) { + throw new RuntimeException("subscription not cancelled"); + } + if (s.request.get() > 0) { + throw new RuntimeException("subscription has demand"); + } + } + + static byte[] readAllBytes(InputStream is) { + try { + return is.readAllBytes(); + } catch (IOException io) { + throw new CompletionException(io); + } + } + + @Test + public static void testSubscribeAndClose() + throws InterruptedException, ExecutionException + { + BodySubscriber isb = BodySubscriber.asInputStream(); + TestCancelOnCloseSubscription s = new TestCancelOnCloseSubscription(); + InputStream is = isb.getBody().toCompletableFuture().get(); + isb.onSubscribe(s); + if (s.cancelled.get()) { + throw new RuntimeException("subscription cancelled"); + } + CompletableFuture cf = CompletableFuture.supplyAsync( + () -> HttpResponseInputStreamTest.readAllBytes(is)) + .thenApply(String::new); + while (s.request.get() == 0) { + Thread.sleep(100); + } + isb.onNext(List.of(ByteBuffer.wrap("coucou".getBytes()))); + close(is); + System.out.println(s); + if (!s.cancelled.get()) { + throw new RuntimeException("subscription not cancelled"); + } + if (s.request.get() == 0) { + throw new RuntimeException("subscription has no demand"); + } + try { + System.out.println("read " + cf.get() + "!"); + throw new RuntimeException("expected IOException not raised"); + } catch (ExecutionException | CompletionException x) { + if (x.getCause() instanceof IOException) { + System.out.println("Got expected IOException: " + x.getCause()); + } else { + throw x; + } + } + } + + static class TestCancelOnCloseSubscription implements Flow.Subscription { + final AtomicLong request = new AtomicLong(); + final AtomicBoolean cancelled = new AtomicBoolean(); + + @Override + public void request(long n) { + request.addAndGet(n); + } + + @Override + public void cancel() { + cancelled.set(true); + } + + @Override + public String toString() { + return String.format("%s(request=%d, cancelled=%s)", + this.getClass().getSimpleName(), + request.get(), + cancelled.get()); + } + } }