Merge http-client-branch
authordfuchs
Tue, 03 Jul 2018 11:23:42 +0200
branchhttp-client-branch
changeset 56818 c76f4aa16140
parent 56817 167f07a3512d (diff)
parent 50926 79baec7d831e (current diff)
child 56819 4cd8d88dab38
Merge
test/hotspot/jtreg/vmTestbase/nsk/jvmti/AttachOnDemand/attach024/java.base/java/util/ServiceConfigurationError.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Tue Jul 03 11:23:42 2018 +0200
@@ -316,7 +316,7 @@
                     proxyResponse.version, true);
             return MinimalFuture.completedFuture(syntheticResponse);
         } else if (t != null) {
-            if (debug.on()) debug.log("checkFor407: no response - %s", t);
+            if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
             return MinimalFuture.failedFuture(t);
         } else {
             if (debug.on()) debug.log("checkFor407: all clear");
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Tue Jul 03 11:23:42 2018 +0200
@@ -386,8 +386,11 @@
             // we have a flow List<ByteBuffer> upstream.
             Http1AsyncDelegateSubscription subscription =
                     new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
-            pending.onSubscribe(subscription);
-            this.delegate = delegate = pending;
+            try {
+                pending.onSubscribe(subscription);
+            } finally {
+                this.delegate = delegate = pending;
+            }
             final Object captured = delegate;
             if (debug.on())
                 debug.log("delegate is now " + captured
@@ -485,10 +488,11 @@
                 error = ex;
             }
         }
