src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
branchhttp-client-branch
changeset 56474 fe2bf7b369b8
parent 56463 b583caf69b39
child 56481 247ed0848e48
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Mon Apr 23 15:45:40 2018 +0100
@@ -26,7 +26,6 @@
 package jdk.internal.net.http;
 
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -39,6 +38,7 @@
 import java.util.ArrayList;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import jdk.internal.net.http.common.BufferSupplier;
 import jdk.internal.net.http.common.Demand;
 import jdk.internal.net.http.common.FlowTube;
 import jdk.internal.net.http.common.Logger;
@@ -59,7 +59,7 @@
 
     private final HttpClientImpl client;
     private final SocketChannel channel;
-    private final Supplier<ByteBuffer> buffersSource;
+    private final SliceBufferSource sliceBuffersSource;
     private final Object lock = new Object();
     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
     private final InternalReadPublisher readPublisher;
@@ -67,10 +67,11 @@
     private final long id = IDS.incrementAndGet();
 
     public SocketTube(HttpClientImpl client, SocketChannel channel,
-                      Supplier<ByteBuffer> buffersSource) {
+                      Supplier<ByteBuffer> buffersFactory) {
         this.client = client;
         this.channel = channel;
-        this.buffersSource = buffersSource;
+        this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
+
         this.readPublisher = new InternalReadPublisher();
         this.writeSubscriber = new InternalWriteSubscriber();
     }
@@ -564,6 +565,7 @@
             final InternalReadSubscription impl;
             final TubeSubscriber  subscriber;
             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+            final BufferSource bufferSource;
             volatile boolean subscribed;
             volatile boolean cancelled;
             volatile boolean completed;
@@ -571,6 +573,9 @@
             public ReadSubscription(InternalReadSubscription impl,
                                     TubeSubscriber subscriber) {
                 this.impl = impl;
+                this.bufferSource = subscriber.supportsRecycling()
+                        ? new SSLDirectBufferSource(client)
+                        : SocketTube.this.sliceBuffersSource;
                 this.subscriber = subscriber;
             }
 
