src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
branchhttp-client-branch
changeset 56474 fe2bf7b369b8
parent 56463 b583caf69b39
child 56477 46c04919ee5c
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Mon Apr 23 15:45:40 2018 +0100
@@ -28,12 +28,12 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
 import java.net.Authenticator;
 import java.net.CookieHandler;
 import java.net.ProxySelector;
+import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
@@ -47,7 +47,6 @@
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -57,6 +56,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -70,6 +70,7 @@
 import java.net.http.HttpResponse.BodyHandler;
 import java.net.http.HttpResponse.PushPromiseHandler;
 import java.net.http.WebSocket;
+import jdk.internal.net.http.common.BufferSupplier;
 import jdk.internal.net.http.common.Log;
 import jdk.internal.net.http.common.Logger;
 import jdk.internal.net.http.common.Pair;
@@ -138,6 +139,13 @@
     private final long id;
     private final String dbgTag;
 
+    // The SSL DirectBuffer Supplier provides the ability to recycle
+    // buffers used between the socket reader and the SSLEngine, or
+    // more precisely between the SocketTube publisher and the
+    // SSLFlowDelegate reader.
+    private final SSLDirectBufferSupplier sslBufferSupplier
+            = new SSLDirectBufferSupplier(this);
+
     // This reference is used to keep track of the facade HttpClient
     // that was returned to the application code.
     // It makes it possible to know when the application no longer
@@ -1164,4 +1172,70 @@
                 0 // only set the size if > 0
         );
     }
+
+    // Optimization for reading SSL encrypted data
+    // --------------------------------------------
+
+    // Returns a BufferSupplier that can be used for reading
+    // encrypted bytes of the socket. These buffers can then
+    // be recycled by the SSLFlowDelegate::Reader after their
+    // content has been copied in the SSLFlowDelegate::Reader
+    // readBuf.
+    // Because allocating, reading, copying, and recycling
+    // all happen in the SelectorManager thread,
+    // then this BufferSupplier can be shared between all
+    // the SSL connections managed by this client.
+    BufferSupplier getSSLBufferSupplier() {
+        return sslBufferSupplier;
+    }
+
+    // An implementation of BufferSupplier that manage a pool of
+    // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
+    // are used for reading encrypted bytes off the socket before
+    // copying before unwrapping.
+    private static final class SSLDirectBufferSupplier implements BufferSupplier {
+        private final ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
+        private final HttpClientImpl client;
+        private final Logger debug;
+        public SSLDirectBufferSupplier(HttpClientImpl client) {
+            this.client = Objects.requireNonNull(client);
+            this.debug = client.debug;
+        }
+
+        // get a buffer from the pool, or allocate a new one if needed.
+        @Override
+        public ByteBuffer get() {
+            assert client.isSelectorThread();
+            ByteBuffer buf = queue.poll();
+            if (buf == null) {
+                if (debug.on()) {
+                    // should not appear more than SocketTube.MAX_BUFFERS
+                    debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
+                }
+                buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
+            } else {
+                // if (debug.on()) { // this trace is mostly noise.
+                //    debug.log("ByteBuffer.recycle(%d)", buf.remaining());
+                // }
+            }
+            assert buf.isDirect();
+            assert buf.position() == 0;
+            assert buf.hasRemaining();
+            assert buf.limit() == Utils.BUFSIZE;
+            return buf;
+        }
+
+        // return the buffer to the pool
+        @Override
+        public void recycle(ByteBuffer buffer) {
+            assert client.isSelectorThread();
+            assert buffer.isDirect();
+            assert !buffer.hasRemaining();
+            buffer.position(0);
+            buffer.limit(buffer.capacity());
+            queue.offer(buffer);
+            assert queue.size() <= SocketTube.MAX_BUFFERS;
+        }
+    }
+
 }