-            final Throwable t = (recorded == null ? ex : recorded);
-            if (debug.on())
-                debug.log("recorded " + t + "\n\t delegate: " + delegate
-                          + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
+
+        final Throwable t = (recorded == null ? ex : recorded);
+        if (debug.on())
+            debug.log("recorded " + t + "\n\t delegate: " + delegate
+                      + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
         if (Log.errors()) {
             Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
         }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Tue Jul 03 11:23:42 2018 +0200
@@ -257,6 +257,14 @@
                 .thenCompose(unused -> {
                     CompletableFuture<Void> cf = new MinimalFuture<>();
                     try {
+                        asyncReceiver.whenFinished.whenComplete((r,t) -> {
+                            if (t != null) {
+                                if (debug.on())
+                                    debug.log("asyncReceiver finished (failed=%s)", (Object)t);
+                                if (!headersSentCF.isDone())
+                                    headersSentCF.completeAsync(() -> this, executor);
+                            }
+                        });
                         connectFlows(connection);
 
                         if (debug.on()) debug.log("requestAction.headers");
@@ -282,7 +290,8 @@
 
     private void cancelIfFailed(Flow.Subscription s) {
         asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
-            if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
+            if (debug.on())
+                debug.log("asyncReceiver finished (failed=%s)", (Object)t);
             if (t != null) {
                 s.cancel();
                 // Don't complete exceptionally here as 't'
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Tue Jul 03 11:23:42 2018 +0200
@@ -673,7 +673,11 @@
         client2.deleteConnection(this);
         List<Stream<?>> c = new LinkedList<>(streams.values());
         for (Stream<?> s : c) {
-            s.connectionClosing(t);
+            try {
+                s.connectionClosing(t);
+            } catch (Throwable e) {
+                Log.logError("Failed to close stream {0}: {1}", s.streamid, e);
+            }
         }
         connection.close();
     }
@@ -738,6 +742,9 @@
                 }
 
                 if (!(frame instanceof ResetFrame)) {
+                    if (frame instanceof DataFrame) {
+                        dropDataFrame((DataFrame)frame);
+                    }
                     if (isServerInitiatedStream(streamid)) {
                         if (streamid < nextPushStream) {
                             // trailing data on a cancelled push promise stream,
@@ -776,6 +783,27 @@
         }
     }
 
+    final void dropDataFrame(DataFrame df) {
+        if (closed) return;
+        if (debug.on()) {
+            debug.log("Dropping data frame for stream %d (%d payload bytes)",
+                    df.streamid(), df.payloadLength());
+        }
+        ensureWindowUpdated(df);
+    }
+
+    final void ensureWindowUpdated(DataFrame df) {
+        try {
+            if (closed) return;
+            int length = df.payloadLength();
+            if (length > 0) {
+                windowUpdater.update(length);
+            }
+        } catch(Throwable t) {
+            Log.logError("Unexpected exception while updating window: {0}", (Object)t);
+        }
+    }
+
     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
         throws IOException
     {
@@ -984,7 +1012,6 @@
                      connection.channel().getLocalAddress(),
                      connection.address());
         SettingsFrame sf = new SettingsFrame(clientSettings);
-        int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
         ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
         Log.logFrames(sf, "OUT");
         // send preface bytes and SettingsFrame together
@@ -997,9 +1024,20 @@
         Log.logTrace("Settings Frame sent");
 
         // send a Window update for the receive buffer we are using
-        // minus the initial 64 K specified in protocol
-        final int len = windowUpdater.initialWindowSize - initialWindowSize;
-        if (len > 0) {
+        // minus the initial 64 K -1 specified in protocol:
+        // RFC 7540, Section 6.9.2:
+        // "[...] the connection flow-control window is set to the default
+        // initial window size until a WINDOW_UPDATE frame is received."
+        //
+        // Note that the default initial window size, not to be confused
+        // with the initial window size, is defined by RFC 7540 as
+        // 64K -1.
+        final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
+        if (len != 0) {
+            if (Log.channel()) {
+                Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
+                        len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
+            }
             windowUpdater.sendWindowUpdate(len);
         }
         // there will be an ACK to the windows update - which should
@@ -1132,6 +1170,7 @@
 
     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
         Stream<?> stream = oh.getAttachment();
+        assert stream.streamid == 0;
         int streamid = nextstreamid;
         nextstreamid += 2;
         stream.registerStream(streamid);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Tue Jul 03 11:23:42 2018 +0200
@@ -360,7 +360,6 @@
                 }
             } catch (Throwable t) {
                 signalError(t);
-                subscription.cancel();
             }
         }
 
@@ -424,6 +423,8 @@
             }
             completed = true;
             readPublisher.signalError(error);
+            Flow.Subscription subscription = this.subscription;
+            if (subscription != null) subscription.cancel();
         }
 
         // A repeatable WriteEvent which is paused after firing and can
@@ -468,7 +469,11 @@
 
             @Override
             public void cancel() {
+                if (cancelled) return;
                 if (debug.on()) debug.log("write: cancel");
+                if (Log.channel()) {
+                    Log.logChannel("Cancelling write subscription");
+                }
                 dropSubscription();
                 upstreamSubscription.cancel();
             }
@@ -503,9 +508,7 @@
                 } catch (Throwable t) {
                     if (debug.on())
                         debug.log("write: error while requesting more: " + t);
-                    cancelled = true;
                     signalError(t);
-                    subscription.cancel();
                 } finally {
                     debugState("leaving requestMore: ");
                 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Tue Jul 03 11:23:42 2018 +0200
@@ -185,6 +185,7 @@
                 int size = Utils.remaining(dsts, Integer.MAX_VALUE);
                 if (size == 0 && finished) {
                     inputQ.remove();
+                    connection.ensureWindowUpdated(df); // must update connection window
                     Log.logTrace("responseSubscriber.onComplete");
                     if (debug.on()) debug.log("incoming: onComplete");
                     sched.stop();
@@ -197,7 +198,12 @@
                     inputQ.remove();
                     Log.logTrace("responseSubscriber.onNext {0}", size);
                     if (debug.on()) debug.log("incoming: onNext(%d)", size);
-                    subscriber.onNext(dsts);
+                    try {
+                        subscriber.onNext(dsts);
+                    } catch (Throwable t) {
+                        connection.dropDataFrame(df); // must update connection window
+                        throw t;
+                    }
                     if (consumed(df)) {
                         Log.logTrace("responseSubscriber.onComplete");
                         if (debug.on()) debug.log("incoming: onComplete");
@@ -215,6 +221,8 @@
             }
         } catch (Throwable throwable) {
             errorRef.compareAndSet(null, throwable);
+        } finally {
+            if (sched.isStopped()) drainInputQueue();
         }
 
         Throwable t = errorRef.get();
@@ -223,20 +231,35 @@
             try {
                 if (!onCompleteCalled) {
                     if (debug.on())
-                        debug.log("calling subscriber.onError: %s", (Object)t);
+                        debug.log("calling subscriber.onError: %s", (Object) t);
                     subscriber.onError(t);
                 } else {
                     if (debug.on())
-                        debug.log("already completed: dropping error %s", (Object)t);
+                        debug.log("already completed: dropping error %s", (Object) t);
                 }
             } catch (Throwable x) {
-                Log.logError("Subscriber::onError threw exception: {0}", (Object)t);
+                Log.logError("Subscriber::onError threw exception: {0}", (Object) t);
             } finally {
                 cancelImpl(t);
+                drainInputQueue();
             }
         }
     }
 
