http-client-branch: Add a test for BodySubscribers throwing in getBody() or returning exceptionally completed CFs http-client-branch
authordfuchs
Thu, 22 Feb 2018 17:33:21 +0000
branchhttp-client-branch
changeset 56165 8a6065d830b9
parent 56164 4db4bec0e5bb
child 56166 56c52d6417d1
http-client-branch: Add a test for BodySubscribers throwing in getBody() or returning exceptionally completed CFs
src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java
src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
test/jdk/java/net/httpclient/DigestEchoClient.java
test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java
test/jdk/java/net/httpclient/HttpServerAdapters.java
test/jdk/java/net/httpclient/ShortRequestBody.java
test/jdk/java/net/httpclient/ThrowingSubscribers.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Thu Feb 22 17:33:21 2018 +0000
@@ -477,9 +477,9 @@
             if (dp.throwable != null) {
                 state = State.ERROR;
                 exec.execute(() -> {
-                    connection.close();
                     headersSentCF.completeExceptionally(dp.throwable);
                     bodySentCF.completeExceptionally(dp.throwable);
+                    connection.close();
                 });
                 return dp;
             }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Thu Feb 22 17:33:21 2018 +0000
@@ -29,9 +29,8 @@
 import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
-import java.util.function.BiConsumer;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.net.http.HttpHeaders;
@@ -67,8 +66,9 @@
     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
     private volatile State readProgress = State.INITIAL;
     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
-    final System.Logger  debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG);
-
+    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+    final static AtomicLong responseCount = new AtomicLong();
+    final long id = responseCount.incrementAndGet();
 
     Http1Response(HttpConnection conn,
                   Http1Exchange<T> exchange,
@@ -82,6 +82,56 @@
         bodyReader = new BodyReader(this::advance);
     }
 
+    String dbgTag;
+    private String dbgString() {
+        String dbg = dbgTag;
+        if (dbg == null) {
+            String cdbg = connection.dbgTag;
+            if (cdbg != null) {
+                dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")";
+            } else {
+                dbg = "Http1Response(id=" + id + ")";
+            }
+        }
+        return dbg;
+    }
+
+    // The ClientRefCountTracker is used to track the state
+    // of a pending operation. Altough there usually is a single
+    // point where the operation starts, it may terminate at
+    // different places.
+    private final class ClientRefCountTracker {
+        final HttpClientImpl client = connection.client();
+        // state & 0x01 != 0 => acquire called
+        // state & 0x02 != 0 => tryRelease called
+        byte state;
+
+        public synchronized void acquire() {
+            if (state == 0) {
+                // increment the reference count on the HttpClientImpl
+                // to prevent the SelectorManager thread from exiting
+                // until our operation is complete.
+                debug.log(Level.DEBUG, "incrementing ref count for %s", client);
+                client.reference();
+                state = 0x01;
+            } else {
+                assert (state & 0x01) == 0 : "reference count already incremented";
+            }
+        }
+
+        public synchronized void tryRelease() {
+            if (state == 0x01) {
+                // decrement the reference count on the HttpClientImpl
+                // to allow the SelectorManager thread to exit if no
+                // other operation is pending and the facade is no
+                // longer referenced.
+                debug.log(Level.DEBUG, "decrementing ref count for %s", client);
+                client.unreference();
+                state |= 0x02;
+            }
+        }
+    }
+
    public CompletableFuture<Response> readHeadersAsync(Executor executor) {
         debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
                 + asyncReceiver.remaining() +") "  + readProgress);
@@ -157,6 +207,7 @@
         }
     }
 
+
     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
                                          boolean return2Cache,
                                          Executor executor) {
@@ -173,10 +224,10 @@
         // if we reach here, we must reset the headersReader state.
         asyncReceiver.unsubscribe(headersReader);
         headersReader.reset();
+        ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
 
         executor.execute(() -> {
             try {
-                HttpClientImpl client = connection.client();
                 content = new ResponseContent(
                         connection, clen, headers, pusher,
                         this::onFinished
@@ -189,7 +240,7 @@
                 // increment the reference count on the HttpClientImpl
                 // to prevent the SelectorManager thread from exiting until
                 // the body is fully read.
-                client.reference();
+                refCountTracker.acquire();
                 bodyReader.start(content.getBodyParser(
                     (t) -> {
                         try {
@@ -200,11 +251,6 @@
                                     cf.completeExceptionally(t);
                             }
                         } finally {
-                            // decrement the reference count on the HttpClientImpl
-                            // to allow the SelectorManager thread to exit if no
-                            // other operation is pending and the facade is no
-                            // longer referenced.
-                            client.unreference();
                             bodyReader.onComplete(t);
                         }
                     }));
@@ -216,7 +262,7 @@
                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
                     t = Utils.getCompletionCause(t);
                     try {
-                        if (t != null) {
+                        if (t == null) {
                             debug.log(Level.DEBUG, () ->
                                     "Finished reading body: " + s);
                             assert s == State.READING_BODY;
@@ -228,6 +274,10 @@
                     } catch (Throwable x) {
                         // not supposed to happen
                         asyncReceiver.onReadError(x);
+                    } finally {
+                        // we're done: release the ref count for
+                        // the current operation.
+                        refCountTracker.tryRelease();
                     }
                 });
                 connection.addTrailingOperation(trailingOp);
@@ -243,14 +293,31 @@
                 }
             }
         });
