http-client-branch: fixed race condition in SSLFD when EOF is received while in BUFFER_UNDERFLOW; Improved debug traces http-client-branch
authordfuchs
Fri, 22 Jun 2018 19:40:37 +0100
branchhttp-client-branch
changeset 56803 858f7e57b3a3
parent 56795 03ece2518428
child 56810 f05220c47e02
http-client-branch: fixed race condition in SSLFD when EOF is received while in BUFFER_UNDERFLOW; Improved debug traces
src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Wed Jun 20 18:23:56 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Fri Jun 22 19:40:37 2018 +0100
@@ -316,7 +316,7 @@
                     proxyResponse.version, true);
             return MinimalFuture.completedFuture(syntheticResponse);
         } else if (t != null) {
-            if (debug.on()) debug.log("checkFor407: no response - %s", t);
+            if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
             return MinimalFuture.failedFuture(t);
         } else {
             if (debug.on()) debug.log("checkFor407: all clear");
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Wed Jun 20 18:23:56 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Fri Jun 22 19:40:37 2018 +0100
@@ -673,7 +673,11 @@
         client2.deleteConnection(this);
         List<Stream<?>> c = new LinkedList<>(streams.values());
         for (Stream<?> s : c) {
-            s.connectionClosing(t);
+            try {
+                s.connectionClosing(t);
+            } catch (Throwable e) {
+                Log.logError("Failed to close stream {0}: {1}", s.streamid, e);
+            }
         }
         connection.close();
     }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Wed Jun 20 18:23:56 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Fri Jun 22 19:40:37 2018 +0100
