http-client-branch: review comment - invalid Subscription::request arguments should be relayed to Subscriber::onError (part I: HTTP/1.1) http-client-branch
authordfuchs
Fri, 06 Apr 2018 17:04:25 +0100
branchhttp-client-branch
changeset 56392 9120556e7163
parent 56391 a44e40b6aa43
child 56394 1c37e7cb0d4c
http-client-branch: review comment - invalid Subscription::request arguments should be relayed to Subscriber::onError (part I: HTTP/1.1)
src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java
test/jdk/java/net/httpclient/StreamingBody.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Fri Apr 06 16:05:55 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Fri Apr 06 17:04:25 2018 +0100
@@ -39,7 +39,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 import jdk.internal.net.http.common.Demand;
 import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
 import jdk.internal.net.http.common.SequentialScheduler;
@@ -124,21 +124,34 @@
             extends AbstractSubscription
     {
         private final Runnable onCancel;
+        private final Consumer<Throwable> onError;
         private final SequentialScheduler scheduler;
+        private volatile boolean cancelled;
         Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
-                                       Runnable onCancel) {
+                                       Runnable onCancel,
+                                       Consumer<Throwable> onError) {
             this.scheduler = scheduler;
             this.onCancel = onCancel;
+            this.onError = onError;
         }
         @Override
         public void request(long n) {
-            final Demand demand = demand();
-            if (demand.increase(n)) {
-                scheduler.runOrSchedule();
+            if (cancelled) return;
+            try {
+                final Demand demand = demand();
+                if (demand.increase(n)) {
+                    scheduler.runOrSchedule();
+                }
+            } catch (IllegalArgumentException x) {
+                cancelled = true;
+                onError.accept(x);
             }
         }
         @Override
-        public void cancel() { onCancel.run();}
+        public void cancel() {
+            cancelled = true;
+            onCancel.run();
+        }
     }
 
     private final ConcurrentLinkedDeque<ByteBuffer> queue
@@ -158,6 +171,7 @@
     // Only used for checking whether we run on the selector manager thread.
     private final HttpClientImpl client;
     private boolean retry;
+    private volatile boolean stopRequested;
 
     public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
         this.pendingDelegateRef = new AtomicReference<>();
@@ -184,7 +198,7 @@
             handlePendingDelegate();
 
             // Then start emptying the queue, if possible.