-        p.getBody().whenComplete((U u, Throwable t) -> {
-            if (t == null)
-                cf.complete(u);
-            else
-                cf.completeExceptionally(t);
+        try {
+            p.getBody().whenComplete((U u, Throwable t) -> {
+                if (t == null)
+                    cf.complete(u);
+                else
+                    cf.completeExceptionally(t);
+            });
+        } catch (Throwable t) {
+            cf.completeExceptionally(t);
+            asyncReceiver.setRetryOnError(false);
+            asyncReceiver.onReadError(t);
+        }
+
+        return cf.whenComplete((s,t) -> {
+            if (t != null) {
+                // If an exception occurred, release the
+                // ref count for the current operation, as
+                // it may never be triggered otherwise
+                // (BodySubscriber ofInputStream)
+                // If there was no exception then the
+                // ref count will be/have been released when
+                // the last byte of the response is/was received
+                refCountTracker.tryRelease();
+            }
         });
-
-        return cf;
     }
 
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Thu Feb 22 17:33:21 2018 +0000
@@ -520,9 +520,17 @@
         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
             if (Thread.currentThread() == this) {
                 SelectionKey key = e.channel().keyFor(selector);
-                if (key != null) {
+                if (key != null && key.isValid()) {
                     SelectorAttachment sa = (SelectorAttachment) key.attachment();
-                    if (sa != null) sa.register(e);
+                    sa.register(e);
+                } else if (e.interestOps() != 0){
+                    // We don't care about paused events.
+                    // These are actually handled by
+                    // SelectorAttachment::resetInterestOps later on.
+                    // But if we reach here when trying to resume an
+                    // event then it's better to fail fast.
+                    debug.log(Level.DEBUG, "No key for channel");
+                    e.abort(new IOException("No key for channel"));
                 }
             } else {
                 register(e);
@@ -563,11 +571,13 @@
         public void run() {
             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
             List<AsyncEvent> readyList = new ArrayList<>();
+            List<Runnable> resetList = new ArrayList<>();
             try {
                 while (!Thread.currentThread().isInterrupted()) {
                     synchronized (this) {
                         assert errorList.isEmpty();
                         assert readyList.isEmpty();
+                        assert resetList.isEmpty();
                         for (AsyncEvent event : registrations) {
                             if (event instanceof AsyncTriggerEvent) {
                                 readyList.add(event);
@@ -673,9 +683,10 @@
                         owner.purgeTimeoutsAndReturnNextDeadline();
                         continue;
                     }
-                    Set<SelectionKey> keys = selector.selectedKeys();
 
+                    Set<SelectionKey> keys = selector.selectedKeys();
                     assert errorList.isEmpty();
+
                     for (SelectionKey key : keys) {
                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
                         if (!key.isValid()) {
@@ -697,17 +708,24 @@
                             continue;
                         }
                         sa.events(eventsOccurred).forEach(readyList::add);
-                        sa.resetInterestOps(eventsOccurred);
+                        resetList.add(() -> sa.resetInterestOps(eventsOccurred));
                     }
+
                     selector.selectNow(); // complete cancellation
                     selector.selectedKeys().clear();
 
-                    for (AsyncEvent event : readyList) {
-                        handleEvent(event, null); // will be delegated to executor
-                    }
+                    // handle selected events
+                    readyList.forEach((e) -> handleEvent(e, null));
                     readyList.clear();
+
+                    // handle errors (closed channels etc...)
                     errorList.forEach((p) -> handleEvent(p.first, p.second));
                     errorList.clear();
+
+                    // reset interest ops for selected channels
+                    resetList.forEach(r -> r.run());
+                    resetList.clear();
+
                 }
             } catch (Throwable e) {
                 //e.printStackTrace();
@@ -746,6 +764,20 @@
         }
     }
 
+    final String debugInterestOps(SelectableChannel channel) {
+        try {
+            SelectionKey key = channel.keyFor(selmgr.selector);
+            if (key == null) return "channel not registered with selector";
+            String keyInterestOps = key.isValid()
+                    ? "key.interestOps=" + key.interestOps() : "invalid key";
+            return String.format("channel registered with selector, %s, sa.interestOps=%s",
+                                 keyInterestOps,
+                                 ((SelectorAttachment)key.attachment()).interestOps);
+        } catch (Throwable t) {
+            return String.valueOf(t);
+        }
+    }
+
     /**
      * Tracks multiple user level registrations associated with one NIO
      * registration (SelectionKey). In this implementation, registrations
@@ -772,6 +804,9 @@
 
         void register(AsyncEvent e) throws ClosedChannelException {
             int newOps = e.interestOps();
+            // re register interest if we are not already interested
+            // in the event. If the event is paused, then the pause will
+            // be taken into account later when resetInterestOps is called.
             boolean reRegister = (interestOps & newOps) != newOps;
             interestOps |= newOps;
             pending.add(e);
@@ -779,9 +814,11 @@
                 // first time registration happens here also
                 try {
                     chan.register(selector, interestOps, this);
-                } catch (CancelledKeyException x) {
+                } catch (Throwable x) {
                     abortPending(x);
                 }
+            } else if (!chan.isOpen()) {
+                abortPending(new ClosedChannelException());
             }
         }
 
@@ -818,11 +855,20 @@
 
             this.interestOps = newOps;
             SelectionKey key = chan.keyFor(selector);
-            if (newOps == 0 && pending.isEmpty()) {
+            if (newOps == 0 && key != null && pending.isEmpty()) {
                 key.cancel();
             } else {
                 try {
+                    if (key == null || !key.isValid()) {
+                        throw new CancelledKeyException();
+                    }
                     key.interestOps(newOps);
+                    // double check after
+                    if (!chan.isOpen()) {
+                        abortPending(new ClosedChannelException());
+                        return;
+                    }
+                    assert key.interestOps() == newOps;
                 } catch (CancelledKeyException x) {
                     // channel may have been closed
                     debug.log(Level.DEBUG, "key cancelled for " + chan);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java	Thu Feb 22 17:33:21 2018 +0000
@@ -179,13 +179,10 @@
                                                HttpClientImpl client,
                                                HttpRequestImpl request,
                                                Version version) {
+        // The default proxy selector may select a proxy whose  address is
+        // unresolved. We must resolve the address before connecting to it.
+        InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
         HttpConnection c = null;
-        InetSocketAddress proxy = request.proxy();
-        if (proxy != null && proxy.isUnresolved()) {
-            // The default proxy selector may select a proxy whose  address is
-            // unresolved. We must resolve the address before connecting to it.
-            proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
-        }
         boolean secure = request.secure();
         ConnectionPool pool = client.connectionPool();
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Thu Feb 22 17:33:21 2018 +0000
@@ -51,7 +51,7 @@
 
     private final Object reading = new Object();
     protected final SocketChannel chan;
-    private final FlowTube tube;
+    private final SocketTube tube; // need SocketTube to call signalClosed().
     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
     private volatile boolean connected;
     private boolean closed;
@@ -84,7 +84,8 @@
                 boolean finished = chan.finishConnect();
                 assert finished : "Expected channel to be connected";
                 debug.log(Level.DEBUG,
-                          "ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress());
+                          "ConnectEvent: connect finished: %s Local addr: %s",
+                          finished, chan.getLocalAddress());
                 connected = true;
                 // complete async since the event runs on the SelectorManager thread
                 cf.completeAsync(() -> null, client().theExecutor());
@@ -107,7 +108,8 @@
             assert !connected : "Already connected";
             assert !chan.isBlocking() : "Unexpected blocking channel";
             boolean finished = false;
-            PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
+            PrivilegedExceptionAction<Boolean> pa =
+                    () -> chan.connect(Utils.resolveAddress(address));
             try {
                  finished = AccessController.doPrivileged(pa);
             } catch (PrivilegedActionException e) {
@@ -179,14 +181,19 @@
      * Closes this connection
      */
     @Override
-    public synchronized void close() {
-        if (closed) {
-            return;
+    public void close() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
         }
-        closed = true;
         try {
             Log.logTrace("Closing: " + toString());
+            debug.log(Level.DEBUG, () -> "Closing channel: "
+                    + client().debugInterestOps(chan));
             chan.close();
+            tube.signalClosed();
         } catch (IOException e) {
             Log.logTrace("Closing resulted in " + e);
         }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Thu Feb 22 17:33:21 2018 +0000
@@ -154,6 +154,14 @@
     //                           Events                                      //
     // ======================================================================//
 
+    void signalClosed() {
+        // Ensure that the subscriber will be terminated
+        // and that future subscribers will be notified
+        // when the connection is closed.
+        readPublisher.subscriptionImpl.signalError(
+                new IOException("connection closed locally"));
+    }
+
     /**
      * A restartable task used to process tasks in sequence.
      */
@@ -498,6 +506,7 @@
         }
 
         void signalError(Throwable error) {
+            debug.log(Level.DEBUG, () -> "error signalled " + error);
             if (!errorRef.compareAndSet(null, error)) {
                 return;
             }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Thu Feb 22 17:33:21 2018 +0000
@@ -230,16 +230,21 @@
                                        boolean returnConnectionToPool,
                                        Executor executor)
     {
-        Log.logTrace("Reading body on stream {0}", streamid);
-        BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
-        CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
+        try {
+            Log.logTrace("Reading body on stream {0}", streamid);
+            BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+            CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
 
-        PushGroup<?> pg = exchange.getPushGroup();
-        if (pg != null) {
-            // if an error occurs make sure it is recorded in the PushGroup
-            cf = cf.whenComplete((t,e) -> pg.pushError(e));
+            PushGroup<?> pg = exchange.getPushGroup();
+            if (pg != null) {
+                // if an error occurs make sure it is recorded in the PushGroup
+                cf = cf.whenComplete((t, e) -> pg.pushError(e));
+            }
+            return cf;
+        } catch (Throwable t) {
+            // may be thrown by handler.apply
+            return MinimalFuture.failedFuture(t);
         }
-        return cf;
     }
 
     @Override
@@ -268,12 +273,16 @@
         // We want to allow the subscriber's getBody() method to block so it
         // can work with InputStreams. So, we offload execution.
         executor.execute(() -> {
-            bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
-                if (t == null)
-                    responseBodyCF.complete(body);
-                else
-                    responseBodyCF.completeExceptionally(t);
-            });
+            try {
+                bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+                    if (t == null)
+                        responseBodyCF.complete(body);
+                    else
+                        responseBodyCF.completeExceptionally(t);
+                });
+            } catch(Throwable t) {
+                cancelImpl(t);
+            }
         });
 
         if (isCanceled()) {
@@ -281,11 +290,11 @@
             responseBodyCF.completeExceptionally(t);
         } else {
             bodySubscriber.onSubscribe(userSubscription);
+            // Set the responseSubscriber field now that onSubscribe has been called.
+            // This effectively allows the scheduler to start invoking the callbacks.
+            responseSubscriber = bodySubscriber;
+            sched.runOrSchedule(); // in case data waiting already to be processed
         }
-        // Set the responseSubscriber field now that onSubscribe has been called.
-        // This effectively allows the scheduler to start invoking the callbacks.
-        responseSubscriber = bodySubscriber;
-        sched.runOrSchedule(); // in case data waiting already to be processed
         return responseBodyCF;
     }
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Thu Feb 22 17:33:21 2018 +0000
@@ -805,4 +805,13 @@
     public static boolean isHostnameVerificationDisabled() {
         return isHostnameVerificationDisabled;
     }
+
+    public static InetSocketAddress resolveAddress(InetSocketAddress address) {
+        if (address != null && address.isUnresolved()) {
+            // The default proxy selector may select a proxy whose  address is
+            // unresolved. We must resolve the address before connecting to it.
+            address = new InetSocketAddress(address.getHostString(), address.getPort());
+        }
+        return address;
+    }
 }