@@ -500,8 +500,8 @@
     {
         int amount = frame.getUpdate();
         if (amount <= 0) {
-            Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
-                         streamid, streamid, amount);
+            Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
+                         streamid, amount);
             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
         } else {
             assert streamid != 0;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java	Wed Jun 20 18:23:56 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java	Fri Jun 22 19:40:37 2018 +0100
@@ -66,8 +66,9 @@
     abstract int getStreamId();
 
     void update(int delta) {
-        if (debug.on()) debug.log("update: %d", delta);
-        if (received.addAndGet(delta) > limit) {
+        int rcv = received.addAndGet(delta);
+        if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
+        if (rcv > limit) {
             synchronized (this) {
                 int tosend = received.get();
                 if( tosend > limit) {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jun 20 18:23:56 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Fri Jun 22 19:40:37 2018 +0100
@@ -33,6 +33,9 @@
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
 import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -93,6 +96,8 @@
     // When handshake is in progress trying to wrap may produce no bytes.
     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
+    private static final boolean isMonitored =
+            monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));
 
     final Executor exec;
     final Reader reader;
@@ -100,6 +105,7 @@
     final SSLEngine engine;
     final String tubeName; // hack
     final CompletableFuture<String> alpnCF; // completes on initial handshake
+    final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
@@ -152,8 +158,7 @@
         // Writer to the downWriter.
         connect(downReader, downWriter);
 
-        if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
-            Monitor.add(this::monitor);
+        if (isMonitored) Monitor.add(monitor);
     }
 
     /**
@@ -202,6 +207,7 @@
     public String monitor() {
         StringBuilder sb = new StringBuilder();
         sb.append("SSL: id ").append(id);
+        sb.append(" ").append(dbgString());
         sb.append(" HS state: " + states(handshakeState));
         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
         if (stateList != null) {
@@ -293,8 +299,10 @@
 
         @Override
         public String toString() {
-            return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
-                    + " count: " + count.toString();
+            return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
+                    + ", count: " + count.toString() + ", scheduler: "
+                    + (scheduler.isStopped() ? "stopped" : "running")
+                    + ", status: " + lastUnwrapStatus;
         }
 
         private void reallocReadBuf() {
@@ -335,6 +343,7 @@
                 }
                 if (complete) {
                     this.completing = complete;
+                    minBytesRequired = 0;
                 }
             }
         }
@@ -395,13 +404,23 @@
                             // not enough data in the read buffer...
                             // no need to try to unwrap again unless we get more bytes
                             // than minBytesRequired = len in the read buffer.
-                            minBytesRequired = len;
                             synchronized (readBufferLock) {
+                                minBytesRequired = len;
                                 // more bytes could already have been added...
                                 assert readBuf.remaining() >= len;
                                 // check if we have received some data, and if so
                                 // we can just re-spin the loop
                                 if (readBuf.remaining() > len) continue;
+                                else if (this.completing) {
+                                    if (debug.on()) {
+                                        debugr.log("BUFFER_UNDERFLOW with EOF," +
+                                                " %d bytes non decrypted.", len);
+                                    }
+                                    // The channel won't send us any more data, and
+                                    // we are in underflow: we need to fail.
+                                    throw new IOException("BUFFER_UNDERFLOW with EOF, "
+                                            + len + " bytes non decrypted.");
+                                }
                             }
                             // request more data and return.
                             requestMore();
@@ -452,12 +471,13 @@
             }
         }
 
+        private volatile Status lastUnwrapStatus;
         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
             ByteBuffer dst = getAppBuffer();
             int len = src.remaining();
             while (true) {
                 SSLEngineResult sslResult = engine.unwrap(src, dst);
-                switch (sslResult.getStatus()) {
+                switch (lastUnwrapStatus = sslResult.getStatus()) {
                     case BUFFER_OVERFLOW:
                         // may happen if app size buffer was changed, or if
                         // our 'adaptiveBufferSize' guess was too small for
@@ -507,7 +527,9 @@
     }
 
     public static class Monitor extends Thread {
-        final List<Monitorable> list;
+        final List<WeakReference<Monitorable>> list;
+        final List<FinalMonitorable> finalList;
+        final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
         static Monitor themon;
 
         static {
@@ -515,19 +537,61 @@
             themon.start(); // uncomment to enable Monitor
         }
 
+        // An instance used to temporarily store the
+        // last observable state of a monitorable object.
+        // When Monitor.remove(o) is called, we replace
+        // 'o' with a FinalMonitorable whose reference
+        // will be enqueued after the last observable state
+        // has been printed.
+        final class FinalMonitorable implements Monitorable {
+            final String finalState;
+            FinalMonitorable(Monitorable o) {
+                finalState = o.getInfo();
+                finalList.add(this);
+            }
+            @Override
+            public String getInfo() {
+                finalList.remove(this);
+                return finalState;
+            }
+        }
+
         Monitor() {
             super("Monitor");
             setDaemon(true);
             list = Collections.synchronizedList(new LinkedList<>());
+            finalList = new ArrayList<>(); // access is synchronized on list above
         }
 
         void addTarget(Monitorable o) {
-            list.add(o);
+            list.add(new WeakReference<>(o, queue));
+        }
+        void removeTarget(Monitorable o) {
+            // It can take a long time for GC to clean up references.
+            // Calling Monitor.remove() early helps removing noise from the
+            // logs/
+            synchronized (list) {
+                Iterator<WeakReference<Monitorable>> it = list.iterator();
+                while (it.hasNext()) {
+                    Monitorable m = it.next().get();
+                    if (m == null) it.remove();
+                    if (o == m) {
+                        it.remove();
+                        break;
+                    }
+                }
+                FinalMonitorable m = new FinalMonitorable(o);
+                addTarget(m);
+                Reference.reachabilityFence(m);
+            }
         }
 
         public static void add(Monitorable o) {
             themon.addTarget(o);
         }
+        public static void remove(Monitorable o) {
+            themon.removeTarget(o);
+        }
 
         @Override
         public void run() {
@@ -536,7 +600,14 @@
                 while (true) {
                     Thread.sleep(20 * 1000);
                     synchronized (list) {
-                        for (Monitorable o : list) {
+                        Reference<? extends Monitorable> expired;
+                        while ((expired = queue.poll()) != null) list.remove(expired);
+                        for (WeakReference<Monitorable> ref : list) {
+                            Monitorable o = ref.get();
+                            if (o == null) continue;
+                            if (o instanceof FinalMonitorable) {
+                                ref.enqueue();
+                            }
                             System.out.println(o.getInfo());
                             System.out.println("-------------------------");
                         }
@@ -733,6 +804,7 @@
         // downstream. Otherwise, we send the writeBuffer downstream
         // and will allocate a new one next time.
         volatile ByteBuffer writeBuffer;
+        private volatile Status lastWrappedStatus;
         @SuppressWarnings("fallthrough")
         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
             long len = Utils.remaining(src);
@@ -747,7 +819,7 @@
             while (true) {
                 SSLEngineResult sslResult = engine.wrap(src, dst);
                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
-                switch (sslResult.getStatus()) {
+                switch (lastWrappedStatus = sslResult.getStatus()) {
                     case BUFFER_OVERFLOW:
                         // Shouldn't happen. We allocated buffer with packet size
                         // get it again if net buffer size was changed
@@ -815,8 +887,10 @@
 
         @Override
         public String toString() {
-            return "WRITER: " + super.toString() +
-                    " writeList size " + Integer.toString(writeList.size());
+            return "WRITER: " + super.toString()
+                    + ", writeList size: " + Integer.toString(writeList.size())
+                    + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
+                    + ", status: " + lastWrappedStatus;
                     //" writeList: " + writeList.toString();
         }
     }
@@ -839,6 +913,7 @@
         stopped = true;
         reader.stop();
         writer.stop();
+        if (isMonitored) Monitor.remove(monitor);
     }
 
     private Void stopOnError(Throwable currentlyUnused) {