-            while ((buf = queue.peek()) != null) {
+            while ((buf = queue.peek()) != null && !stopRequested) {
                 Http1AsyncDelegate delegate = this.delegate;
                 debug.log(Level.DEBUG, "Got %s bytes for delegate %s",
                                        buf.remaining(), delegate);
@@ -219,7 +233,7 @@
                 // removed parsed buffer from queue, and continue with next
                 // if available
                 ByteBuffer parsed = queue.remove();
-                canRequestMore.set(queue.isEmpty());
+                canRequestMore.set(queue.isEmpty() && !stopRequested);
                 assert parsed == buf;
             }
 
@@ -252,7 +266,7 @@
         Http1AsyncDelegate delegate = pendingDelegateRef.get();
         if (delegate == null) delegate = this.delegate;
         Throwable x = error;
-        if (delegate != null && x != null && queue.isEmpty()) {
+        if (delegate != null && x != null && (stopRequested || queue.isEmpty())) {
             // forward error only after emptying the queue.
             final Object captured = delegate;
             debug.log(Level.DEBUG, () -> "flushing " + x
@@ -260,6 +274,16 @@
                     + "\t\t queue.isEmpty: " + queue.isEmpty());
             scheduler.stop();
             delegate.onReadError(x);
+            if (stopRequested) {
+                // This is the special case where the subscriber
+                // has requested an illegal number of items.
+                // In this case, the error doesn't come from
+                // upstream, but from downstream, and we need to
+                // close the upstream connection.
+                Http1Exchange<?> exchg = owner;
+                stop();
+                if (exchg != null) exchg.connection().close();
+            }
         }
     }
 
@@ -308,6 +332,11 @@
         if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
             Http1AsyncDelegate delegate = this.delegate;
             if (delegate != null) unsubscribe(delegate);
+            Consumer<Throwable> onIllegalArg = (x) -> {
+                setRetryOnError(false);
+                stopRequested = true;
+                onReadError(x);
+            };
             Runnable cancel = () -> {
                 debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
                 // The connection should be closed, as some data may
@@ -327,7 +356,7 @@
             // the header/body parser work with a flow of ByteBuffer, whereas
             // we have a flow List<ByteBuffer> upstream.
             Http1AsyncDelegateSubscription subscription =
-                    new Http1AsyncDelegateSubscription(scheduler, cancel);
+                    new Http1AsyncDelegateSubscription(scheduler, cancel, onIllegalArg);
             pending.onSubscribe(subscription);
             this.delegate = delegate = pending;
             final Object captured = delegate;
@@ -430,7 +459,7 @@
                     + "\n\t delegate: " + delegate
                     + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
         }
-        if (queue.isEmpty() || pendingDelegateRef.get() != null) {
+        if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) {
             // This callback is called from within the selector thread.
             // Use an executor here to avoid doing the heavy lifting in the
             // selector.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Fri Apr 06 16:05:55 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Fri Apr 06 17:04:25 2018 +0100
@@ -27,10 +27,16 @@
 
 import java.io.EOFException;
 import java.lang.System.Logger.Level;
+import java.net.http.HttpClient;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.net.http.HttpHeaders;
@@ -39,6 +45,8 @@
 import jdk.internal.net.http.common.Log;
 import jdk.internal.net.http.common.MinimalFuture;
 import jdk.internal.net.http.common.Utils;
+import java.lang.ref.Reference;
+
 import static java.net.http.HttpClient.Version.HTTP_1_1;
 import static java.net.http.HttpResponse.BodySubscribers.discarding;
 
@@ -215,12 +223,113 @@
         }
     }
 
+    static final Flow.Subscription NOP = new Flow.Subscription() {
+        @Override
+        public void request(long n) { }
+        public void cancel() { }
+    };
+
+    /**
+     * The Http1AsyncReceiver ensures that all calls to
+     * the subscriber, including onSubscribe, occur sequentially.
+     * There could however be some race conditions that could happen
+     * in case of unexpected errors thrown at unexpected places, which
+     * may cause onError to be called multiple times.
+     * The Http1BodySubscriber will ensure that the user subscriber
+     * is actually completed only once - and only after it is
+     * subscribed.
+     * @param <U> The type of response.
+     */
+    final static class Http1BodySubscriber<U> implements HttpResponse.BodySubscriber<U> {
+        final HttpResponse.BodySubscriber<U> userSubscriber;
+        final AtomicBoolean completed = new AtomicBoolean();
+        volatile Throwable withError;
+        volatile boolean subscribed;
+        Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {
+            this.userSubscriber = userSubscriber;
+        }
+
+        // propagate the error to the user subscriber, even if not
+        // subscribed yet.
+        private void propagateError(Throwable t) {
+            assert t != null;
+            try {
+                // if unsubscribed at this point, it will not
+                // get subscribed later - so do it now and
+                // propagate the error
+                if (subscribed == false) {
+                    subscribed = true;
+                    userSubscriber.onSubscribe(NOP);
+                }
+            } finally  {
+                // if onError throws then there is nothing to do
+                // here: let the caller deal with it by logging
+                // and closing the connection.
+                userSubscriber.onError(t);
+            }
+        }
+
+        // complete the subscriber, either normally or exceptionally
+        // ensure that the subscriber is completed only once.
+        private void complete(Throwable t) {
+            if (completed.compareAndSet(false, true)) {
+                t  = withError = Utils.getCompletionCause(t);
+                if (t == null) {
+                    assert subscribed;
+                    try {
+                        userSubscriber.onComplete();
+                    } catch (Throwable x) {
+                        propagateError(t = withError = Utils.getCompletionCause(x));
+                        // rethrow and let the caller deal with it.
+                        // (i.e: log and close the connection)
+                        // arguably we could decide to not throw and let the
+                        // connection be reused since we should have received and
+                        // parsed all the bytes when we reach here.
+                        throw x;
+                    }
+                } else {
+                    propagateError(t);
+                }
+            }
+        }
+
+        @Override
+        public CompletionStage<U> getBody() {
+            return userSubscriber.getBody();
+        }
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            if (!subscribed) {
+                subscribed = true;
+                userSubscriber.onSubscribe(subscription);
+            } else {
+                // could be already subscribed and completed
+                // if an unexpected error occurred before the actual
+                // subscription - though that's not supposed
+                // happen.
+                assert completed.get();
+            }
+        }
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            assert !completed.get();
+            userSubscriber.onNext(item);
+        }
+        @Override
+        public void onError(Throwable throwable) {
+            complete(throwable);
+        }
+        @Override
+        public void onComplete() {
+            complete(null);
+        }
+    }
 
     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
                                          boolean return2Cache,
                                          Executor executor) {
         this.return2Cache = return2Cache;
-        final HttpResponse.BodySubscriber<U> pusher = p;
+        final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);
 
         final CompletableFuture<U> cf = new MinimalFuture<>();
 