--- a/test/jdk/java/net/httpclient/DigestEchoClient.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/DigestEchoClient.java	Thu Feb 22 17:33:21 2018 +0000
@@ -501,10 +501,11 @@
                     boolean async, boolean expectContinue)
             throws Exception
     {
-        out.println(format("*** testDigest: client: %s, server: %s, async: %s, useSSL: %s, " +
-                        "authScheme: %s, authType: %s, expectContinue: %s  ***",
-                clientVersion, serverVersion, async, useSSL,
-                authScheme, authType, expectContinue));
+        String test = format("testDigest: client: %s, server: %s, async: %s, useSSL: %s, " +
+                             "authScheme: %s, authType: %s, expectContinue: %s",
+                              clientVersion, serverVersion, async, useSSL,
+                              authScheme, authType, expectContinue);
+        out.println("*** " + test + " ***");
         DigestEchoServer server = EchoServers.of(serverVersion,
                 useSSL ? "https" : "http", authType, authScheme);
 
@@ -554,7 +555,8 @@
                     resp = client.send(request, asLines());
                 }
                 System.out.println(resp);
-                assert challenge != null || resp.statusCode() == 401 || resp.statusCode() == 407;
+                assert challenge != null || resp.statusCode() == 401 || resp.statusCode() == 407
+                        : "challenge=" + challenge + ", resp=" + resp + ", test=[" + test + "]";
                 if (resp.statusCode() == 401 || resp.statusCode() == 407) {
                     // This assert may need to be relaxed if our server happened to
                     // decide to close the tunnel connection, in which case we would
--- a/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java	Thu Feb 22 17:33:21 2018 +0000
@@ -192,8 +192,7 @@
             HttpResponse<String> response = client.send(request, asString(UTF_8));
             fail("Unexpected response: " + response);
         } catch (IOException expected) {
-            assertTrue(expected.getMessage().contains("Too few bytes returned"),
-                       "Exception message:[" + expected.toString() + "]");
+            assertMessage(expected, "Too few bytes returned");
         }
     }
 