+    // must only be called from the scheduler schedule() loop.
+    // ensure that all received data frames are accounted for
+    // in the connection window flow control if the scheduler
+    // is stopped before all the data is consumed.
+    private void drainInputQueue() {
+        Http2Frame frame;
+        while ((frame = inputQ.poll()) != null) {
+            if (frame instanceof DataFrame) {
+                connection.dropDataFrame((DataFrame)frame);
+            }
+        }
+    }
+
+
     // Callback invoked after the Response BodySubscriber has consumed the
     // buffers contained in a DataFrame.
     // Returns true if END_STREAM is reached, false otherwise.
@@ -245,15 +268,19 @@
         // The entire DATA frame payload is included in flow control,
         // including the Pad Length and Padding fields if present
         int len = df.payloadLength();
+        boolean endStream = df.getFlag(DataFrame.END_STREAM);
+        if (len == 0) return endStream;
+
         connection.windowUpdater.update(len);
 
-        if (!df.getFlag(DataFrame.END_STREAM)) {
+        if (!endStream) {
             // Don't send window update on a stream which is
             // closed or half closed.
             windowUpdater.update(len);
-            return false; // more data coming
         }
-        return true; // end of stream
+
+        // true: end of stream; false: more data coming
+        return endStream;
     }
 
     boolean deRegister() {
@@ -500,8 +527,8 @@
     {
         int amount = frame.getUpdate();
         if (amount <= 0) {
-            Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
-                         streamid, streamid, amount);
+            Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
+                         streamid, amount);
             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
         } else {
             assert streamid != 0;
@@ -1126,7 +1153,7 @@
                     connection.resetStream(streamid, ResetFrame.CANCEL);
                 }
             }
-        } catch (IOException ex) {
+        } catch (Throwable ex) {
             Log.logError(ex);
         }
     }
@@ -1289,6 +1316,18 @@
         int getStreamId() {
             return streamid;
         }