@@ -234,10 +343,13 @@
         headersReader.reset();
         ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
 
+        // We need to keep hold on the client facade until the
+        // tracker has been incremented.
+        connection.client().reference();
         executor.execute(() -> {
             try {
                 content = new ResponseContent(
-                        connection, clen, headers, pusher,
+                        connection, clen, headers, subscriber,
                         this::onFinished
                 );
                 if (cf.isCompletedExceptionally()) {
@@ -253,10 +365,9 @@
                     (t) -> {
                         try {
                             if (t != null) {
-                                pusher.onError(t);
+                                subscriber.onError(t);
                                 connection.close();
-                                if (!cf.isDone())
-                                    cf.completeExceptionally(t);
+                                cf.completeExceptionally(t);
                             }
                         } finally {
                             bodyReader.onComplete(t);
@@ -275,8 +386,8 @@
                                     "Finished reading body: " + s);
                             assert s == State.READING_BODY;
                         }
-                        if (t != null && !cf.isDone()) {
-                            pusher.onError(t);
+                        if (t != null) {
+                            subscriber.onError(t);
                             cf.completeExceptionally(t);
                         }
                     } catch (Throwable x) {
@@ -292,13 +403,13 @@
             } catch (Throwable t) {
                debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
                 try {
-                    if (!cf.isDone()) {
-                        pusher.onError(t);
-                        cf.completeExceptionally(t);
-                    }
+                    subscriber.onError(t);
+                    cf.completeExceptionally(t);
                 } finally {
                     asyncReceiver.onReadError(t);
                 }
+            } finally {
+                connection.client().unreference();
             }
         });
         try {
@@ -367,7 +478,7 @@
                 + (cf == null  ? "null"
                 : (cf.isDone() ? "already completed"
                                : "not yet completed")));
-        if (cf != null && !cf.isDone()) cf.completeExceptionally(t);
+        if (cf != null) cf.completeExceptionally(t);
         else { debug.log(Level.DEBUG, "onReadError", t); }
         debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
         connection.close();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java	Fri Apr 06 17:04:25 2018 +0100
@@ -0,0 +1,549 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary Tests an asynchronous BodySubscriber that completes
+ *          immediately with an InputStream which issues bad
+ *          requests
+ * @library /lib/testlibrary http2/server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.base/sun.net.www.http
+ *          java.net.http/jdk.internal.net.http.common
+ *          java.net.http/jdk.internal.net.http.frame
+ *          java.net.http/jdk.internal.net.http.hpack
+ * @run testng/othervm InvalidInputStreamSubscriptionRequest
+ */
+
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandler;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.net.http.HttpResponse.BodySubscriber;
+import java.net.http.HttpResponse.BodySubscribers;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+
+public class InvalidInputStreamSubscriptionRequest implements HttpServerAdapters {
+
+    SSLContext sslContext;
+    HttpTestServer httpTestServer;    // HTTP/1.1    [ 4 servers ]
+    HttpTestServer httpsTestServer;   // HTTPS/1.1
+    HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
+    HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
+    String httpURI_fixed;
+    String httpURI_chunk;
+    String httpsURI_fixed;
+    String httpsURI_chunk;
+    String http2URI_fixed;
+    String http2URI_chunk;
+    String https2URI_fixed;
+    String https2URI_chunk;
+
+    static final int ITERATION_COUNT = 3;
+    // a shared executor helps reduce the amount of threads created by the test
+    static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
+
+    static final long start = System.nanoTime();
+    public static String now() {
+        long now = System.nanoTime() - start;
+        long secs = now / 1000_000_000;
+        long mill = (now % 1000_000_000) / 1000_000;
+        long nan = now % 1000_000;
+        return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
+    }
+    static volatile boolean tasksFailed;
+    static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
+
+    static class TestExecutor implements Executor {
+        final AtomicLong tasks = new AtomicLong();
+        Executor executor;
+        TestExecutor(Executor executor) {
+            this.executor = executor;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            long id = tasks.incrementAndGet();
+            executor.execute(() -> {
+                try {
+                    command.run();
+                } catch (Throwable t) {
+                    tasksFailed = true;
+                    System.out.printf(now() + "Task %s failed: %s%n", id, t);
+                    System.err.printf(now() + "Task %s failed: %s%n", id, t);
+                    FAILURES.putIfAbsent("Task " + id, t);
+                    throw t;
+                }
+            });
+        }
+    }
+
+    @AfterClass
+    static final void printFailedTests() {
+        out.println("\n=========================");
+        try {
+            out.println("Failed tasks: ");
+            FAILURES.entrySet().forEach((e) -> {
+                out.printf("\t%s: %s%n", e.getKey(), e.getValue());
+                e.getValue().printStackTrace(out);
+            });
+            if (tasksFailed) {
+                System.out.println("WARNING: Some tasks failed");
+            }
+        } finally {
+            out.println("\n=========================\n");
+        }
+    }
+
+    interface BHS extends Supplier<BodyHandler<InputStream>> {
+        static BHS of(BHS impl, String name) {
+            return new BHSImpl(impl, name);
+        }
+    }
+
+    static final class BHSImpl implements BHS {
+        final BHS supplier;
+        final String name;
+        BHSImpl(BHS impl, String name) {
+            this.supplier = impl;
+            this.name = name;
+        }
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        @Override
+        public BodyHandler<InputStream> get() {
+            return supplier.get();
+        }
+    }
+
+    static final Supplier<BodyHandler<InputStream>> OF_INPUTSTREAM =
+            BHS.of(BodyHandlers::ofInputStream, "BodyHandlers::ofInputStream");
+
+    @DataProvider(name = "variants")
+    public Object[][] variants() {
+        return new Object[][]{
+                { httpURI_fixed,    false, OF_INPUTSTREAM },
+                { httpURI_chunk,    false, OF_INPUTSTREAM },
+                { httpsURI_fixed,   false, OF_INPUTSTREAM },
+                { httpsURI_chunk,   false, OF_INPUTSTREAM },
+//                { http2URI_fixed,   false, OF_INPUTSTREAM },
+//                { http2URI_chunk,   false, OF_INPUTSTREAM },
+//                { https2URI_fixed,  false, OF_INPUTSTREAM },
+//                { https2URI_chunk,  false, OF_INPUTSTREAM },
+
+                { httpURI_fixed,    true, OF_INPUTSTREAM },
+                { httpURI_chunk,    true, OF_INPUTSTREAM },
+                { httpsURI_fixed,   true, OF_INPUTSTREAM },
+                { httpsURI_chunk,   true, OF_INPUTSTREAM },
+//                { http2URI_fixed,   true, OF_INPUTSTREAM },
+//                { http2URI_chunk,   true, OF_INPUTSTREAM },
+//                { https2URI_fixed,  true, OF_INPUTSTREAM },
+//                { https2URI_chunk,  true, OF_INPUTSTREAM },
+        };
+    }
+
+    HttpClient newHttpClient() {
+        return HttpClient.newBuilder()
+                         .executor(executor)
+                         .sslContext(sslContext)
+                         .build();
+    }
+
+    @Test(dataProvider = "variants")
+    public void testNoBody(String uri, boolean sameClient, BHS handlers)
+            throws Exception
+    {
+        HttpClient client = null;
+        for (int i=0; i< ITERATION_COUNT; i++) {
+            if (!sameClient || client == null)
+                client = newHttpClient();
+
+            HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
+                    .build();
+            BodyHandler<InputStream> handler = handlers.get();
+            BodyHandler<InputStream> badHandler = (c,h) ->
+                    new BadBodySubscriber<>(handler.apply(c,h));
+            try {
+                HttpResponse<InputStream> response = client.send(req, badHandler);
+                try (InputStream is = response.body()) {
+                    String body = new String(is.readAllBytes(), UTF_8);
+                    assertEquals(body, "");
+                    if (uri.endsWith("/chunk")) {
+                        // with /fixed and 0 length
+                        // there's no need for any call to request()
+                        throw new RuntimeException("Expected IAE not thrown");
+                    }
+                }
+            } catch (Exception x) {
+                Throwable cause = x;
+                if (x instanceof CompletionException || x instanceof ExecutionException) {
+                    cause = x.getCause();
+                }
+                if (cause instanceof IOException && cause.getCause() != null) {
+                    cause = cause.getCause();
+                }
+                if (cause instanceof IllegalArgumentException) {
+                    System.out.println("Got expected exception: " + cause);
+                } else throw x;
+            }
+        }
+    }
+
+    @Test(dataProvider = "variants")
+    public void testNoBodyAsync(String uri, boolean sameClient, BHS handlers)
+            throws Exception
+    {
+        HttpClient client = null;
+        for (int i=0; i< ITERATION_COUNT; i++) {
+            if (!sameClient || client == null)
+                client = newHttpClient();
+
+            HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
+                    .build();
+            BodyHandler<InputStream> handler = handlers.get();
+            BodyHandler<InputStream> badHandler = (c,h) ->
+                    new BadBodySubscriber<>(handler.apply(c,h));
+            CompletableFuture<String> result =
+                    client.sendAsync(req, badHandler).thenCompose(
+                            (responsePublisher) -> {
+                                try (InputStream is = responsePublisher.body()) {
+                                    return CompletableFuture.completedFuture(
+                                            new String(is.readAllBytes(), UTF_8));
+                                } catch (Exception x) {
+                                    return CompletableFuture.failedFuture(x);
+                                }
+                            });
+            try {
+                // Get the final result and compare it with the expected body
+                assertEquals(result.get(), "");
+                if (uri.endsWith("/chunk")) {
+                    // with /fixed and 0 length
+                    // there's no need for any call to request()
+                    throw new RuntimeException("Expected IAE not thrown");
+                }
+            } catch (Exception x) {
+                Throwable cause = x;
+                if (x instanceof CompletionException || x instanceof ExecutionException) {
+                    cause = x.getCause();
+                }
+                if (cause instanceof IOException && cause.getCause() != null) {
+                    cause = cause.getCause();
+                }
+                if (cause instanceof IllegalArgumentException) {
+                    System.out.println("Got expected exception: " + cause);
+                } else throw x;
+            }
+        }
+    }
+
+    @Test(dataProvider = "variants")
+    public void testAsString(String uri, boolean sameClient, BHS handlers)
+            throws Exception
+    {
+        HttpClient client = null;
+        for (int i=0; i< ITERATION_COUNT; i++) {
+            if (!sameClient || client == null)
+                client = newHttpClient();
+
+            HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
+                    .build();
+            BodyHandler<InputStream> handler = handlers.get();
+            BodyHandler<InputStream> badHandler = (c,h) ->
+                    new BadBodySubscriber<>(handler.apply(c,h));
+            try {
+                HttpResponse<InputStream> response = client.send(req, badHandler);
+                try (InputStream is = response.body()) {
+                    String body = new String(is.readAllBytes(), UTF_8);
+                    assertEquals(body, WITH_BODY);
+                    throw new RuntimeException("Expected IAE not thrown");
+                }
+            } catch (Exception x) {
+                Throwable cause = x;
+                if (x instanceof CompletionException || x instanceof ExecutionException) {
+                    cause = x.getCause();
+                }
+                if (cause instanceof IOException && cause.getCause() != null) {
+                    cause = cause.getCause();
+                }
+                if (cause instanceof IllegalArgumentException) {
+                    System.out.println("Got expected exception: " + cause);
+                } else throw x;
+            }
+        }
+    }
+
+    @Test(dataProvider = "variants")
+    public void testAsStringAsync(String uri, boolean sameClient, BHS handlers)
+            throws Exception
+    {
+        HttpClient client = null;
+        for (int i=0; i< ITERATION_COUNT; i++) {
+            if (!sameClient || client == null)
+                client = newHttpClient();
+
+            HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
+                    .build();
+            BodyHandler<InputStream> handler = handlers.get();
+            BodyHandler<InputStream> badHandler = (c,h) ->
+                    new BadBodySubscriber<>(handler.apply(c,h));
+            CompletableFuture<String> result = client.sendAsync(req, badHandler)
+                    .thenCompose((responsePublisher) -> {
+                        try (InputStream is = responsePublisher.body()) {
+                            return CompletableFuture.completedFuture(
+                                    new String(is.readAllBytes(), UTF_8));
+                        } catch (Exception x) {
+                            return CompletableFuture.failedFuture(x);
+                        }
+                    });
+            // Get the final result and compare it with the expected body
+            try {
+                String body = result.get();
+                assertEquals(body, WITH_BODY);
+                throw new RuntimeException("Expected IAE not thrown");
+            } catch (Exception x) {
+                Throwable cause = x;
+                if (x instanceof CompletionException || x instanceof ExecutionException) {
+                    cause = x.getCause();
+                }
+                if (cause instanceof IOException && cause.getCause() != null) {
+                    cause = cause.getCause();
+                }
+                if (cause instanceof IllegalArgumentException) {
+                    System.out.println("Got expected exception: " + cause);
+                } else throw x;
+            }
+        }
+    }
+
+    static final class BadSubscription implements Flow.Subscription {
+        Flow.Subscription subscription;
+        Executor executor;
+        BadSubscription(Flow.Subscription subscription) {
+            this.subscription = subscription;
+        }
+
+        @Override
+        public void request(long n) {
+            if (executor == null) {
+                subscription.request(-n);
+            } else {
+                executor.execute(() -> subscription.request(-n));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            subscription.cancel();
+        }
+    }
+
+    static final class BadBodySubscriber<T> implements BodySubscriber<T> {
+        final BodySubscriber<T> subscriber;
+        BadBodySubscriber(BodySubscriber<T> subscriber) {
+            this.subscriber = subscriber;
+        }
+
+        @Override
+        public CompletionStage<T> getBody() {
+            return subscriber.getBody();
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            System.out.println("Subscription is: " + subscription);
+            subscriber.onSubscribe(new BadSubscription(subscription));
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            subscriber.onNext(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            subscriber.onError(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            subscriber.onComplete();
+        }
+    }
+
+    static String serverAuthority(HttpServer server) {
+        return InetAddress.getLoopbackAddress().getHostName() + ":"
+                + server.getAddress().getPort();
+    }
+
+    @BeforeTest
+    public void setup() throws Exception {
+        sslContext = new SimpleSSLContext().get();
+        if (sslContext == null)
+            throw new AssertionError("Unexpected null sslContext");
+
+        // HTTP/1.1
+        HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();
+        HttpTestHandler h1_chunkHandler = new HTTP_VariableLengthHandler();
+        InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+        httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
+        httpTestServer.addHandler( h1_fixedLengthHandler, "/http1/fixed");
+        httpTestServer.addHandler(h1_chunkHandler,"/http1/chunk");
+        httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed";
+        httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk";
+
+        HttpsServer httpsServer = HttpsServer.create(sa, 0);
+        httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+        httpsTestServer = HttpTestServer.of(httpsServer);
+        httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");
+        httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");
+        httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed";
+        httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk";
+
+        // HTTP/2
+        HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
+        HttpTestHandler h2_chunkedHandler = new HTTP_VariableLengthHandler();
+
+        http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
+        http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
+        http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
+        http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
+        http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
+
+        https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, 0));
+        https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
+        https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
+        https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
+        https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
+
+        httpTestServer.start();
+        httpsTestServer.start();
+        http2TestServer.start();
+        https2TestServer.start();
+    }
+
+    @AfterTest
+    public void teardown() throws Exception {
+        httpTestServer.stop();
+        httpsTestServer.stop();
+        http2TestServer.stop();
+        https2TestServer.stop();
+    }
+
+    static final String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" +
+            " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" +
+            " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" +
+            " exercitation ullamco laboris nisi ut aliquip ex ea" +
+            " commodo consequat. Duis aute irure dolor in reprehenderit in " +
+            "voluptate velit esse cillum dolore eu fugiat nulla pariatur." +
+            " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" +
+            " officia deserunt mollit anim id est laborum.";
+
+    static class HTTP_FixedLengthHandler implements HttpTestHandler {
+        @Override
+        public void handle(HttpTestExchange t) throws IOException {
+            out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
+            try (InputStream is = t.getRequestBody()) {
+                is.readAllBytes();
+            }
+            if (t.getRequestURI().getPath().endsWith("/withBody")) {
+                byte[] bytes = WITH_BODY.getBytes(UTF_8);
+                t.sendResponseHeaders(200, bytes.length);  // body
+                try (OutputStream os = t.getResponseBody()) {
+                    os.write(bytes);
+                }
+            } else {
+                t.sendResponseHeaders(200, 0);  //no body
+            }
+        }
+    }
+
+    static class HTTP_VariableLengthHandler implements HttpTestHandler {
+        @Override
+        public void handle(HttpTestExchange t) throws IOException {
+            out.println("HTTP_VariableLengthHandler received request to " + t.getRequestURI());
+            try (InputStream is = t.getRequestBody()) {
+                is.readAllBytes();
+            }
+            t.sendResponseHeaders(200, -1);  //chunked or variable
+            if (t.getRequestURI().getPath().endsWith("/withBody")) {
+                byte[] bytes = WITH_BODY.getBytes(UTF_8);
+                try (OutputStream os = t.getResponseBody()) {
+                    int chunkLen = bytes.length/10;
+                    if (chunkLen == 0) {
+                        os.write(bytes);
+                    } else {
+                        int count = 0;
+                        for (int i=0; i<10; i++) {
+                            os.write(bytes, count, chunkLen);
+                            os.flush();
+                            count += chunkLen;
+                        }
+                        os.write(bytes, count, bytes.length % chunkLen);
+                        count += bytes.length % chunkLen;
+                        assert count == bytes.length;
+                    }
+                }
+            } else {
+                t.getResponseBody().close();   // no body
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/StreamingBody.java	Fri Apr 06 17:04:25 2018 +0100
@@ -0,0 +1,168 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary Exercise a streaming subscriber ( InputStream ) without holding a
+ *          strong (or any ) reference to the client.
+ * @modules java.base/sun.net.www.http
+ *          java.net.http/jdk.internal.net.http.common
+ *          java.net.http/jdk.internal.net.http.frame
+ *          java.net.http/jdk.internal.net.http.hpack
+ *          java.logging
+ *          jdk.httpserver
+ * @library /lib/testlibrary /test/lib http2/server
+ * @build Http2TestServer
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @run testng/othervm
+ *       -Djdk.httpclient.HttpClient.log=trace,headers,requests
+ *       StreamingBody
+ */
+
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
+import javax.net.ssl.SSLContext;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+
+public class StreamingBody implements HttpServerAdapters {
+
+    SSLContext sslContext;
+    HttpTestServer httpTestServer;        // HTTP/1.1    [ 4 servers ]
+    HttpTestServer httpsTestServer;       // HTTPS/1.1
+    HttpTestServer http2TestServer;       // HTTP/2 ( h2c )
+    HttpTestServer https2TestServer;      // HTTP/2 ( h2  )
+    String httpURI;
+    String httpsURI;
+    String http2URI;
+    String https2URI;
+
+    static final String MESSAGE = "StreamingBody message body";
+    static final int ITERATIONS = 100;
+
+    @DataProvider(name = "positive")
+    public Object[][] positive() {
+        return new Object[][] {
+                { httpURI,    },
+                { httpsURI,   },
+                { http2URI,   },
+                { https2URI,  },
+        };
+    }
+
+    @Test(dataProvider = "positive")
+    void test(String uriString) throws Exception {
+        out.printf("%n---- starting (%s) ----%n", uriString);
+        URI uri = URI.create(uriString);
+        HttpRequest request = HttpRequest.newBuilder(uri).build();
+
+        for (int i=0; i< ITERATIONS; i++) {
+            out.println("iteration: " + i);
+            HttpResponse<InputStream> response = HttpClient.newBuilder()
+                    .sslContext(sslContext)
+                    .build()
+                    .sendAsync(request, BodyHandlers.ofInputStream())
+                    .join();
+
+            String body = new String(response.body().readAllBytes(), UTF_8);
+            out.println("Got response: " + response);
+            out.println("Got body Path: " + body);
+
+            assertEquals(response.statusCode(), 200);
+            assertEquals(body, MESSAGE);
+        }
+    }
+
+
+    // -- Infrastructure
+
+    @BeforeTest
+    public void setup() throws Exception {
+        sslContext = new SimpleSSLContext().get();
+        if (sslContext == null)
+            throw new AssertionError("Unexpected null sslContext");
+
+        InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+
+        httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
+        httpTestServer.addHandler(new MessageHandler(), "/http1/streamingbody/");
+        httpURI = "http://" + httpTestServer.serverAuthority() + "/http1/streamingbody/w";
+
+        HttpsServer httpsServer = HttpsServer.create(sa, 0);
+        httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+        httpsTestServer = HttpTestServer.of(httpsServer);
+        httpsTestServer.addHandler(new MessageHandler(),"/https1/streamingbody/");
+        httpsURI = "https://" + httpsTestServer.serverAuthority() + "/https1/streamingbody/x";
+
+        http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
+        http2TestServer.addHandler(new MessageHandler(), "/http2/streamingbody/");
+        http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/streamingbody/y";
+        https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, 0));
+        https2TestServer.addHandler(new MessageHandler(), "/https2/streamingbody/");
+        https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/streamingbody/z";
+
+        httpTestServer.start();
+        httpsTestServer.start();
+        http2TestServer.start();
+        https2TestServer.start();
+    }
+
+    @AfterTest
+    public void teardown() throws Exception {
+        httpTestServer.stop();
+        httpsTestServer.stop();
+        http2TestServer.stop();
+        https2TestServer.stop();
+    }
+
+    static class MessageHandler implements HttpTestHandler {
+        @Override
+        public void handle(HttpTestExchange t) throws IOException {
+            System.out.println("MessageHandler for: " + t.getRequestURI());
+            try (InputStream is = t.getRequestBody();
+                 OutputStream os = t.getResponseBody()) {
+                is.readAllBytes();
+                byte[] bytes = MESSAGE.getBytes(UTF_8);
+                t.sendResponseHeaders(200, bytes.length);
+                os.write(bytes);
+            }
+        }
+    }
+}