@@ -210,8 +209,14 @@
             HttpResponse<String> response = client.send(request, asString(UTF_8));
             fail("Unexpected response: " + response);
         } catch (IOException expected) {
-            assertTrue(expected.getMessage().contains("Too many bytes in request body"),
-                    "Exception message:[" + expected.toString() + "]");
+            assertMessage(expected, "Too many bytes in request body");
+        }
+    }
+
+    private void assertMessage(Throwable t, String contains) {
+        if (!t.getMessage().contains(contains)) {
+            String error = "Exception message:[" + t.toString() + "] doesn't contain [" + contains + "]";
+            throw new AssertionError(error, t);
         }
     }
 
--- a/test/jdk/java/net/httpclient/HttpServerAdapters.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/HttpServerAdapters.java	Thu Feb 22 17:33:21 2018 +0000
@@ -68,6 +68,9 @@
  */
 public interface HttpServerAdapters {
 
+    static final boolean PRINTSTACK =
+            Boolean.getBoolean("jdk.internal.httpclient.debug");
+
     static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {
         try {
             baos.write(ba);
@@ -231,6 +234,7 @@
                 return this.getClass().getSimpleName() + ": " + exchange.toString();
             }
         }
+
         private static final class Http2TestExchangeImpl extends HttpTestExchange {
             private final Http2TestExchange exchange;
             Http2TestExchangeImpl(Http2TestExchange exch) {
@@ -287,13 +291,24 @@
         void handle(HttpTestExchange t) throws IOException;
 
         default HttpHandler toHttpHandler() {
-            return (t) -> handle(HttpTestExchange.of(t));
+            return (t) -> doHandle(HttpTestExchange.of(t));
         }
         default Http2Handler toHttp2Handler() {
-            return (t) -> handle(HttpTestExchange.of(t));
+            return (t) -> doHandle(HttpTestExchange.of(t));
+        }
+        private void doHandle(HttpTestExchange t) throws IOException {
+            try {
+                handle(t);
+            } catch (Throwable x) {
+                System.out.println("WARNING: exception caught in HttpTestHandler::handle " + x);
+                System.err.println("WARNING: exception caught in HttpTestHandler::handle " + x);
+                if (PRINTSTACK) x.printStackTrace(System.out);
+                throw x;
+            }
         }
     }
 
+
     public static class HttpTestEchoHandler implements HttpTestHandler {
         @Override
         public void handle(HttpTestExchange t) throws IOException {
@@ -332,8 +347,15 @@
                 this.chain = chain;
             }
             @Override
-            public void doFilter(HttpTestExchange exchange) throws IOException{
-                exchange.doFilter(chain);
+            public void doFilter(HttpTestExchange exchange) throws IOException {
+                try {
+                    exchange.doFilter(chain);
+                } catch (Throwable t) {
+                    System.out.println("WARNING: exception caught in Http1Chain::doFilter" + t);
+                    System.err.println("WARNING: exception caught in Http1Chain::doFilter" + t);
+                    if (PRINTSTACK) t.printStackTrace();
+                    throw t;
+                }
             }
         }
 
@@ -346,10 +368,17 @@
             }
             @Override
             public void doFilter(HttpTestExchange exchange) throws IOException {
-                if (iter.hasNext()) {
-                    iter.next().doFilter(exchange, this);
-                } else {
-                    handler.handle(exchange);
+                try {
+                    if (iter.hasNext()) {
+                        iter.next().doFilter(exchange, this);
+                    } else {
+                        handler.handle(exchange);
+                    }
+                } catch (Throwable t) {
+                    System.out.println("WARNING: exception caught in Http2Chain::doFilter" + t);
+                    System.err.println("WARNING: exception caught in Http2Chain::doFilter" + t);
+                    if (PRINTSTACK) t.printStackTrace();
+                    throw t;
                 }
             }
         }
@@ -420,6 +449,8 @@
             public void stop() { impl.stop(0); }
             @Override
             public HttpTestContext addHandler(HttpTestHandler handler, String path) {
+                System.out.println("Http1TestServer[" + getAddress()
+                        + "]::addHandler " + handler + ", " + path);
                 return new Http1TestContext(impl.createContext(path, handler.toHttpHandler()));
             }
             @Override
@@ -466,7 +497,8 @@
             }
             @Override
             public HttpTestContext addHandler(HttpTestHandler handler, String path) {
-                System.out.println("Http2TestServerImpl::addHandler " + handler + ", " + path);
+                System.out.println("Http2TestServerImpl[" + getAddress()
+                                   + "]::addHandler " + handler + ", " + path);
                 Http2TestContext context = new Http2TestContext(handler, path);
                 impl.addHandler(context.toHttp2Handler(), path);
                 return context;
--- a/test/jdk/java/net/httpclient/ShortRequestBody.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/ShortRequestBody.java	Thu Feb 22 17:33:21 2018 +0000
@@ -168,7 +168,8 @@
 
         HttpResponse<Void> resp = cf.get(30, TimeUnit.SECONDS);
         err.println("Response code: " + resp.statusCode());
-        check(resp.statusCode() == 200, "Expected 200, got ", resp.statusCode());
+        check(resp.statusCode() == 200, null,
+                "Expected 200, got ", resp.statusCode());
     }
 
     static void failureNonBlocking(Supplier<HttpClient> clientSupplier,
@@ -190,11 +191,11 @@
         } catch (ExecutionException expected) {
             err.println("Caught expected: " + expected);
             Throwable t = expected.getCause();
-            check(t instanceof IOException,
-                  "Expected cause IOException, but got: ", expected.getCause());
+            check(t instanceof IOException, t,
+                  "Expected cause IOException, but got: ", t);
             String msg = t.getMessage();
             check(msg.contains("Too many") || msg.contains("Too few"),
-                    "Expected Too many|Too few, got: ", t);
+                    t, "Expected Too many|Too few, got: ", t);
         }
     }
 
