src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 49944 4690a2871b44
parent 49765 ee6f7a61f3a5
child 50681 4254bed3c09d
child 56507 2294c51eae30
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed May 02 10:47:16 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed May 02 02:36:17 2018 -0700
@@ -33,7 +33,6 @@
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -46,6 +45,7 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 /**
  * Implements SSL using two SubscriberWrappers.
@@ -97,6 +97,7 @@
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
+    final Consumer<ByteBuffer> recycler;
     static AtomicInteger scount = new AtomicInteger(1);
     final int id;
 
@@ -110,8 +111,23 @@
                            Subscriber<? super List<ByteBuffer>> downReader,
                            Subscriber<? super List<ByteBuffer>> downWriter)
     {
+        this(engine, exec, null, downReader, downWriter);
+    }
+
+    /**
+     * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
+     * Flow.Subscriber requires an associated {@link CompletableFuture}
+     * for errors that need to be signaled from downstream to upstream.
+     */
+    public SSLFlowDelegate(SSLEngine engine,
+            Executor exec,
+            Consumer<ByteBuffer> recycler,
+            Subscriber<? super List<ByteBuffer>> downReader,
+            Subscriber<? super List<ByteBuffer>> downWriter)
+        {
         this.id = scount.getAndIncrement();
         this.tubeName = String.valueOf(downWriter);
+        this.recycler = recycler;
         this.reader = new Reader();
         this.writer = new Writer();
         this.engine = engine;
@@ -181,9 +197,11 @@
         sb.append("SSL: id ").append(id);
         sb.append(" HS state: " + states(handshakeState));
         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
-        sb.append(" LL : ");
-        for (String s: stateList) {
-            sb.append(s).append(" ");
+        if (stateList != null) {
+            sb.append(" LL : ");
+            for (String s : stateList) {
+                sb.append(s).append(" ");
+            }
         }
         sb.append("\r\n");
         sb.append("Reader:: ").append(reader.toString());
@@ -213,15 +231,20 @@
      * Upstream subscription strategy is to try and keep no more than
      * TARGET_BUFSIZE bytes in readBuf
      */
-    class Reader extends SubscriberWrapper {
+    final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
+        // Maximum record size is 16k.
+        // Because SocketTube can feeds us up to 3 16K buffers,
+        // then setting this size to 16K means that the readBuf
+        // can store up to 64K-1 (16K-1 + 3*16K)
+        static final int TARGET_BUFSIZE = 16 * 1024;
+
         final SequentialScheduler scheduler;
-        static final int TARGET_BUFSIZE = 16 * 1024;
         volatile ByteBuffer readBuf;
         volatile boolean completing;
         final Object readBufferLock = new Object();
         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 
-        class ReaderDownstreamPusher implements Runnable {
+        private final class ReaderDownstreamPusher implements Runnable {
             @Override public void run() { processData(); }
         }
 
@@ -233,6 +256,11 @@
             readBuf.limit(0); // keep in read mode
         }
 
+        @Override
+        public boolean supportsRecycling() {
+            return recycler != null;
+        }
+
         protected SchedulingAction enterScheduling() {
             return enterReadScheduling();
         }
@@ -250,7 +278,7 @@
                 debugr.log("Adding %d bytes to read buffer",
                            Utils.remaining(buffers));
             addToReadBuf(buffers, complete);
-            scheduler.runOrSchedule();
+            scheduler.runOrSchedule(exec);
         }
 
         @Override
@@ -270,6 +298,9 @@
         @Override
         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
             if (readBuf.remaining() > TARGET_BUFSIZE) {
+                if (debugr.on())
+                    debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
+                               readBuf.remaining());
                 return 0;
             } else {
                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
@@ -285,6 +316,11 @@
                         reallocReadBuf();
                     readBuf.put(buf);
                     readBuf.flip();
+                    // should be safe to call inside lock
+                    // since the only implementation
+                    // offers the buffer to an unbounded queue.
+                    // WARNING: do not touch buf after this point!
+                    if (recycler != null) recycler.accept(buf);
                 }
                 if (complete) {
                     this.completing = complete;
@@ -293,7 +329,7 @@
         }
 
         void schedule() {
-            scheduler.runOrSchedule();
+            scheduler.runOrSchedule(exec);
         }
 
         void stop() {
@@ -303,8 +339,13 @@
 
         AtomicInteger count = new AtomicInteger(0);
 
+        // minimum number of bytes required to call unwrap.
+        // Usually this is 0, unless there was a buffer underflow.
+        // In this case we need to wait for more bytes than what
+        // we had before calling unwrap() again.
+        volatile int minBytesRequired;
         // work function where it all happens
-        void processData() {
+        final void processData() {
             try {
                 if (debugr.on())
                     debugr.log("processData:"
@@ -313,15 +354,23 @@
                            + ", engine handshake status:" + engine.getHandshakeStatus());
                 int len;
                 boolean complete = false;
-                while ((len = readBuf.remaining()) > 0) {
+                while (readBuf.remaining() > (len = minBytesRequired)) {
                     boolean handshaking = false;
                     try {
                         EngineResult result;
                         synchronized (readBufferLock) {
                             complete = this.completing;
+                            if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
+                            // Unless there is a BUFFER_UNDERFLOW, we should try to
+                            // unwrap any number of bytes. Set minBytesRequired to 0:
+                            // we only need to do that if minBytesRequired is not already 0.
+                            len = len > 0 ? minBytesRequired = 0 : len;
                             result = unwrapBuffer(readBuf);
-                            if (debugr.on())
-                                debugr.log("Unwrapped: %s", result.result);
+                            len = readBuf.remaining();
+                            if (debugr.on()) {
+                                debugr.log("Unwrapped: result: %s", result.result);
+                                debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
+                            }
                         }
                         if (result.bytesProduced() > 0) {
                             if (debugr.on())
@@ -332,12 +381,19 @@
                         if (result.status() == Status.BUFFER_UNDERFLOW) {
                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
                             // not enough data in the read buffer...
-                            requestMore();
+                            // no need to try to unwrap again unless we get more bytes
+                            // than minBytesRequired = len in the read buffer.
+                            minBytesRequired = len;
                             synchronized (readBufferLock) {
-                                // check if we have received some data
+                                // more bytes could already have been added...
+                                assert readBuf.remaining() >= len;
+                                // check if we have received some data, and if so
+                                // we can just re-spin the loop
                                 if (readBuf.remaining() > len) continue;
-                                return;
                             }
+                            // request more data and return.
+                            requestMore();
+                            return;
                         }
                         if (complete && result.status() == Status.CLOSED) {
                             if (debugr.on()) debugr.log("Closed: completing");
@@ -352,8 +408,10 @@
                             handshaking = true;
                         } else {
                             if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
+                                handshaking = false;
+                                applicationBufferSize = engine.getSession().getApplicationBufferSize();
+                                packetBufferSize = engine.getSession().getPacketBufferSize();
                                 setALPN();
-                                handshaking = false;
                                 resumeActivity();
                             }
                         }
@@ -391,7 +449,8 @@
                     case BUFFER_OVERFLOW:
                         // may happen only if app size buffer was changed.
                         // get it again if app buffer size changed
-                        int appSize = engine.getSession().getApplicationBufferSize();
+                        int appSize = applicationBufferSize =
+                                engine.getSession().getApplicationBufferSize();
                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
                         dst.flip();
                         b.put(dst);
@@ -489,7 +548,7 @@
 
         @Override
         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
-            assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
+            assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
             if (complete) {
                 if (debugw.on()) debugw.log("adding SENTINEL");
@@ -549,6 +608,15 @@
             }
         }
 
+        void triggerWrite() {
+            synchronized (writeList) {
+                if (writeList.isEmpty()) {
+                    writeList.add(HS_TRIGGER);
+                }
+            }
+            scheduler.runOrSchedule();
+        }
+
         private void processData() {
             boolean completing = isCompleting();
 
@@ -586,6 +654,8 @@
                         handshaking = true;
                     } else {
                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
+                            applicationBufferSize = engine.getSession().getApplicationBufferSize();
+                            packetBufferSize = engine.getSession().getPacketBufferSize();
                             setALPN();
                             resumeActivity();
                         }
@@ -630,8 +700,9 @@
                         // Shouldn't happen. We allocated buffer with packet size
                         // get it again if net buffer size was changed
                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
-                        int appSize = engine.getSession().getApplicationBufferSize();
-                        ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
+                        int netSize = packetBufferSize
+                                = engine.getSession().getPacketBufferSize();
+                        ByteBuffer b = ByteBuffer.allocate(netSize + dst.position());
                         dst.flip();
                         b.put(dst);
                         dst = b;
@@ -759,13 +830,16 @@
     }
 
     final AtomicInteger handshakeState;
-    final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
+    final ConcurrentLinkedQueue<String> stateList =
+            debug.on() ? new ConcurrentLinkedQueue<>() : null;
 
     private boolean doHandshake(EngineResult r, int caller) {
         // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
         handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
-        stateList.add(r.handshakeStatus().toString());
-        stateList.add(Integer.toString(caller));
+        if (stateList != null && debug.on()) {
+            stateList.add(r.handshakeStatus().toString());
+            stateList.add(Integer.toString(caller));
+        }
         switch (r.handshakeStatus()) {
             case NEED_TASK:
                 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
@@ -778,7 +852,7 @@
                 return false;  // executeTasks will resume activity
             case NEED_WRAP:
                 if (caller == READER) {
-                    writer.addData(HS_TRIGGER);
+                    writer.triggerWrite();
                     return false;
                 }
                 break;
@@ -818,7 +892,6 @@
                     }
                 } while (true);
                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
-                //writer.addData(HS_TRIGGER);
                 resumeActivity();
             } catch (Throwable t) {
                 handleError(t);
@@ -839,7 +912,17 @@
             if (engine.isInboundDone() && !engine.isOutboundDone()) {
                 if (debug.on()) debug.log("doClosure: close_notify received");
                 close_notify_received = true;
-                doHandshake(r, READER);
+                if (!writer.scheduler.isStopped()) {
+                    doHandshake(r, READER);
+                } else {
+                    // We have received closed notify, but we
+                    // won't be able to send the acknowledgement.
+                    // Nothing more will come from the socket either,
+                    // so mark the reader as completed.
+                    synchronized (reader.readBufferLock) {
+                        reader.completing = true;
+                    }
+                }
             }
         }
         return r;
@@ -914,12 +997,22 @@
         }
     }
 
-    public ByteBuffer getNetBuffer() {
-        return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
+    volatile int packetBufferSize;
+    final ByteBuffer getNetBuffer() {
+        int netSize = packetBufferSize;
+        if (netSize <= 0) {
+            packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
+        }
+        return ByteBuffer.allocate(netSize);
     }
 
-    private ByteBuffer getAppBuffer() {
-        return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
+    volatile int applicationBufferSize;
+    final ByteBuffer getAppBuffer() {
+        int appSize = applicationBufferSize;
+        if (appSize <= 0) {
+            applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize();
+        }
+        return ByteBuffer.allocate(appSize);
     }
 
     final String dbgString() {