@@ -779,7 +784,7 @@
                         if (demand.tryDecrement()) {
                             // we have demand.
                             try {
-                                List<ByteBuffer> bytes = readAvailable();
+                                List<ByteBuffer> bytes = readAvailable(subscription.bufferSource);
                                 if (bytes == EOF) {
                                     if (!completed) {
                                         if (debug.on()) debug.log("got read EOF");
@@ -906,6 +911,147 @@
     }
 
     // ===================================================================== //
+    //                       Buffer Management                               //
+    // ===================================================================== //
+
+    // This interface is used by readAvailable(BufferSource);
+    public interface BufferSource {
+        /**
+         * Returns a buffer to read data from the socket.
+         * Different implementation can have different strategies, as to
+         * which kind of buffer to return, or whether to return the same
+         * buffer. The only constraints are that
+         *   a. the buffer returned must not be null
+         *   b. the buffer position indicates where to start reading
+         *   c. the buffer limit indicates where to stop reading.
+         *   d. the buffer is 'free' - that is - it is not used
+         *      or retained by anybody else
+         * @return A buffer to read data from the socket.
+         */
+        ByteBuffer getBuffer();
+
+        /**
+         * Append the data read into the buffer to the list of buffer to
+         * be sent downstream to the subscriber. May return a new
+         * list, or append to the given list.
+         *
+         * Different implementation can have different strategies, but
+         * must obviously be consistent with the implementation of the
+         * getBuffer() method. For instance, an implementation could
+         * decide to add the buffer to the list and return a new buffer
+         * next time getBuffer() is called, or could decide to add a buffer
+         * slice to the list and return the same buffer (if remaining
+         * space is available) next time getBuffer() is called.
+         *
+         * @param list    The list before adding the data. Can be null.
+         * @param buffer  The buffer containing the data to add to the list.
+         * @param start   The start position at which data were read.
+         *                The current buffer position indicates the end.
+         * @return A possibly new list where a buffer containing the
+         *         data read from the socket has been added.
+         */
+        List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start);
+    }
+
+    // An implementation of BufferSource used for unencrypted data.
+    // This buffer source uses heap buffers and avoids wasting memory
+    // by forwarding read only buffer slices downstream.
+    // Buffers allocated through this source are simply GC'ed when
+    // they are no longer referenced.
+    static final class SliceBufferSource implements BufferSource {
+        private final Supplier<ByteBuffer> factory;
+        private volatile ByteBuffer current;
+        public SliceBufferSource() {
+            this(Utils::getBuffer);
+        }
+        public SliceBufferSource(Supplier<ByteBuffer> factory) {
+            this.factory = Objects.requireNonNull(factory);
+        }
+
+        // reuse the same buffer if some space remains available.
+        // otherwise, returns a new heap buffer.
+        @Override
+        public final ByteBuffer getBuffer() {
+            ByteBuffer buf = current;
+            buf = (buf == null || !buf.hasRemaining())
+                    ? (current = factory.get()) : buf;
+            assert buf.hasRemaining();
+            return buf;
+        }
+
+        // Adds a read only slice to the list, potentially returning a
+        // new list with with that slice at the end.
+        @Override
+        public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+            // creates a slice to add to the list
+            int limit = buf.limit();
+            buf.limit(buf.position());
+            buf.position(start);
+            ByteBuffer slice = buf.slice();
+
+            // restore buffer state to what it was before creating the slice
+            buf.position(buf.limit());
+            buf.limit(limit);
+
+            // add the buffer to the list
+            return SocketTube.listOf(list, slice.asReadOnlyBuffer());
+        }
+    }
+
+
+    // An implementation of BufferSource used for encrypted data.
+    // This buffer source use direct byte buffers that will be
+    // recycled by the SocketTube subscriber.
+    //
+    static final class SSLDirectBufferSource implements BufferSource {
+        private final Supplier<ByteBuffer> factory;
+        private final HttpClientImpl client;
+        private volatile ByteBuffer current;
+
+        public SSLDirectBufferSource(HttpClientImpl client) {
+            this.client = Objects.requireNonNull(client);
+            this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
+        }
+
+        // Obtain a 'free' byte buffer from the pool, or return
+        // the same buffer if nothing was read at the previous cycle.
+        // The subscriber will be responsible for recycling this
+        // buffer into the pool (see SSLFlowDelegate.Reader)
+        @Override
+        public final ByteBuffer getBuffer() {
+            assert client.isSelectorThread();
+            ByteBuffer buf = current;
+            if (buf == null) {
+                buf = current = factory.get();
+            }
+            assert buf.hasRemaining();
+            assert buf.position() == 0;
+            return buf;
+        }
+
+        // Adds the buffer to the list. The buffer will be later returned to the
+        // pool by the subscriber (see SSLFlowDelegate.Reader).
+        // The next buffer returned by getBuffer() will be obtained from the
+        // pool. It might be the same buffer or another one.
+        // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
+        // recycling will happen in the flow before onNext returns, then the
+        // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
+        // it's shared by all SSL connections opened on that client.
+        @Override
+        public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+            assert client.isSelectorThread();
+            assert buf.isDirect();
+            assert start == 0;
+            assert current == buf;
+            current = null;
+            buf.limit(buf.position());
+            buf.position(start);
+            // add the buffer to the list
+            return SocketTube.listOf(list, buf);
+        }
+    }
+
+    // ===================================================================== //
     //                   Socket Channel Read/Write                           //
     // ===================================================================== //
     static final int MAX_BUFFERS = 3;
@@ -918,11 +1064,8 @@
     // is inserted into the returned buffer list, and if the current buffer
     // has remaining space, that space will be used to read more data when
     // the channel becomes readable again.
-    private volatile ByteBuffer current;
-    private List<ByteBuffer> readAvailable() throws IOException {
-        ByteBuffer buf = current;
-        buf = (buf == null || !buf.hasRemaining())
-                ? (current = buffersSource.get()) : buf;
+    private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
+        ByteBuffer buf = buffersSource.getBuffer();
         assert buf.hasRemaining();
 
         int read;
@@ -961,31 +1104,20 @@
             // check whether this buffer has still some free space available.
             // if so, we will keep it for the next round.
             final boolean hasRemaining = buf.hasRemaining();
-
-            // creates a slice to add to the list
-            int limit = buf.limit();
-            buf.limit(buf.position());
-            buf.position(pos);
-            ByteBuffer slice = buf.slice();
+            list = buffersSource.append(list, buf, pos);
 
-            // restore buffer state to what it was before creating the slice
-            buf.position(buf.limit());
-            buf.limit(limit);
-
-            // add the buffer to the list
-            list = addToList(list, slice.asReadOnlyBuffer());
             if (read <= 0 || list.size() == MAX_BUFFERS) {
                 break;
             }
 
-            buf = hasRemaining ? buf : (current = buffersSource.get());
+            buf = buffersSource.getBuffer();
             pos = buf.position();
             assert buf.hasRemaining();
         }
         return list;
     }
 
-    private <T> List<T> addToList(List<T> list, T item) {
+    private static <T> List<T> listOf(List<T> list, T item) {
         int size = list == null ? 0 : list.size();
         switch (size) {
             case 0: return List.of(item);