@@ -215,7 +216,7 @@
             err.println("Caught expected: " + expected);
             String msg = expected.getMessage();
             check(msg.contains("Too many") || msg.contains("Too few"),
-                    "Expected Too many|Too few, got: ", expected);
+                    expected,"Expected Too many|Too few, got: ", expected);
         }
     }
 
@@ -318,13 +319,13 @@
         catch (IOException x) { throw new UncheckedIOException(x); }
     }
 
-    static boolean check(boolean cond, Object... failedArgs) {
+    static boolean check(boolean cond, Throwable t, Object... failedArgs) {
         if (cond)
             return true;
         // We are going to fail...
         StringBuilder sb = new StringBuilder();
         for (Object o : failedArgs)
                 sb.append(o);
-        throw new RuntimeException(sb.toString());
+        throw new RuntimeException(sb.toString(), t);
     }
 }
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Thu Feb 22 17:33:21 2018 +0000
@@ -31,7 +31,7 @@
  *          java.net.http/jdk.internal.net.http.common
  *          java.net.http/jdk.internal.net.http.frame
  *          java.net.http/jdk.internal.net.http.hpack
- * @run testng/othervm ThrowingSubscribers
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true ThrowingSubscribers
  */
 
 import com.sun.net.httpserver.HttpServer;
