src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 50681 4254bed3c09d
parent 49944 4690a2871b44
child 50985 cd41f34e548c
child 56795 03ece2518428
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jun 20 17:15:16 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jun 20 09:05:57 2018 -0700
@@ -46,6 +46,7 @@
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.IntBinaryOperator;
 
 /**
  * Implements SSL using two SubscriberWrappers.
@@ -87,13 +88,18 @@
     final Logger debug =
             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 
+    private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
+    private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
+    // When handshake is in progress trying to wrap may produce no bytes.
+    private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
+    private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
+
     final Executor exec;
     final Reader reader;
     final Writer writer;
     final SSLEngine engine;
     final String tubeName; // hack
     final CompletableFuture<String> alpnCF; // completes on initial handshake
-    final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
@@ -146,7 +152,8 @@
         // Writer to the downWriter.
         connect(downReader, downWriter);
 
-        //Monitor.add(this::monitor);
+        if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
+            Monitor.add(this::monitor);
     }
 
     /**
@@ -245,13 +252,16 @@
         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 
         private final class ReaderDownstreamPusher implements Runnable {
-            @Override public void run() { processData(); }
+            @Override
+            public void run() {
+                processData();
+            }
         }
 
         Reader() {
             super();
             scheduler = SequentialScheduler.synchronizedScheduler(
-                                                new ReaderDownstreamPusher());
+                    new ReaderDownstreamPusher());
             this.readBuf = ByteBuffer.allocate(1024);
             readBuf.limit(0); // keep in read mode
         }
@@ -276,7 +286,7 @@
         public void incoming(List<ByteBuffer> buffers, boolean complete) {
             if (debugr.on())
                 debugr.log("Adding %d bytes to read buffer",
-                           Utils.remaining(buffers));
+                        Utils.remaining(buffers));
             addToReadBuf(buffers, complete);
             scheduler.runOrSchedule(exec);
         }
@@ -289,7 +299,7 @@
 
         private void reallocReadBuf() {
             int sz = readBuf.capacity();
-            ByteBuffer newb = ByteBuffer.allocate(sz*2);
+            ByteBuffer newb = ByteBuffer.allocate(sz * 2);
             readBuf.flip();
             Utils.copy(readBuf, newb);
             readBuf = newb;
@@ -300,7 +310,7 @@
             if (readBuf.remaining() > TARGET_BUFSIZE) {
                 if (debugr.on())
                     debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
-                               readBuf.remaining());
+                            readBuf.remaining());
                 return 0;
             } else {
                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
@@ -309,6 +319,7 @@
 
         // readBuf is kept ready for reading outside of this method
         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
+            assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
             synchronized (readBufferLock) {
                 for (ByteBuffer buf : buffers) {
                     readBuf.compact();
@@ -344,14 +355,15 @@
         // In this case we need to wait for more bytes than what
         // we had before calling unwrap() again.
         volatile int minBytesRequired;
+
         // work function where it all happens
         final void processData() {
             try {
                 if (debugr.on())
                     debugr.log("processData:"
-                           + " readBuf remaining:" + readBuf.remaining()
-                           + ", state:" + states(handshakeState)
-                           + ", engine handshake status:" + engine.getHandshakeStatus());
+                            + " readBuf remaining:" + readBuf.remaining()
+                            + ", state:" + states(handshakeState)
+                            + ", engine handshake status:" + engine.getHandshakeStatus());
                 int len;
                 boolean complete = false;
                 while (readBuf.remaining() > (len = minBytesRequired)) {
@@ -400,14 +412,13 @@
                             outgoing(Utils.EMPTY_BB_LIST, true);
                             return;
                         }
-                        if (result.handshaking() && !complete) {
+                        if (result.handshaking()) {
+                            handshaking = true;
                             if (debugr.on()) debugr.log("handshaking");
-                            if (doHandshake(result, READER)) {
-                                resumeActivity();
-                            }
-                            handshaking = true;
+                            if (doHandshake(result, READER)) continue; // need unwrap
+                            else break; // doHandshake will have triggered the write scheduler if necessary
                         } else {
-                            if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
+                            if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
                                 handshaking = false;
                                 applicationBufferSize = engine.getSession().getApplicationBufferSize();
                                 packetBufferSize = engine.getSession().getPacketBufferSize();
@@ -443,12 +454,19 @@
 
         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
             ByteBuffer dst = getAppBuffer();
+            int len = src.remaining();
             while (true) {
                 SSLEngineResult sslResult = engine.unwrap(src, dst);
                 switch (sslResult.getStatus()) {
                     case BUFFER_OVERFLOW:
-                        // may happen only if app size buffer was changed.
-                        // get it again if app buffer size changed
+                        // may happen if app size buffer was changed, or if
+                        // our 'adaptiveBufferSize' guess was too small for
+                        // the current payload. In that case, update the
+                        // value of applicationBufferSize, and allocate a
+                        // buffer of that size, which we are sure will be
+                        // big enough to decode whatever needs to be
+                        // decoded. We will later update adaptiveBufferSize
+                        // in OK: below.
                         int appSize = applicationBufferSize =
                                 engine.getSession().getApplicationBufferSize();
                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
@@ -457,11 +475,26 @@
                         dst = b;
                         break;
                     case CLOSED:
+                        assert dst.position() == 0;
                         return doClosure(new EngineResult(sslResult));
                     case BUFFER_UNDERFLOW:
                         // handled implicitly by compaction/reallocation of readBuf
+                        assert dst.position() == 0;
                         return new EngineResult(sslResult);
                     case OK:
+                        int size = dst.position();
+                        if (debug.on()) {
+                            debugr.log("Decoded " + size + " bytes out of " + len
+                                    + " into buffer of " + dst.capacity()
+                                    + " remaining to decode: " + src.remaining());
+                        }
+                        // if the record payload was bigger than what was originally
+                        // allocated, then sets the adaptiveAppBufferSize to size
+                        // and we will use that new size as a guess for the next app
+                        // buffer.
+                        if (size > adaptiveAppBufferSize) {
+                            adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
+                        }
                         dst.flip();
                         return new EngineResult(sslResult, dst);
                 }
@@ -662,8 +695,8 @@
                     }
                     cleanList(writeList); // tidy up the source list
                     sendResultBytes(result);
-                    if (handshaking && !completing) {
-                        if (needWrap()) {
+                    if (handshaking) {
+                        if (!completing && needWrap()) {
                             continue;
                         } else {
                             return;
@@ -687,11 +720,30 @@
             }
         }
 
+        // The SSLEngine insists on being given a buffer that is at least
+        // SSLSession.getPacketBufferSize() long (usually 16K). If given
+        // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
+        // has 6 bytes to wrap. Typical usage shows that for GET we
+        // usually produce an average of ~ 100 bytes.
+        // To avoid wasting space, and because allocating and zeroing
+        // 16K buffers for encoding 6 bytes is costly, we are reusing the
+        // same writeBuffer to interact with SSLEngine.wrap().
+        // If the SSLEngine produces less than writeBuffer.capacity() / 2,
+        // then we copy off the bytes to a smaller buffer that we send
+        // downstream. Otherwise, we send the writeBuffer downstream
+        // and will allocate a new one next time.
+        volatile ByteBuffer writeBuffer;
         @SuppressWarnings("fallthrough")
         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
+            long len = Utils.remaining(src);
             if (debugw.on())
-                debugw.log("wrapping " + Utils.remaining(src) + " bytes");
-            ByteBuffer dst = getNetBuffer();
+                debugw.log("wrapping " + len + " bytes");
+
+            ByteBuffer dst = writeBuffer;
+            if (dst == null) dst = writeBuffer = getNetBuffer();
+            assert dst.position() == 0 : "buffer position is " + dst.position();
+            assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
+
             while (true) {
                 SSLEngineResult sslResult = engine.wrap(src, dst);
                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
@@ -702,7 +754,7 @@
                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
                         int netSize = packetBufferSize
                                 = engine.getSession().getPacketBufferSize();
-                        ByteBuffer b = ByteBuffer.allocate(netSize + dst.position());
+                        ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
                         dst.flip();
                         b.put(dst);
                         dst = b;
@@ -712,11 +764,27 @@
                         // fallthrough. There could be some remaining data in dst.
                         // CLOSED will be handled by the caller.
                     case OK:
-                        dst.flip();
-                        final ByteBuffer dest = dst;
+                        final ByteBuffer dest;
+                        if (dst.position() == 0) {
+                            dest = NOTHING; // can happen if handshake is in progress
+                        } else if (dst.position() < dst.capacity() / 2) {
+                            // less than half the buffer was used.
+                            // copy off the bytes to a smaller buffer, and keep
+                            // the writeBuffer for next time.
+                            dst.flip();
+                            dest = Utils.copyAligned(dst);
+                            dst.clear();
+                        } else {
+                            // more than half the buffer was used.
+                            // just send that buffer downstream, and we will
+                            // get a new writeBuffer next time it is needed.
+                            dst.flip();
+                            dest = dst;
+                            writeBuffer = null;
+                        }
                         if (debugw.on())
-                            debugw.log("OK => produced: %d, not wrapped: %d",
-                                       dest.remaining(),  Utils.remaining(src));
+                            debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
+                                       dest.remaining(),  dest.capacity(), Utils.remaining(src));
                         return new EngineResult(sslResult, dest);
                     case BUFFER_UNDERFLOW:
                         // Shouldn't happen.  Doesn't returns when wrap()
@@ -799,8 +867,12 @@
     private static final int NOT_HANDSHAKING = 0;
     private static final int HANDSHAKING = 1;
 
-    private static final int DOING_TASKS = 4; // bit added to above state
-    private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
+    // Bit flags
+    // a thread is currently executing tasks
+    private static final int DOING_TASKS = 4;
+    // a thread wants to execute tasks, while another thread is executing
+    private static final int REQUESTING_TASKS = 8;
+    private static final int TASK_BITS = 12; // Both bits
 
     private static final int READER = 1;
     private static final int WRITER = 2;
@@ -808,7 +880,7 @@
     private static String states(AtomicInteger state) {
         int s = state.get();
         StringBuilder sb = new StringBuilder();
-        int x = s & ~DOING_TASKS;
+        int x = s & ~TASK_BITS;
         switch (x) {
             case NOT_HANDSHAKING:
                 sb.append(" NOT_HANDSHAKING ");
@@ -821,6 +893,8 @@
         }
         if ((s & DOING_TASKS) > 0)
             sb.append("|DOING_TASKS");
+        if ((s & REQUESTING_TASKS) > 0)
+            sb.append("|REQUESTING_TASKS");
         return sb.toString();
     }
 
@@ -833,18 +907,37 @@
     final ConcurrentLinkedQueue<String> stateList =
             debug.on() ? new ConcurrentLinkedQueue<>() : null;
 
+    // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
+    // depending on previous value
+    private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
+        if ((current & DOING_TASKS) == 0)
+            return DOING_TASKS | (current & HANDSHAKING);
+        else
+            return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
+    };
+
+    // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
+    // clears bits if not.
+    private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
+        if ((current & REQUESTING_TASKS) != 0)
+            return DOING_TASKS | (current & HANDSHAKING);
+        // clear both bits
+        return (current & HANDSHAKING);
+    };
+
     private boolean doHandshake(EngineResult r, int caller) {
-        // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
-        handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
+        // unconditionally sets the HANDSHAKING bit, while preserving task bits
+        handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
         if (stateList != null && debug.on()) {
             stateList.add(r.handshakeStatus().toString());
             stateList.add(Integer.toString(caller));
         }
         switch (r.handshakeStatus()) {
             case NEED_TASK:
-                int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
-                if ((s & DOING_TASKS) > 0) // someone else was doing tasks
+                int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
+                if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
                     return false;
+                }
 
                 if (debug.on()) debug.log("obtaining and initiating task execution");
                 List<Runnable> tasks = obtainTasks();
@@ -878,20 +971,25 @@
     }
 
     private void executeTasks(List<Runnable> tasks) {
-        if (tasks.isEmpty())
-            return;
         exec.execute(() -> {
             try {
                 List<Runnable> nextTasks = tasks;
+                if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
                 do {
                     nextTasks.forEach(Runnable::run);
                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
                         nextTasks = obtainTasks();
                     } else {
+                        int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
+                        if ((s & DOING_TASKS) != 0) {
+                            if (debug.on()) debug.log("re-running tasks (B)");
+                            nextTasks = obtainTasks();
+                            continue;
+                        }
                         break;
                     }
                 } while (true);
-                handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
+                if (debug.on()) debug.log("finished task execution");
                 resumeActivity();
             } catch (Throwable t) {
                 handleError(t);
@@ -997,6 +1095,8 @@
         }
     }
 
+    // The maximum network buffer size negotiated during
+    // the handshake. Usually 16K.
     volatile int packetBufferSize;
     final ByteBuffer getNetBuffer() {
         int netSize = packetBufferSize;
@@ -1006,13 +1106,32 @@
         return ByteBuffer.allocate(netSize);
     }
 
+    // The maximum application buffer size negotiated during
+    // the handshake. Usually close to 16K.
     volatile int applicationBufferSize;
+    // Despite of the maximum applicationBufferSize negotiated
+    // above, TLS records usually have a much smaller payload.
+    // The adaptativeAppBufferSize records the max payload
+    // ever decoded, and we use that as a guess for how big
+    // a buffer we will need for the next payload.
+    // This avoids allocating and zeroing a 16K buffer for
+    // nothing...
+    volatile int adaptiveAppBufferSize;
     final ByteBuffer getAppBuffer() {
         int appSize = applicationBufferSize;
         if (appSize <= 0) {
-            applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize();
+            applicationBufferSize = appSize
+                    = engine.getSession().getApplicationBufferSize();
         }
-        return ByteBuffer.allocate(appSize);
+        int size = adaptiveAppBufferSize;
+        if (size <= 0) {
+            size = 512; // start with 512 this is usually enough for handshaking / headers
+        } else if (size > appSize) {
+            size = appSize;
+        }
+        // will cause a BUFFER_OVERFLOW if not big enough, but
+        // that's OK.
+        return ByteBuffer.allocate(size);
     }
 
     final String dbgString() {