+
+        @Override
+        String dbgString() {
+            String dbg = dbgString;
+            if (dbg != null) return dbg;
+            if (streamid == 0) {
+                return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
+            } else {
+                dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
+                return dbgString = dbg;
+            }
+        }
     }
 
     /**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java	Tue Jul 03 11:23:42 2018 +0200
@@ -25,6 +25,7 @@
 
 package jdk.internal.net.http;
 
+import jdk.internal.net.http.common.FlowTube;
 import jdk.internal.net.http.common.Logger;
 import jdk.internal.net.http.frame.SettingsFrame;
 import jdk.internal.net.http.frame.WindowUpdateFrame;
@@ -66,8 +67,9 @@
     abstract int getStreamId();
 
     void update(int delta) {
-        if (debug.on()) debug.log("update: %d", delta);
-        if (received.addAndGet(delta) > limit) {
+        int rcv = received.addAndGet(delta);
+        if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
+        if (rcv > limit) {
             synchronized (this) {
                 int tosend = received.get();
                 if( tosend > limit) {
@@ -83,8 +85,18 @@
         connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta));
     }
 
+    volatile String dbgString;
     String dbgString() {
-        return "WindowUpdateSender(stream: " + getStreamId() + ")";
+        String dbg = dbgString;
+        if (dbg != null) return dbg;
+        FlowTube tube = connection.connection.getConnectionFlow();
+        if (tube == null) {
+            return "WindowUpdateSender(stream: " + getStreamId() + ")";
+        } else {
+            int streamId = getStreamId();
+            dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamId + ")";
+            return streamId == 0 ? dbg : (dbgString = dbg);
+        }
     }
 
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Jul 03 11:23:42 2018 +0200
@@ -33,6 +33,9 @@
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
 import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -93,6 +96,8 @@
     // When handshake is in progress trying to wrap may produce no bytes.
     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
+    private static final boolean isMonitored =
+            monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));
 
     final Executor exec;
     final Reader reader;
@@ -100,6 +105,7 @@
     final SSLEngine engine;
     final String tubeName; // hack
     final CompletableFuture<String> alpnCF; // completes on initial handshake
+    final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
@@ -152,8 +158,7 @@
         // Writer to the downWriter.
         connect(downReader, downWriter);
 
-        if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
-            Monitor.add(this::monitor);
+        if (isMonitored) Monitor.add(monitor);
     }
 
     /**
@@ -202,6 +207,7 @@
     public String monitor() {
         StringBuilder sb = new StringBuilder();
         sb.append("SSL: id ").append(id);
+        sb.append(" ").append(dbgString());
         sb.append(" HS state: " + states(handshakeState));
         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
         if (stateList != null) {
@@ -293,8 +299,10 @@
 
         @Override
         public String toString() {
-            return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
-                    + " count: " + count.toString();
+            return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
+                    + ", count: " + count.toString() + ", scheduler: "
+                    + (scheduler.isStopped() ? "stopped" : "running")
+                    + ", status: " + lastUnwrapStatus;
         }
 
         private void reallocReadBuf() {
@@ -335,6 +343,7 @@
                 }
                 if (complete) {
                     this.completing = complete;
+                    minBytesRequired = 0;
                 }
             }
         }
@@ -395,13 +404,23 @@
                             // not enough data in the read buffer...
                             // no need to try to unwrap again unless we get more bytes
                             // than minBytesRequired = len in the read buffer.
-                            minBytesRequired = len;
                             synchronized (readBufferLock) {
+                                minBytesRequired = len;
                                 // more bytes could already have been added...
                                 assert readBuf.remaining() >= len;
                                 // check if we have received some data, and if so
                                 // we can just re-spin the loop
                                 if (readBuf.remaining() > len) continue;
+                                else if (this.completing) {
+                                    if (debug.on()) {
+                                        debugr.log("BUFFER_UNDERFLOW with EOF," +
+                                                " %d bytes non decrypted.", len);
+                                    }
+                                    // The channel won't send us any more data, and
+                                    // we are in underflow: we need to fail.
+                                    throw new IOException("BUFFER_UNDERFLOW with EOF, "
+                                            + len + " bytes non decrypted.");
+                                }
                             }
                             // request more data and return.
                             requestMore();
@@ -429,6 +448,7 @@
                     } catch (IOException ex) {
                         errorCommon(ex);
                         handleError(ex);
+                        return;
                     }
                     if (handshaking && !complete)
                         return;
@@ -452,12 +472,13 @@
             }
         }
 
+        private volatile Status lastUnwrapStatus;
         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
             ByteBuffer dst = getAppBuffer();
             int len = src.remaining();
             while (true) {
                 SSLEngineResult sslResult = engine.unwrap(src, dst);
-                switch (sslResult.getStatus()) {
+                switch (lastUnwrapStatus = sslResult.getStatus()) {
                     case BUFFER_OVERFLOW:
                         // may happen if app size buffer was changed, or if
                         // our 'adaptiveBufferSize' guess was too small for
@@ -507,7 +528,9 @@
     }
 
     public static class Monitor extends Thread {
-        final List<Monitorable> list;
+        final List<WeakReference<Monitorable>> list;
+        final List<FinalMonitorable> finalList;
+        final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
         static Monitor themon;
 
         static {
@@ -515,19 +538,61 @@
             themon.start(); // uncomment to enable Monitor
         }
 
+        // An instance used to temporarily store the
+        // last observable state of a monitorable object.
+        // When Monitor.remove(o) is called, we replace
+        // 'o' with a FinalMonitorable whose reference
+        // will be enqueued after the last observable state
+        // has been printed.
+        final class FinalMonitorable implements Monitorable {
+            final String finalState;
+            FinalMonitorable(Monitorable o) {
+                finalState = o.getInfo();
+                finalList.add(this);
+            }
+            @Override
+            public String getInfo() {
+                finalList.remove(this);
+                return finalState;
+            }
+        }
+
         Monitor() {
             super("Monitor");
             setDaemon(true);
             list = Collections.synchronizedList(new LinkedList<>());
+            finalList = new ArrayList<>(); // access is synchronized on list above
         }
 
         void addTarget(Monitorable o) {
-            list.add(o);
+            list.add(new WeakReference<>(o, queue));
+        }
+        void removeTarget(Monitorable o) {
+            // It can take a long time for GC to clean up references.
+            // Calling Monitor.remove() early helps removing noise from the
+            // logs/
+            synchronized (list) {
+                Iterator<WeakReference<Monitorable>> it = list.iterator();
+                while (it.hasNext()) {
+                    Monitorable m = it.next().get();
+                    if (m == null) it.remove();
+                    if (o == m) {
+                        it.remove();
+                        break;
+                    }
+                }
+                FinalMonitorable m = new FinalMonitorable(o);
+                addTarget(m);
+                Reference.reachabilityFence(m);
+            }
         }
 
         public static void add(Monitorable o) {
             themon.addTarget(o);
         }
+        public static void remove(Monitorable o) {
+            themon.removeTarget(o);
+        }
 
         @Override
         public void run() {
@@ -536,7 +601,14 @@
                 while (true) {
                     Thread.sleep(20 * 1000);
                     synchronized (list) {
-                        for (Monitorable o : list) {
+                        Reference<? extends Monitorable> expired;
+                        while ((expired = queue.poll()) != null) list.remove(expired);
+                        for (WeakReference<Monitorable> ref : list) {
+                            Monitorable o = ref.get();
+                            if (o == null) continue;
+                            if (o instanceof FinalMonitorable) {
+                                ref.enqueue();
+                            }
                             System.out.println(o.getInfo());
                             System.out.println("-------------------------");
                         }
@@ -733,6 +805,7 @@
         // downstream. Otherwise, we send the writeBuffer downstream
         // and will allocate a new one next time.
         volatile ByteBuffer writeBuffer;
+        private volatile Status lastWrappedStatus;
         @SuppressWarnings("fallthrough")
         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
             long len = Utils.remaining(src);
@@ -747,7 +820,7 @@
             while (true) {
                 SSLEngineResult sslResult = engine.wrap(src, dst);
                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
-                switch (sslResult.getStatus()) {
+                switch (lastWrappedStatus = sslResult.getStatus()) {
                     case BUFFER_OVERFLOW:
                         // Shouldn't happen. We allocated buffer with packet size
                         // get it again if net buffer size was changed
@@ -815,8 +888,10 @@
 
         @Override
         public String toString() {
-            return "WRITER: " + super.toString() +
-                    " writeList size " + Integer.toString(writeList.size());
+            return "WRITER: " + super.toString()
+                    + ", writeList size: " + Integer.toString(writeList.size())
+                    + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
+                    + ", status: " + lastWrappedStatus;
                     //" writeList: " + writeList.toString();
         }
     }
@@ -839,6 +914,7 @@
         stopped = true;
         reader.stop();
         writer.stop();
+        if (isMonitored) Monitor.remove(monitor);
     }
 
     private Void stopOnError(Throwable currentlyUnused) {
@@ -953,6 +1029,10 @@
             case NEED_UNWRAP_AGAIN:
                 // do nothing else
                 // receiving-side data will trigger unwrap
+                if (caller == WRITER) {
+                    reader.schedule();
+                    return false;
+                }
                 break;
             default:
                 throw new InternalError("Unexpected handshake status:"
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Tue Jul 03 11:23:42 2018 +0200
@@ -406,6 +406,21 @@
             }
         }
 
+        private void complete(DelegateWrapper subscriberImpl, Throwable t) {
+            try {
+                if (t == null) subscriberImpl.onComplete();
+                else subscriberImpl.onError(t);
+                if (debug.on()) {
+                    debug.log("subscriber completed %s"
+                            + ((t == null) ? "normally" : ("with error: " + t)));
+                }
+            } finally {
+                // Error or EOF while reading:
+                // cancel write side after completing read side
+                writeSubscription.cancel();
+            }
+        }
+
         private void onNewSubscription(DelegateWrapper subscriberImpl,
                                        Flow.Subscription subscription) {
             assert subscriberImpl != null;
@@ -432,13 +447,13 @@
                 if (debug.on())
                     debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s",
                               subscriberImpl, failed);
-                subscriberImpl.onError(failed);
+                complete(subscriberImpl, failed);
             } else if (completed) {
                 if (debug.on())
                     debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted",
                               subscriberImpl);
                 finished = true;
-                subscriberImpl.onComplete();
+                complete(subscriberImpl, null);
             }
         }
 
@@ -463,7 +478,7 @@
                 subscriberImpl = subscribed;
             }
             if (subscriberImpl != null) {
-                subscriberImpl.onError(failed);
+                complete(subscriberImpl, failed);
             } else {
                 if (debug.on())
                     debug.log("%s: delegate null, stored %s", this, failed);
@@ -485,14 +500,22 @@
             return !(hs == NOT_HANDSHAKING || hs == FINISHED);
         }
 
-        private boolean handshakeFailed() {
+        private String handshakeFailed() {
             // sslDelegate can be null if we reach here
             // during the initial handshake, as that happens
             // within the SSLFlowDelegate constructor.
             // In that case we will want to raise an exception.
-            return handshaking()
+            if (handshaking()
                     && (sslDelegate == null
-                    || !sslDelegate.closeNotifyReceived());
+                    || !sslDelegate.closeNotifyReceived())) {
+                return "Remote host terminated the handshake";
+            }
+            // The initial handshake may not have been started yet.
+            // In which case - if we are completed before the initial handshake
+            // is started, we consider this a handshake failure as well.
+            if ("SSL_NULL_WITH_NULL_NULL".equals(engine.getSession().getCipherSuite()))
+                return "Remote host closed the channel";
+            return null;
         }
 
         @Override
@@ -503,17 +526,18 @@
                 subscriberImpl = subscribed;
             }
 
-            if (handshakeFailed()) {
+            String handshakeFailed = handshakeFailed();
+            if (handshakeFailed != null) {
                 if (debug.on())
-                    debug.log("handshake: %s, inbound done: %s outbound done: %s",
+                    debug.log("handshake: %s, inbound done: %s, outbound done: %s: %s",
                               engine.getHandshakeStatus(),
                               engine.isInboundDone(),
-                              engine.isOutboundDone());
-                onErrorImpl(new SSLHandshakeException(
-                        "Remote host terminated the handshake"));
+                              engine.isOutboundDone(),
+                              handshakeFailed);
+                onErrorImpl(new SSLHandshakeException(handshakeFailed));
             } else if (subscriberImpl != null) {
                 onCompleteReceived = finished = true;
-                subscriberImpl.onComplete();
+                complete(subscriberImpl, null);
             } else {
                 onCompleteReceived = true;
             }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java	Tue Jul 03 11:23:42 2018 +0200
@@ -161,14 +161,19 @@
         }
     }
 
+    public static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * K -1;
+    public static final int DEFAULT_HEADER_TABLE_SIZE = 4 * K;
+    public static final int DEFAULT_MAX_CONCURRENT_STREAMS = 100;
+    public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K;
+
     public static SettingsFrame getDefaultSettings() {
         SettingsFrame f = new SettingsFrame();
         // TODO: check these values
         f.setParameter(ENABLE_PUSH, 1);
-        f.setParameter(HEADER_TABLE_SIZE, 4 * K);
-        f.setParameter(MAX_CONCURRENT_STREAMS, 100);
-        f.setParameter(INITIAL_WINDOW_SIZE, 64 * K - 1);
-        f.setParameter(MAX_FRAME_SIZE, 16 * K);
+        f.setParameter(HEADER_TABLE_SIZE, DEFAULT_HEADER_TABLE_SIZE);
+        f.setParameter(MAX_CONCURRENT_STREAMS, DEFAULT_MAX_CONCURRENT_STREAMS);
+        f.setParameter(INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
+        f.setParameter(MAX_FRAME_SIZE, DEFAULT_MAX_FRAME_SIZE);
         return f;
     }
 }
--- a/test/jdk/java/net/httpclient/CancelledResponse.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/test/jdk/java/net/httpclient/CancelledResponse.java	Tue Jul 03 11:23:42 2018 +0200
@@ -339,9 +339,11 @@
                     Thread.sleep(10);
                 }
                 out.println("sent " + s);
-            } catch (SSLException | SocketException x) {
-                // if SSL then we might get a "Broken Pipe", otherwise
-                // a "Socket closed".
+            } catch (SSLException | SocketException | RuntimeException x) {
+                // if SSL then we might get a "Broken Pipe", or a
+                // RuntimeException wrapping an InvalidAlgorithmParameterException
+                // (probably if the channel is closed during the handshake),
+                // otherwise we get a "Socket closed".
                 boolean expected = cancelled.get();
                 if (sent > 0 && expected) {
                     System.out.println("Connection closed by peer as expected: " + x);
@@ -349,6 +351,7 @@
                 } else {
                     System.out.println("Unexpected exception (sent="
                             + sent + ", cancelled=" + expected + "): " + x);
+                    if (x instanceof RuntimeException) throw (RuntimeException) x;
                     throw new RuntimeException(x);
                 }
             } catch (IOException | InterruptedException e) {
--- a/test/jdk/java/net/httpclient/MockServer.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/test/jdk/java/net/httpclient/MockServer.java	Tue Jul 03 11:23:42 2018 +0200
@@ -284,7 +284,7 @@
             }
             try {
                 socket.close();
-            } catch (IOException e) {}
+            } catch (Throwable e) {}
             synchronized (removals) {
                 removals.add(this);
             }
@@ -339,7 +339,7 @@
         closed = true;
         try {
             ss.close();
-        } catch (IOException e) {
+        } catch (Throwable e) {
             e.printStackTrace();
         }
         for (Connection c : sockets) {
--- a/test/jdk/java/net/httpclient/ShortResponseBody.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/test/jdk/java/net/httpclient/ShortResponseBody.java	Tue Jul 03 11:23:42 2018 +0200
@@ -264,19 +264,47 @@
 
     // can be used to prolong request body publication
     static final class InfiniteInputStream extends InputStream {
+        int count = 0;
+        int k16 = 0;
         @Override
         public int read() throws IOException {
+            if (++count == 1) {
+                System.out.println("Start sending 1 byte");
+            }
+            if (count > 16 * 1024) {
+                k16++;
+                System.out.println("... 16K sent.");
+                count = count % (16 * 1024);
+            }
+            if (k16 > 128) {
+                System.out.println("WARNING: InfiniteInputStream: " +
+                        "more than 128 16k buffers generated: returning EOF");
+                return -1;
+            }
             return 1;
         }
 
         @Override
         public int read(byte[] buf, int offset, int length) {
             //int count = offset;
-            //length = Math.max(0, Math.min(buf.length - offset, length));
+            length = Math.max(0, Math.min(buf.length - offset, length));
             //for (; count < length; count++)
             //    buf[offset++] = 0x01;
             //return count;
-            return Math.max(0, Math.min(buf.length - offset, length));
+            if (count == 0) {
+                System.out.println("Start sending " + length);
+            } else if (count > 16 * 1024) {
+                k16++;
+                System.out.println("... 16K sent.");
+                count = count % (16 * 1024);
+            }
+            if (k16 > 128) {
+                System.out.println("WARNING: InfiniteInputStream: " +
+                        "more than 128 16k buffers generated: returning EOF");
+                return -1;
+            }
+            count += length;
+            return length;
         }
     }
 
@@ -493,10 +521,13 @@
                     out.print(requestMethod + " ");
                     URI uriPath = readRequestPath(is);
                     out.println(uriPath);
-                    readRequestHeaders(is);
+                    String headers = readRequestHeaders(is);
 
                     String query = uriPath.getRawQuery();
-                    assert query != null;
+                    if (query == null) {
+                        out.println("Request headers: [" + headers + "]");
+                    }
+                    assert query != null : "null query for uriPath: " + uriPath;
                     String qv = query.split("=")[1];
                     int len;
                     if (qv.equals("all")) {
@@ -542,9 +573,11 @@
         }
 
         // Read until the end of a HTTP request headers
-        static void readRequestHeaders(InputStream is) throws IOException {
+        static String readRequestHeaders(InputStream is) throws IOException {
             int requestEndCount = 0, r;
+            StringBuilder sb = new StringBuilder();
             while ((r = is.read()) != -1) {
+                sb.append((char) r);
                 if (r == requestEnd[requestEndCount]) {
                     requestEndCount++;
                     if (requestEndCount == 4) {
@@ -554,6 +587,7 @@
                     requestEndCount = 0;
                 }
             }
+            return sb.toString();
         }
     }
 
--- a/test/jdk/java/net/httpclient/SplitResponse.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/test/jdk/java/net/httpclient/SplitResponse.java	Tue Jul 03 11:23:42 2018 +0200
@@ -32,6 +32,7 @@
 import java.util.concurrent.CompletableFuture;
 import javax.net.ssl.SSLContext;
 import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLServerSocketFactory;
 import java.net.http.HttpClient;
 import java.net.http.HttpClient.Version;
@@ -268,7 +269,7 @@
                     String onechar = s.substring(i, i + 1);
                     try {
                         conn.send(onechar);
-                    } catch(SocketException x) {
+                    } catch(SocketException | SSLException x) {
                         if (!useSSL || i != len - 1) throw x;
                         if (x.getMessage().contains("closed by remote host")) {
                             String osname = System.getProperty("os.name", "unknown");
--- a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/SSLEchoTubeTest.java	Tue Jul 03 10:47:50 2018 +0200
+++ b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/SSLEchoTubeTest.java	Tue Jul 03 11:23:42 2018 +0200
@@ -350,7 +350,7 @@
 
             @Override
             public void cancel() {
-                cancelled.set(true);
+                queue.add(EOF);
             }
         }