@@ -62,6 +62,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -97,11 +98,23 @@
     String https2URI_fixed;
     String https2URI_chunk;
 
-    static final int ITERATION_COUNT = 2;
+    static final int ITERATION_COUNT = 1;
     // a shared executor helps reduce the amount of threads created by the test
     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
     static volatile boolean tasksFailed;
+    static final AtomicLong serverCount = new AtomicLong();
+    static final AtomicLong clientCount = new AtomicLong();
+    static final long start = System.nanoTime();
+    public static String now() {
+        long now = System.nanoTime() - start;
+        long secs = now / 1000_000_000;
+        long mill = (now % 1000_000_000) / 1000_000;
+        long nan = now % 1000_000;
+        return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
+    }
+
+    private volatile HttpClient sharedClient;
 
     static class TestExecutor implements Executor {
         final AtomicLong tasks = new AtomicLong();
@@ -118,8 +131,8 @@
                     command.run();
                 } catch (Throwable t) {
                     tasksFailed = true;
-                    System.out.printf("Task %s failed: %s%n", id, t);
-                    System.err.printf("Task %s failed: %s%n", id, t);
+                    System.out.printf(now() + "Task %s failed: %s%n", id, t);
+                    System.err.printf(now() + "Task %s failed: %s%n", id, t);
                     FAILURES.putIfAbsent("Task " + id, t);
                     throw t;
                 }
