http-client-branch: fixes various test instantibilities http-client-branch
authordfuchs
Wed, 27 Jun 2018 21:24:56 +0100
branchhttp-client-branch
changeset 56812 a6180efe1d58
parent 56811 7c86bd1e9a43
child 56813 28f791bb1c2d
http-client-branch: fixes various test instantibilities
src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java
src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java
test/jdk/java/net/httpclient/CancelledResponse.java
test/jdk/java/net/httpclient/MockServer.java
test/jdk/java/net/httpclient/ShortResponseBody.java
test/jdk/java/net/httpclient/SplitResponse.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Wed Jun 27 21:24:56 2018 +0100
@@ -386,8 +386,11 @@
             // we have a flow List<ByteBuffer> upstream.
             Http1AsyncDelegateSubscription subscription =
                     new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
-            pending.onSubscribe(subscription);
-            this.delegate = delegate = pending;
+            try {
+                pending.onSubscribe(subscription);
+            } finally {
+                this.delegate = delegate = pending;
+            }
             final Object captured = delegate;
             if (debug.on())
                 debug.log("delegate is now " + captured
@@ -485,10 +488,11 @@
                 error = ex;
             }
         }
-            final Throwable t = (recorded == null ? ex : recorded);
-            if (debug.on())
-                debug.log("recorded " + t + "\n\t delegate: " + delegate
-                          + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
+
+        final Throwable t = (recorded == null ? ex : recorded);
+        if (debug.on())
+            debug.log("recorded " + t + "\n\t delegate: " + delegate
+                      + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
         if (Log.errors()) {
             Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
         }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Wed Jun 27 21:24:56 2018 +0100
@@ -257,6 +257,14 @@
                 .thenCompose(unused -> {
                     CompletableFuture<Void> cf = new MinimalFuture<>();
                     try {
+                        asyncReceiver.whenFinished.whenComplete((r,t) -> {
+                            if (t != null) {
+                                if (debug.on())
+                                    debug.log("asyncReceiver finished (failed=%s)", (Object)t);
+                                if (!headersSentCF.isDone())
+                                    headersSentCF.completeAsync(() -> this, executor);
+                            }
+                        });
                         connectFlows(connection);
 
                         if (debug.on()) debug.log("requestAction.headers");
@@ -282,7 +290,8 @@
 
     private void cancelIfFailed(Flow.Subscription s) {
         asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
-            if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
+            if (debug.on())
+                debug.log("asyncReceiver finished (failed=%s)", (Object)t);
             if (t != null) {
                 s.cancel();
                 // Don't complete exceptionally here as 't'
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Wed Jun 27 21:24:56 2018 +0100
@@ -742,6 +742,9 @@
                 }
 
                 if (!(frame instanceof ResetFrame)) {
+                    if (frame instanceof DataFrame) {
+                        dropDataFrame((DataFrame)frame);
+                    }
                     if (isServerInitiatedStream(streamid)) {
                         if (streamid < nextPushStream) {
                             // trailing data on a cancelled push promise stream,
@@ -780,6 +783,27 @@
         }
     }
 
+    final void dropDataFrame(DataFrame df) {
+        if (closed) return;
+        if (debug.on()) {
+            debug.log("Dropping data frame for stream %d (%d payload bytes)",
+                    df.streamid(), df.payloadLength());
+        }
+        ensureWindowUpdated(df);
+    }
+
+    final void ensureWindowUpdated(DataFrame df) {
+        try {
+            if (closed) return;
+            int length = df.payloadLength();
+            if (length > 0) {
+                windowUpdater.update(length);
+            }
+        } catch(Throwable t) {
+            Log.logError("Unexpected exception while updating window: {0}", (Object)t);
+        }
+    }
+
     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
         throws IOException
     {
@@ -988,7 +1012,6 @@
                      connection.channel().getLocalAddress(),
                      connection.address());
         SettingsFrame sf = new SettingsFrame(clientSettings);
-        int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
         ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
         Log.logFrames(sf, "OUT");
         // send preface bytes and SettingsFrame together
@@ -1001,9 +1024,20 @@
         Log.logTrace("Settings Frame sent");
 
         // send a Window update for the receive buffer we are using
-        // minus the initial 64 K specified in protocol
-        final int len = windowUpdater.initialWindowSize - initialWindowSize;
-        if (len > 0) {
+        // minus the initial 64 K -1 specified in protocol:
+        // RFC 7540, Section 6.9.2:
+        // "[...] the connection flow-control window is set to the default
+        // initial window size until a WINDOW_UPDATE frame is received."
+        //
+        // Note that the default initial window size, not to be confused
+        // with the initial window size, is defined by RFC 7540 as
+        // 64K -1.
+        final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
+        if (len != 0) {
+            if (Log.channel()) {
+                Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
+                        len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
+            }
             windowUpdater.sendWindowUpdate(len);
         }
         // there will be an ACK to the windows update - which should
@@ -1136,6 +1170,7 @@
 
     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
         Stream<?> stream = oh.getAttachment();
+        assert stream.streamid == 0;
         int streamid = nextstreamid;
         nextstreamid += 2;
         stream.registerStream(streamid);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Wed Jun 27 21:24:56 2018 +0100
@@ -185,6 +185,7 @@
                 int size = Utils.remaining(dsts, Integer.MAX_VALUE);
                 if (size == 0 && finished) {
                     inputQ.remove();
+                    connection.ensureWindowUpdated(df); // must update connection window
                     Log.logTrace("responseSubscriber.onComplete");
                     if (debug.on()) debug.log("incoming: onComplete");
                     sched.stop();
@@ -197,7 +198,12 @@
                     inputQ.remove();
                     Log.logTrace("responseSubscriber.onNext {0}", size);
                     if (debug.on()) debug.log("incoming: onNext(%d)", size);
-                    subscriber.onNext(dsts);
+                    try {
+                        subscriber.onNext(dsts);
+                    } catch (Throwable t) {
+                        connection.dropDataFrame(df); // must update connection window
+                        throw t;
+                    }
                     if (consumed(df)) {
                         Log.logTrace("responseSubscriber.onComplete");
                         if (debug.on()) debug.log("incoming: onComplete");
@@ -215,6 +221,8 @@
             }
         } catch (Throwable throwable) {
             errorRef.compareAndSet(null, throwable);
+        } finally {
+            if (sched.isStopped()) drainInputQueue();
         }
 
         Throwable t = errorRef.get();
@@ -223,20 +231,35 @@
             try {
                 if (!onCompleteCalled) {
                     if (debug.on())
-                        debug.log("calling subscriber.onError: %s", (Object)t);
+                        debug.log("calling subscriber.onError: %s", (Object) t);
                     subscriber.onError(t);
                 } else {
                     if (debug.on())
-                        debug.log("already completed: dropping error %s", (Object)t);
+                        debug.log("already completed: dropping error %s", (Object) t);
                 }
             } catch (Throwable x) {
-                Log.logError("Subscriber::onError threw exception: {0}", (Object)t);
+                Log.logError("Subscriber::onError threw exception: {0}", (Object) t);
             } finally {
                 cancelImpl(t);
+                drainInputQueue();
             }
         }
     }
 
+    // must only be called from the scheduler schedule() loop.
+    // ensure that all received data frames are accounted for
+    // in the connection window flow control if the scheduler
+    // is stopped before all the data is consumed.
+    private void drainInputQueue() {
+        Http2Frame frame;
+        while ((frame = inputQ.poll()) != null) {
+            if (frame instanceof DataFrame) {
+                connection.dropDataFrame((DataFrame)frame);
+            }
+        }
+    }
+
+
     // Callback invoked after the Response BodySubscriber has consumed the
     // buffers contained in a DataFrame.
     // Returns true if END_STREAM is reached, false otherwise.
@@ -245,15 +268,19 @@
         // The entire DATA frame payload is included in flow control,
         // including the Pad Length and Padding fields if present
         int len = df.payloadLength();
+        boolean endStream = df.getFlag(DataFrame.END_STREAM);
+        if (len == 0) return endStream;
+
         connection.windowUpdater.update(len);
 
-        if (!df.getFlag(DataFrame.END_STREAM)) {
+        if (!endStream) {
             // Don't send window update on a stream which is
             // closed or half closed.
             windowUpdater.update(len);
-            return false; // more data coming
         }
-        return true; // end of stream
+
+        // true: end of stream; false: more data coming
+        return endStream;
     }
 
     boolean deRegister() {
@@ -1126,7 +1153,7 @@
                     connection.resetStream(streamid, ResetFrame.CANCEL);
                 }
             }
-        } catch (IOException ex) {
+        } catch (Throwable ex) {
             Log.logError(ex);
         }
     }
@@ -1289,6 +1316,18 @@
         int getStreamId() {
             return streamid;
         }
+
+        @Override
+        String dbgString() {
+            String dbg = dbgString;
+            if (dbg != null) return dbg;
+            if (streamid == 0) {
+                return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
+            } else {
+                dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
+                return dbgString = dbg;
+            }
+        }
     }
 
     /**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java	Wed Jun 27 21:24:56 2018 +0100
@@ -25,6 +25,7 @@
 
 package jdk.internal.net.http;
 
+import jdk.internal.net.http.common.FlowTube;
 import jdk.internal.net.http.common.Logger;
 import jdk.internal.net.http.frame.SettingsFrame;
 import jdk.internal.net.http.frame.WindowUpdateFrame;
@@ -84,8 +85,18 @@
         connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta));
     }
 
+    volatile String dbgString;
     String dbgString() {
-        return "WindowUpdateSender(stream: " + getStreamId() + ")";
+        String dbg = dbgString;
+        if (dbg != null) return dbg;
+        FlowTube tube = connection.connection.getConnectionFlow();
+        if (tube == null) {
+            return "WindowUpdateSender(stream: " + getStreamId() + ")";
+        } else {
+            int streamId = getStreamId();
+            dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamId + ")";
+            return streamId == 0 ? dbg : (dbgString = dbg);
+        }
     }
 
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jun 27 21:24:56 2018 +0100
@@ -448,6 +448,7 @@
                     } catch (IOException ex) {
                         errorCommon(ex);
                         handleError(ex);
+                        return;
                     }
                     if (handshaking && !complete)
                         return;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Wed Jun 27 21:24:56 2018 +0100
@@ -485,14 +485,22 @@
             return !(hs == NOT_HANDSHAKING || hs == FINISHED);
         }
 
-        private boolean handshakeFailed() {
+        private String handshakeFailed() {
             // sslDelegate can be null if we reach here
             // during the initial handshake, as that happens
             // within the SSLFlowDelegate constructor.
             // In that case we will want to raise an exception.
-            return handshaking()
+            if (handshaking()
                     && (sslDelegate == null
-                    || !sslDelegate.closeNotifyReceived());
+                    || !sslDelegate.closeNotifyReceived())) {
+                return "Remote host terminated the handshake";
+            }
+            // The initial handshake may not have been started yet.
+            // In which case - if we are completed before the initial handshake
+            // is started, we consider this a handshake failure as well.
+            if ("SSL_NULL_WITH_NULL_NULL".equals(engine.getSession().getCipherSuite()))
+                return "Remote host closed the channel";
+            return null;
         }
 
         @Override
@@ -503,14 +511,15 @@
                 subscriberImpl = subscribed;
             }
 
-            if (handshakeFailed()) {
+            String handshakeFailed = handshakeFailed();
+            if (handshakeFailed != null) {
                 if (debug.on())
-                    debug.log("handshake: %s, inbound done: %s outbound done: %s",
+                    debug.log("handshake: %s, inbound done: %s, outbound done: %s: %s",
                               engine.getHandshakeStatus(),
                               engine.isInboundDone(),
-                              engine.isOutboundDone());
-                onErrorImpl(new SSLHandshakeException(
-                        "Remote host terminated the handshake"));
+                              engine.isOutboundDone(),
+                              handshakeFailed);
+                onErrorImpl(new SSLHandshakeException(handshakeFailed));
             } else if (subscriberImpl != null) {
                 onCompleteReceived = finished = true;
                 subscriberImpl.onComplete();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java	Wed Jun 27 21:24:56 2018 +0100
@@ -161,14 +161,19 @@
         }
     }
 
+    public static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * K -1;
+    public static final int DEFAULT_HEADER_TABLE_SIZE = 4 * K;
+    public static final int DEFAULT_MAX_CONCURRENT_STREAMS = 100;
+    public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K;
+
     public static SettingsFrame getDefaultSettings() {
         SettingsFrame f = new SettingsFrame();
         // TODO: check these values
         f.setParameter(ENABLE_PUSH, 1);
-        f.setParameter(HEADER_TABLE_SIZE, 4 * K);
-        f.setParameter(MAX_CONCURRENT_STREAMS, 100);
-        f.setParameter(INITIAL_WINDOW_SIZE, 64 * K - 1);
-        f.setParameter(MAX_FRAME_SIZE, 16 * K);
+        f.setParameter(HEADER_TABLE_SIZE, DEFAULT_HEADER_TABLE_SIZE);
+        f.setParameter(MAX_CONCURRENT_STREAMS, DEFAULT_MAX_CONCURRENT_STREAMS);
+        f.setParameter(INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
+        f.setParameter(MAX_FRAME_SIZE, DEFAULT_MAX_FRAME_SIZE);
         return f;
     }
 }
--- a/test/jdk/java/net/httpclient/CancelledResponse.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/CancelledResponse.java	Wed Jun 27 21:24:56 2018 +0100
@@ -339,9 +339,11 @@
                     Thread.sleep(10);
                 }
                 out.println("sent " + s);
-            } catch (SSLException | SocketException x) {
-                // if SSL then we might get a "Broken Pipe", otherwise
-                // a "Socket closed".
+            } catch (SSLException | SocketException | RuntimeException x) {
+                // if SSL then we might get a "Broken Pipe", or a
+                // RuntimeException wrapping an InvalidAlgorithmParameterException
+                // (probably if the channel is closed during the handshake),
+                // otherwise we get a "Socket closed".
                 boolean expected = cancelled.get();
                 if (sent > 0 && expected) {
                     System.out.println("Connection closed by peer as expected: " + x);
@@ -349,6 +351,7 @@
                 } else {
                     System.out.println("Unexpected exception (sent="
                             + sent + ", cancelled=" + expected + "): " + x);
+                    if (x instanceof RuntimeException) throw (RuntimeException) x;
                     throw new RuntimeException(x);
                 }
             } catch (IOException | InterruptedException e) {
--- a/test/jdk/java/net/httpclient/MockServer.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/MockServer.java	Wed Jun 27 21:24:56 2018 +0100
@@ -284,7 +284,7 @@
             }
             try {
                 socket.close();
-            } catch (IOException e) {}
+            } catch (Throwable e) {}
             synchronized (removals) {
                 removals.add(this);
             }
@@ -339,7 +339,7 @@
         closed = true;
         try {
             ss.close();
-        } catch (IOException e) {
+        } catch (Throwable e) {
             e.printStackTrace();
         }
         for (Connection c : sockets) {
--- a/test/jdk/java/net/httpclient/ShortResponseBody.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/ShortResponseBody.java	Wed Jun 27 21:24:56 2018 +0100
@@ -264,19 +264,47 @@
 
     // can be used to prolong request body publication
     static final class InfiniteInputStream extends InputStream {
+        int count = 0;
+        int k16 = 0;
         @Override
         public int read() throws IOException {
+            if (++count == 1) {
+                System.out.println("Start sending 1 byte");
+            }
+            if (count > 16 * 1024) {
+                k16++;
+                System.out.println("... 16K sent.");
+                count = count % (16 * 1024);
+            }
+            if (k16 > 128) {
+                System.out.println("WARNING: InfiniteInputStream: " +
+                        "more than 128 16k buffers generated: returning EOF");
+                return -1;
+            }
             return 1;
         }
 
         @Override
         public int read(byte[] buf, int offset, int length) {
             //int count = offset;
-            //length = Math.max(0, Math.min(buf.length - offset, length));
+            length = Math.max(0, Math.min(buf.length - offset, length));
             //for (; count < length; count++)
             //    buf[offset++] = 0x01;
             //return count;
-            return Math.max(0, Math.min(buf.length - offset, length));
+            if (count == 0) {
+                System.out.println("Start sending " + length);
+            } else if (count > 16 * 1024) {
+                k16++;
+                System.out.println("... 16K sent.");
+                count = count % (16 * 1024);
+            }
+            if (k16 > 128) {
+                System.out.println("WARNING: InfiniteInputStream: " +
+                        "more than 128 16k buffers generated: returning EOF");
+                return -1;
+            }
+            count += length;
+            return length;
         }
     }
 
@@ -493,10 +521,13 @@
                     out.print(requestMethod + " ");
                     URI uriPath = readRequestPath(is);
                     out.println(uriPath);
-                    readRequestHeaders(is);
+                    String headers = readRequestHeaders(is);
 
                     String query = uriPath.getRawQuery();
-                    assert query != null;
+                    if (query == null) {
+                        out.println("Request headers: [" + headers + "]");
+                    }
+                    assert query != null : "null query for uriPath: " + uriPath;
                     String qv = query.split("=")[1];
                     int len;
                     if (qv.equals("all")) {
@@ -542,9 +573,11 @@
         }
 
         // Read until the end of a HTTP request headers
-        static void readRequestHeaders(InputStream is) throws IOException {
+        static String readRequestHeaders(InputStream is) throws IOException {
             int requestEndCount = 0, r;
+            StringBuilder sb = new StringBuilder();
             while ((r = is.read()) != -1) {
+                sb.append((char) r);
                 if (r == requestEnd[requestEndCount]) {
                     requestEndCount++;
                     if (requestEndCount == 4) {
@@ -554,6 +587,7 @@
                     requestEndCount = 0;
                 }
             }
+            return sb.toString();
         }
     }
 
--- a/test/jdk/java/net/httpclient/SplitResponse.java	Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/SplitResponse.java	Wed Jun 27 21:24:56 2018 +0100
@@ -32,6 +32,7 @@
 import java.util.concurrent.CompletableFuture;
 import javax.net.ssl.SSLContext;
 import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLServerSocketFactory;
 import java.net.http.HttpClient;
 import java.net.http.HttpClient.Version;
@@ -268,7 +269,7 @@
                     String onechar = s.substring(i, i + 1);
                     try {
                         conn.send(onechar);
-                    } catch(SocketException x) {
+                    } catch(SocketException | SSLException x) {
                         if (!useSSL || i != len - 1) throw x;
                         if (x.getMessage().contains("closed by remote host")) {
                             String osname = System.getProperty("os.name", "unknown");