@@ -129,14 +142,21 @@
 
     @AfterClass
     static final void printFailedTests() {
-        if (FAILURES.isEmpty()) return;
-        out.println("Failed tests: ");
-        FAILURES.entrySet().forEach((e) -> {
+        out.println("\n=========================");
+        try {
+            out.printf("%n%sCreated %d servers and %d clients%n",
+                    now(), serverCount.get(), clientCount.get());
+            if (FAILURES.isEmpty()) return;
+            out.println("Failed tests: ");
+            FAILURES.entrySet().forEach((e) -> {
                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
                 e.getValue().printStackTrace();
-        });
-        if (tasksFailed) {
-            throw new RuntimeException("Some tasks failed");
+            });
+            if (tasksFailed) {
+                System.out.println("WARNING: Some tasks failed");
+            }
+        } finally {
+            out.println("\n=========================\n");
         }
     }
 
@@ -157,13 +177,16 @@
     public Object[][] noThrows() {
         String[] uris = uris();
         Object[][] result = new Object[uris.length * 2][];
+        //Object[][] result = new Object[uris.length][];
         int i = 0;
         for (boolean sameClient : List.of(false, true)) {
+            //if (!sameClient) continue;
             for (String uri: uris()) {
                 result[i++] = new Object[] {uri, sameClient};
             }
         }
         assert i == uris.length * 2;
+        // assert i == uris.length ;
         return result;
     }
 
@@ -171,34 +194,53 @@
     public Object[][] variants() {
         String[] uris = uris();
         Object[][] result = new Object[uris.length * 2 * 2][];
+        //Object[][] result = new Object[(uris.length/2) * 2 * 2][];
         int i = 0;
         for (Thrower thrower : List.of(
-                new UncheckedCustomExceptionThrower(),
-                new UncheckedIOExceptionThrower())) {
+                new UncheckedIOExceptionThrower(),
+                new UncheckedCustomExceptionThrower())) {
             for (boolean sameClient : List.of(false, true)) {
                 for (String uri : uris()) {
+                    // if (uri.contains("http2") || uri.contains("https2")) continue;
+                    // if (!sameClient) continue;
                     result[i++] = new Object[]{uri, sameClient, thrower};
                 }
             }
         }
         assert i == uris.length * 2 * 2;
+        //assert Stream.of(result).filter(o -> o != null).count() == result.length;
         return result;
     }
 
-    HttpClient newHttpClient() {
+    private HttpClient makeNewClient() {
+        clientCount.incrementAndGet();
         return HttpClient.newBuilder()
-                         .executor(executor)
-                         .sslContext(sslContext)
-                         .build();
+                .executor(executor)
+                .sslContext(sslContext)
+                .build();
+    }
+
+    HttpClient newHttpClient(boolean share) {
+        if (!share) return makeNewClient();
+        HttpClient shared = sharedClient;
+        if (shared != null) return shared;
+        synchronized (this) {
+            shared = sharedClient;
+            if (shared == null) {
+                shared = sharedClient = makeNewClient();
+            }
+            return shared;
+        }
     }
 
     @Test(dataProvider = "noThrows")
     public void testNoThrows(String uri, boolean sameClient)
             throws Exception {
         HttpClient client = null;
+        out.printf("%ntestNoThrows(%s, %b)%n", uri, sameClient);
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
-                client = newHttpClient();
+                client = newHttpClient(sameClient);
 
             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
                     .build();
@@ -255,7 +297,7 @@
     {
         String test = format("testThrowingAsStringAsync(%s, %b, %s)",
                 uri, sameClient, thrower);
-        testThrowing(uri, sameClient, BodyHandler::asString,
+        testThrowing(test, uri, sameClient, BodyHandler::asString,
                      this::shouldHaveThrown, thrower, true);
     }
 
@@ -288,7 +330,7 @@
                                     Finisher finisher, Thrower thrower, boolean async)
             throws Exception
     {
-        out.printf("%n%s%n", name);
+        out.printf("%n%s%s%n", now(), name);
         try {
             testThrowing(uri, sameClient, handlers, finisher, thrower, async);
         } catch (Error | Exception x) {
@@ -307,9 +349,8 @@
         for (Where where : Where.values()) {
             if (where == Where.ON_SUBSCRIBE) continue;
             if (where == Where.ON_ERROR) continue;
-            if (where == Where.GET_BODY) continue; // doesn't work with HTTP/2
             if (!sameClient || client == null)
-                client = newHttpClient();
+                client = newHttpClient(sameClient);
 
             HttpRequest req = HttpRequest.
                     newBuilder(URI.create(uri))
@@ -324,14 +365,14 @@
                 } catch (Error | Exception x) {
                     Throwable cause = findCause(x, thrower);
                     if (cause == null) throw x;
-                    System.out.println("Got expected exception: " + cause);
+                    System.out.println(now() + "Got expected exception: " + cause);
                 }
             } else {
                 try {
                     response = client.send(req, handler);
                 } catch (Error | Exception t) {
                     if (thrower.test(t)) {
-                        System.out.println("Got expected exception: " + t);
+                        System.out.println(now() + "Got expected exception: " + t);
                     } else throw t;
                 }
             }
@@ -341,7 +382,8 @@
         }
     }
 
-    enum Where {BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY;
+    enum Where {
+        BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF;
         public Consumer<Where> select(Consumer<Where> consumer) {
             return new Consumer<Where>() {
                 @Override
@@ -362,15 +404,16 @@
         U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;
     }
 
-    <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
+    final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
         throw new RuntimeException("Expected exception not thrown in " + w);
     }
 
-    List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {
+    final List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {
         switch(w) {
             case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
             case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
             case GET_BODY: return shouldHaveThrown(w, resp, thrower);
+            case BODY_CF: return shouldHaveThrown(w, resp, thrower);
             default: break;
         }
         List<String> result = null;
@@ -379,7 +422,7 @@
         } catch (Error | Exception x) {
             Throwable cause = findCause(x, thrower);
             if (cause != null) {
-                out.println("Got expected exception in " + w + ": " + cause);
+                out.println(now() + "Got expected exception in " + w + ": " + cause);
                 return result;
             }
             throw x;
@@ -387,7 +430,7 @@
         throw new RuntimeException("Expected exception not thrown in " + w);
     }
 
-    List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,
+    final List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,
                                     Thrower thrower)
             throws IOException
     {
@@ -395,6 +438,7 @@
             case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
             case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
             case GET_BODY: return shouldHaveThrown(w, resp, thrower);
+            case BODY_CF: return shouldHaveThrown(w, resp, thrower);
             default: break;
         }
         List<String> result = null;
@@ -403,10 +447,9 @@
             try {
                 result = r.lines().collect(Collectors.toList());
             } catch (Error | Exception x) {
-                Throwable cause =
-                        findCause(x, thrower);
+                Throwable cause = findCause(x, thrower);
                 if (cause != null) {
-                    out.println("Got expected exception in " + w + ": " + x);
+                    out.println(now() + "Got expected exception in " + w + ": " + cause);
                     return result;
                 }
                 throw x;
@@ -421,9 +464,10 @@
         return x;
     }
 
-    static class UncheckedCustomExceptionThrower implements Thrower {
+    static final class UncheckedCustomExceptionThrower implements Thrower {
         @Override
         public void accept(Where where) {
+            out.println(now() + "Throwing in " + where);
             throw new UncheckedCustomException(where.name());
         }
 
@@ -438,9 +482,10 @@
         }
     }
 
-    static class UncheckedIOExceptionThrower implements Thrower {
+    static final class UncheckedIOExceptionThrower implements Thrower {
         @Override
         public void accept(Where where) {
+            out.println(now() + "Throwing in " + where);
             throw new UncheckedIOException(new CustomIOException(where.name()));
         }
 
@@ -456,7 +501,7 @@
         }
     }
 
-    static class UncheckedCustomException extends RuntimeException {
+    static final class UncheckedCustomException extends RuntimeException {
         UncheckedCustomException(String message) {
             super(message);
         }
@@ -465,7 +510,7 @@
         }
     }
 
-    static class CustomIOException extends IOException {
+    static final class CustomIOException extends IOException {
         CustomIOException(String message) {
             super(message);
         }
@@ -474,7 +519,7 @@
         }
     }
 
-    static class ThrowingBodyHandler<T> implements BodyHandler<T> {
+    static final class ThrowingBodyHandler<T> implements BodyHandler<T> {
         final Consumer<Where> throwing;
         final BodyHandler<T> bodyHandler;
         ThrowingBodyHandler(Consumer<Where> throwing, BodyHandler<T> bodyHandler) {
@@ -489,7 +534,7 @@
         }
     }
 
-    static class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
+    static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
         private final BodySubscriber<T> subscriber;
         volatile boolean onSubscribeCalled;
         final Consumer<Where> throwing;
@@ -533,6 +578,11 @@
         @Override
         public CompletionStage<T> getBody() {
             throwing.accept(Where.GET_BODY);
+            try {
+                throwing.accept(Where.BODY_CF);
+            } catch (Throwable t) {
+                return CompletableFuture.failedFuture(t);
+            }
             return subscriber.getBody();
         }
     }
@@ -580,6 +630,7 @@
         https2URI_fixed = "https://127.0.0.1:" + port + "/https2/fixed/x";
         https2URI_chunk = "https://127.0.0.1:" + port + "/https2/chunk/x";
 
+        serverCount.addAndGet(4);
         httpTestServer.start();
         httpsTestServer.start();
         http2TestServer.start();
@@ -588,6 +639,7 @@
 
     @AfterTest
     public void teardown() throws Exception {
+        sharedClient = null;
         httpTestServer.stop();
         httpsTestServer.stop();
         http2TestServer.stop();