http-client-branch: use direct buffer pool for reading off SSL encrypted buffers from the socket + minor test fixes. http-client-branch
authordfuchs
Mon, 23 Apr 2018 15:45:40 +0100
branchhttp-client-branch
changeset 56474 fe2bf7b369b8
parent 56463 b583caf69b39
child 56476 df94b3e52c78
http-client-branch: use direct buffer pool for reading off SSL encrypted buffers from the socket + minor test fixes.
src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java
src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/BufferSupplier.java
src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferPool.java
src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferReference.java
src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
test/jdk/java/net/httpclient/ProxyServer.java
test/jdk/java/net/httpclient/SmallTimeout.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java	Mon Apr 23 15:45:40 2018 +0100
@@ -61,6 +61,7 @@
                     // create the SSLTube wrapping the SocketTube, with the given engine
                     flow = new SSLTube(engine,
                                        client().theExecutor(),
+                                       client().getSSLBufferSupplier()::recycle,
                                        plainConnection.getConnectionFlow());
                     return null; } );
     }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java	Mon Apr 23 15:45:40 2018 +0100
@@ -65,6 +65,7 @@
                     // create the SSLTube wrapping the SocketTube, with the given engine
                     flow = new SSLTube(engine,
                                        client().theExecutor(),
+                                       client().getSSLBufferSupplier()::recycle,
                                        plainConnection.getConnectionFlow());
                     return null;} );
     }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Mon Apr 23 15:45:40 2018 +0100
@@ -131,22 +131,38 @@
      *    in case of SSL connection.
      *
      * 1. Outgoing frames encoded to ByteBuffers.
-     *    Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
+     *    Outgoing ByteBuffers are created with required size and frequently small (except DataFrames, etc)
      *    At this place no pools at all. All outgoing buffers should be collected by GC.
      *
      * 2. Incoming ByteBuffers (decoded to frames).
      *    Here, total elimination of BB pool is not a good idea.
      *    We don't know how many bytes we will receive through network.
-     * So here we allocate buffer of reasonable size. The following life of the BB:
-     * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
-     *     BB is returned to pool,
-     * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
-     *     Such BB is never returned to pool and will be GCed.
-     * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
-     *     the buffer could be release to pool.
+     *
+     *    So here is a strategy we could try to implement:
+     *    We allocate buffer of reasonable size. The following life of the BB:
+     *      - If all frames decoded from the BB are other than DataFrame andHeaderFrame
+     *        (and HeaderFrame subclasses) BB is returned to pool,
+     *      - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer
+     *        obtained by slice() method. Such BB is never returned to pool and will be GCed.
+     *      - If we decoded HeadersFrame from the BB. Then header decoding is performed
+     *        inside processFrame method and the buffer could be release to pool.
      *
-     * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool,
-     *    because of we can't predict size encrypted packets.
+     *    At this moment we do not implement this strategy.
+     *    Instead we only use a pool for recycling SSL encrypted buffers read from
+     *    the socket (see 3).
+     *
+     * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers are to/from
+     *    the pool, because of we can't predict size encrypted packets.
+     *
+     *    At the moment we only recycle encrypted buffers read from the socket, and we have
+     *    a pool of maximum 3 (SocketTube.MAX_BUFFERS = 3) direct buffers which are shared by
+     *    all connections on a given client.
+     *    This pool is used by all SSL connections - whether HTTP/1.1 or HTTP/2, but only
+     *    for SSL encrypted buffers that circulate between the SocketTube publisher and
+     *    the SSLFlowDelegate Reader. Limiting the pool to this particular segment allows
+     *    us to use direct buffer and avoid one more copy.
+     *    See HttpClientImpl.SSLDirectBufferSupplier, SocketTube.SSLDirectBufferSource, and
+     *    SSLTube.recycler.
      *
      */
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Mon Apr 23 15:45:40 2018 +0100
@@ -28,12 +28,12 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
 import java.net.Authenticator;
 import java.net.CookieHandler;
 import java.net.ProxySelector;
+import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
@@ -47,7 +47,6 @@
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -57,6 +56,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -70,6 +70,7 @@
 import java.net.http.HttpResponse.BodyHandler;
 import java.net.http.HttpResponse.PushPromiseHandler;
 import java.net.http.WebSocket;
+import jdk.internal.net.http.common.BufferSupplier;
 import jdk.internal.net.http.common.Log;
 import jdk.internal.net.http.common.Logger;
 import jdk.internal.net.http.common.Pair;
@@ -138,6 +139,13 @@
     private final long id;
     private final String dbgTag;
 
+    // The SSL DirectBuffer Supplier provides the ability to recycle
+    // buffers used between the socket reader and the SSLEngine, or
+    // more precisely between the SocketTube publisher and the
+    // SSLFlowDelegate reader.
+    private final SSLDirectBufferSupplier sslBufferSupplier
+            = new SSLDirectBufferSupplier(this);
+
     // This reference is used to keep track of the facade HttpClient
     // that was returned to the application code.
     // It makes it possible to know when the application no longer
@@ -1164,4 +1172,70 @@
                 0 // only set the size if > 0
         );
     }
+
+    // Optimization for reading SSL encrypted data
+    // --------------------------------------------
+
+    // Returns a BufferSupplier that can be used for reading
+    // encrypted bytes of the socket. These buffers can then
+    // be recycled by the SSLFlowDelegate::Reader after their
+    // content has been copied in the SSLFlowDelegate::Reader
+    // readBuf.
+    // Because allocating, reading, copying, and recycling
+    // all happen in the SelectorManager thread,
+    // then this BufferSupplier can be shared between all
+    // the SSL connections managed by this client.
+    BufferSupplier getSSLBufferSupplier() {
+        return sslBufferSupplier;
+    }
+
+    // An implementation of BufferSupplier that manage a pool of
+    // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
+    // are used for reading encrypted bytes off the socket before
+    // copying before unwrapping.
+    private static final class SSLDirectBufferSupplier implements BufferSupplier {
+        private final ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
+        private final HttpClientImpl client;
+        private final Logger debug;
+        public SSLDirectBufferSupplier(HttpClientImpl client) {
+            this.client = Objects.requireNonNull(client);
+            this.debug = client.debug;
+        }
+
+        // get a buffer from the pool, or allocate a new one if needed.
+        @Override
+        public ByteBuffer get() {
+            assert client.isSelectorThread();
+            ByteBuffer buf = queue.poll();
+            if (buf == null) {
+                if (debug.on()) {
+                    // should not appear more than SocketTube.MAX_BUFFERS
+                    debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
+                }
+                buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
+            } else {
+                // if (debug.on()) { // this trace is mostly noise.
+                //    debug.log("ByteBuffer.recycle(%d)", buf.remaining());
+                // }
+            }
+            assert buf.isDirect();
+            assert buf.position() == 0;
+            assert buf.hasRemaining();
+            assert buf.limit() == Utils.BUFSIZE;
+            return buf;
+        }
+
+        // return the buffer to the pool
+        @Override
+        public void recycle(ByteBuffer buffer) {
+            assert client.isSelectorThread();
+            assert buffer.isDirect();
+            assert !buffer.hasRemaining();
+            buffer.position(0);
+            buffer.limit(buffer.capacity());
+            queue.offer(buffer);
+            assert queue.size() <= SocketTube.MAX_BUFFERS;
+        }
+    }
+
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Mon Apr 23 15:45:40 2018 +0100
@@ -26,7 +26,6 @@
 package jdk.internal.net.http;
 
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -39,6 +38,7 @@
 import java.util.ArrayList;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import jdk.internal.net.http.common.BufferSupplier;
 import jdk.internal.net.http.common.Demand;
 import jdk.internal.net.http.common.FlowTube;
 import jdk.internal.net.http.common.Logger;
@@ -59,7 +59,7 @@
 
     private final HttpClientImpl client;
     private final SocketChannel channel;
-    private final Supplier<ByteBuffer> buffersSource;
+    private final SliceBufferSource sliceBuffersSource;
     private final Object lock = new Object();
     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
     private final InternalReadPublisher readPublisher;
@@ -67,10 +67,11 @@
     private final long id = IDS.incrementAndGet();
 
     public SocketTube(HttpClientImpl client, SocketChannel channel,
-                      Supplier<ByteBuffer> buffersSource) {
+                      Supplier<ByteBuffer> buffersFactory) {
         this.client = client;
         this.channel = channel;
-        this.buffersSource = buffersSource;
+        this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
+
         this.readPublisher = new InternalReadPublisher();
         this.writeSubscriber = new InternalWriteSubscriber();
     }
@@ -564,6 +565,7 @@
             final InternalReadSubscription impl;
             final TubeSubscriber  subscriber;
             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+            final BufferSource bufferSource;
             volatile boolean subscribed;
             volatile boolean cancelled;
             volatile boolean completed;
@@ -571,6 +573,9 @@
             public ReadSubscription(InternalReadSubscription impl,
                                     TubeSubscriber subscriber) {
                 this.impl = impl;
+                this.bufferSource = subscriber.supportsRecycling()
+                        ? new SSLDirectBufferSource(client)
+                        : SocketTube.this.sliceBuffersSource;
                 this.subscriber = subscriber;
             }
 
@@ -779,7 +784,7 @@
                         if (demand.tryDecrement()) {
                             // we have demand.
                             try {
-                                List<ByteBuffer> bytes = readAvailable();
+                                List<ByteBuffer> bytes = readAvailable(subscription.bufferSource);
                                 if (bytes == EOF) {
                                     if (!completed) {
                                         if (debug.on()) debug.log("got read EOF");
@@ -906,6 +911,147 @@
     }
 
     // ===================================================================== //
+    //                       Buffer Management                               //
+    // ===================================================================== //
+
+    // This interface is used by readAvailable(BufferSource);
+    public interface BufferSource {
+        /**
+         * Returns a buffer to read data from the socket.
+         * Different implementation can have different strategies, as to
+         * which kind of buffer to return, or whether to return the same
+         * buffer. The only constraints are that
+         *   a. the buffer returned must not be null
+         *   b. the buffer position indicates where to start reading
+         *   c. the buffer limit indicates where to stop reading.
+         *   d. the buffer is 'free' - that is - it is not used
+         *      or retained by anybody else
+         * @return A buffer to read data from the socket.
+         */
+        ByteBuffer getBuffer();
+
+        /**
+         * Append the data read into the buffer to the list of buffer to
+         * be sent downstream to the subscriber. May return a new
+         * list, or append to the given list.
+         *
+         * Different implementation can have different strategies, but
+         * must obviously be consistent with the implementation of the
+         * getBuffer() method. For instance, an implementation could
+         * decide to add the buffer to the list and return a new buffer
+         * next time getBuffer() is called, or could decide to add a buffer
+         * slice to the list and return the same buffer (if remaining
+         * space is available) next time getBuffer() is called.
+         *
+         * @param list    The list before adding the data. Can be null.
+         * @param buffer  The buffer containing the data to add to the list.
+         * @param start   The start position at which data were read.
+         *                The current buffer position indicates the end.
+         * @return A possibly new list where a buffer containing the
+         *         data read from the socket has been added.
+         */
+        List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start);
+    }
+
+    // An implementation of BufferSource used for unencrypted data.
+    // This buffer source uses heap buffers and avoids wasting memory
+    // by forwarding read only buffer slices downstream.
+    // Buffers allocated through this source are simply GC'ed when
+    // they are no longer referenced.
+    static final class SliceBufferSource implements BufferSource {
+        private final Supplier<ByteBuffer> factory;
+        private volatile ByteBuffer current;
+        public SliceBufferSource() {
+            this(Utils::getBuffer);
+        }
+        public SliceBufferSource(Supplier<ByteBuffer> factory) {
+            this.factory = Objects.requireNonNull(factory);
+        }
+
+        // reuse the same buffer if some space remains available.
+        // otherwise, returns a new heap buffer.
+        @Override
+        public final ByteBuffer getBuffer() {
+            ByteBuffer buf = current;
+            buf = (buf == null || !buf.hasRemaining())
+                    ? (current = factory.get()) : buf;
+            assert buf.hasRemaining();
+            return buf;
+        }
+
+        // Adds a read only slice to the list, potentially returning a
+        // new list with with that slice at the end.
+        @Override
+        public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+            // creates a slice to add to the list
+            int limit = buf.limit();
+            buf.limit(buf.position());
+            buf.position(start);
+            ByteBuffer slice = buf.slice();
+
+            // restore buffer state to what it was before creating the slice
+            buf.position(buf.limit());
+            buf.limit(limit);
+
+            // add the buffer to the list
+            return SocketTube.listOf(list, slice.asReadOnlyBuffer());
+        }
+    }
+
+
+    // An implementation of BufferSource used for encrypted data.
+    // This buffer source use direct byte buffers that will be
+    // recycled by the SocketTube subscriber.
+    //
+    static final class SSLDirectBufferSource implements BufferSource {
+        private final Supplier<ByteBuffer> factory;
+        private final HttpClientImpl client;
+        private volatile ByteBuffer current;
+
+        public SSLDirectBufferSource(HttpClientImpl client) {
+            this.client = Objects.requireNonNull(client);
+            this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
+        }
+
+        // Obtain a 'free' byte buffer from the pool, or return
+        // the same buffer if nothing was read at the previous cycle.
+        // The subscriber will be responsible for recycling this
+        // buffer into the pool (see SSLFlowDelegate.Reader)
+        @Override
+        public final ByteBuffer getBuffer() {
+            assert client.isSelectorThread();
+            ByteBuffer buf = current;
+            if (buf == null) {
+                buf = current = factory.get();
+            }
+            assert buf.hasRemaining();
+            assert buf.position() == 0;
+            return buf;
+        }
+
+        // Adds the buffer to the list. The buffer will be later returned to the
+        // pool by the subscriber (see SSLFlowDelegate.Reader).
+        // The next buffer returned by getBuffer() will be obtained from the
+        // pool. It might be the same buffer or another one.
+        // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
+        // recycling will happen in the flow before onNext returns, then the
+        // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
+        // it's shared by all SSL connections opened on that client.
+        @Override
+        public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+            assert client.isSelectorThread();
+            assert buf.isDirect();
+            assert start == 0;
+            assert current == buf;
+            current = null;
+            buf.limit(buf.position());
+            buf.position(start);
+            // add the buffer to the list
+            return SocketTube.listOf(list, buf);
+        }
+    }
+
+    // ===================================================================== //
     //                   Socket Channel Read/Write                           //
     // ===================================================================== //
     static final int MAX_BUFFERS = 3;
@@ -918,11 +1064,8 @@
     // is inserted into the returned buffer list, and if the current buffer
     // has remaining space, that space will be used to read more data when
     // the channel becomes readable again.
-    private volatile ByteBuffer current;
-    private List<ByteBuffer> readAvailable() throws IOException {
-        ByteBuffer buf = current;
-        buf = (buf == null || !buf.hasRemaining())
-                ? (current = buffersSource.get()) : buf;
+    private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
+        ByteBuffer buf = buffersSource.getBuffer();
         assert buf.hasRemaining();
 
         int read;
@@ -961,31 +1104,20 @@
             // check whether this buffer has still some free space available.
             // if so, we will keep it for the next round.
             final boolean hasRemaining = buf.hasRemaining();
-
-            // creates a slice to add to the list
-            int limit = buf.limit();
-            buf.limit(buf.position());
-            buf.position(pos);
-            ByteBuffer slice = buf.slice();
+            list = buffersSource.append(list, buf, pos);
 
-            // restore buffer state to what it was before creating the slice
-            buf.position(buf.limit());
-            buf.limit(limit);
-
-            // add the buffer to the list
-            list = addToList(list, slice.asReadOnlyBuffer());
             if (read <= 0 || list.size() == MAX_BUFFERS) {
                 break;
             }
 
-            buf = hasRemaining ? buf : (current = buffersSource.get());
+            buf = buffersSource.getBuffer();
             pos = buf.position();
             assert buf.hasRemaining();
         }
         return list;
     }
 
-    private <T> List<T> addToList(List<T> list, T item) {
+    private static <T> List<T> listOf(List<T> list, T item) {
         int size = list == null ? 0 : list.size();
         switch (size) {
             case 0: return List.of(item);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/BufferSupplier.java	Mon Apr 23 15:45:40 2018 +0100
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.internal.net.http.common;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+
+/**
+ *  This interface allows to recycle buffers used for SSL decryption.
+ *  Buffers that are used for reading SSL encrypted data are typically
+ *  very short lived, as it is necessary to aggregate their content
+ *  before calling SSLEngine::unwrap.
+ *  Because both reading and copying happen in the SelectorManager
+ *  thread, then it makes it possible to pool these buffers and
+ *  recycle them for the next socket read, instead of simply
+ *  letting them be GC'ed. That also makes it possible to use
+ *  direct byte buffers, and avoid another layer of copying by
+ *  the SocketChannel implementation.
+ *
+ *  The HttpClientImpl has an implementation of this interface
+ *  that allows to reuse the same 3 direct buffers for reading
+ *  off SSL encrypted data from the socket.
+ *  The BufferSupplier::get method is called by SocketTube
+ *  (see SocketTube.SSLDirectBufferSource) and BufferSupplier::recycle
+ *  is called by SSLFlowDelegate.Reader.
+ **/
+public interface BufferSupplier extends Supplier<ByteBuffer> {
+    /**
+     * Returns a buffer to read encrypted data off the socket.
+     * @return a buffer to read encrypted data off the socket.
+     */
+    ByteBuffer get();
+
+    /**
+     * Returns a buffer to the pool.
+     *
+     * @param buffer This must be a buffer previously obtained
+     *               by calling BufferSupplier::get. The caller must
+     *               not touch the buffer after returning it to
+     *               the pool.
+     */
+    void recycle(ByteBuffer buffer);
+}
+
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferPool.java	Thu Apr 19 16:47:52 2018 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package jdk.internal.net.http.common;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * The class provides reuse of ByteBuffers.
- * It is supposed that all requested buffers have the same size for a long period of time.
- * That is why there is no any logic splitting buffers into different buckets (by size). It's unnecessary.
- *
- * At the same moment it is allowed to change requested buffers size (all smaller buffers will be discarded).
- * It may be needed for example, if after rehandshaking netPacketBufferSize was changed.
- */
-public class ByteBufferPool {
-
-    private final java.util.Queue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();
-
-    public ByteBufferPool() {
-    }
-
-    public ByteBufferReference get(int size) {
-        ByteBuffer buffer;
-        while ((buffer = pool.poll()) != null) {
-            if (buffer.capacity() >= size) {
-                return ByteBufferReference.of(buffer, this);
-            }
-        }
-        return ByteBufferReference.of(ByteBuffer.allocate(size), this);
-    }
-
-    public void release(ByteBuffer buffer) {
-        buffer.clear();
-        pool.offer(buffer);
-    }
-
-}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferReference.java	Thu Apr 19 16:47:52 2018 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,88 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package jdk.internal.net.http.common;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
-import java.util.function.Supplier;
-
-public class ByteBufferReference  implements Supplier<ByteBuffer> {
-
-    private ByteBuffer buffer;
-    private final ByteBufferPool pool;
-
-    public static ByteBufferReference of(ByteBuffer buffer) {
-        return of(buffer, null);
-    }
-
-    public static ByteBufferReference of(ByteBuffer buffer, ByteBufferPool pool) {
-        Objects.requireNonNull(buffer);
-        return new ByteBufferReference(buffer, pool);
-    }
-
-    public static ByteBuffer[] toBuffers(ByteBufferReference... refs) {
-        ByteBuffer[] bufs = new ByteBuffer[refs.length];
-        for (int i = 0; i < refs.length; i++) {
-            bufs[i] = refs[i].get();
-        }
-        return bufs;
-    }
-
-    public static ByteBufferReference[] toReferences(ByteBuffer... buffers) {
-        ByteBufferReference[] refs = new ByteBufferReference[buffers.length];
-        for (int i = 0; i < buffers.length; i++) {
-            refs[i] = of(buffers[i]);
-        }
-        return refs;
-    }
-
-
-    public static void clear(ByteBufferReference[] refs) {
-        for(ByteBufferReference ref : refs) {
-            ref.clear();
-        }
-    }
-
-    private ByteBufferReference(ByteBuffer buffer, ByteBufferPool pool) {
-        this.buffer = buffer;
-        this.pool = pool;
-    }
-
-    @Override
-    public ByteBuffer get() {
-        ByteBuffer buf = this.buffer;
-        assert buf!=null : "getting ByteBuffer after clearance";
-        return buf;
-    }
-
-    public void clear() {
-        ByteBuffer buf = this.buffer;
-        assert buf!=null : "double ByteBuffer clearance";
-        this.buffer = null;
-        if (pool != null) {
-            pool.release(buf);
-        }
-    }
-}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java	Mon Apr 23 15:45:40 2018 +0100
@@ -63,6 +63,8 @@
          */
         default void dropSubscription() { }
 
+        default boolean supportsRecycling() { return false; }
+
     }
 
     /**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Mon Apr 23 15:45:40 2018 +0100
@@ -45,6 +45,7 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 /**
  * Implements SSL using two SubscriberWrappers.
@@ -96,6 +97,7 @@
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
+    final Consumer<ByteBuffer> recycler;
     static AtomicInteger scount = new AtomicInteger(1);
     final int id;
 
@@ -109,8 +111,23 @@
                            Subscriber<? super List<ByteBuffer>> downReader,
                            Subscriber<? super List<ByteBuffer>> downWriter)
     {
+        this(engine, exec, null, downReader, downWriter);
+    }
+
+    /**
+     * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
+     * Flow.Subscriber requires an associated {@link CompletableFuture}
+     * for errors that need to be signaled from downstream to upstream.
+     */
+    public SSLFlowDelegate(SSLEngine engine,
+            Executor exec,
+            Consumer<ByteBuffer> recycler,
+            Subscriber<? super List<ByteBuffer>> downReader,
+            Subscriber<? super List<ByteBuffer>> downWriter)
+        {
         this.id = scount.getAndIncrement();
         this.tubeName = String.valueOf(downWriter);
+        this.recycler = recycler;
         this.reader = new Reader();
         this.writer = new Writer();
         this.engine = engine;
@@ -212,7 +229,7 @@
      * Upstream subscription strategy is to try and keep no more than
      * TARGET_BUFSIZE bytes in readBuf
      */
-    class Reader extends SubscriberWrapper {
+    class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
         // Maximum record size is 16k.
         // Because SocketTube can feeds us up to 3 16K buffers,
         // then setting this size to 16K means that the readBuf
@@ -237,6 +254,11 @@
             readBuf.limit(0); // keep in read mode
         }
 
+        @Override
+        public boolean supportsRecycling() {
+            return recycler != null;
+        }
+
         protected SchedulingAction enterScheduling() {
             return enterReadScheduling();
         }
@@ -292,6 +314,11 @@
                         reallocReadBuf();
                     readBuf.put(buf);
                     readBuf.flip();
+                    // should be safe to call inside lock
+                    // since the only implementation
+                    // offers the buffer to an unbounded queue.
+                    // WARNING: do not touch buf after this point!
+                    if (recycler != null) recycler.accept(buf);
                 }
                 if (complete) {
                     this.completing = complete;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Mon Apr 23 15:45:40 2018 +0100
@@ -77,6 +77,13 @@
     private volatile boolean finished;
 
     public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
+        this(engine, executor, null, tube);
+    }
+
+    public SSLTube(SSLEngine engine,
+                   Executor executor,
+                   Consumer<ByteBuffer> recycler,
+                   FlowTube tube) {
         Objects.requireNonNull(engine);
         Objects.requireNonNull(executor);
         this.tube = Objects.requireNonNull(tube);
@@ -85,15 +92,17 @@
         this.engine = engine;
         sslDelegate = new SSLTubeFlowDelegate(engine,
                                               executor,
+                                              recycler,
                                               readSubscriber,
                                               tube);
     }
 
     final class SSLTubeFlowDelegate extends SSLFlowDelegate {
         SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
+                            Consumer<ByteBuffer> recycler,
                             SSLSubscriberWrapper readSubscriber,
                             FlowTube tube) {
-            super(engine, executor, readSubscriber, tube);
+            super(engine, executor, recycler, readSubscriber, tube);
         }
         protected SchedulingAction enterReadScheduling() {
             readSubscriber.processPendingSubscriber();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Mon Apr 23 15:45:40 2018 +0100
@@ -581,25 +581,6 @@
         return (int) remain;
     }
 
-    public static long remaining(ByteBufferReference[] refs) {
-        long remain = 0;
-        for (ByteBufferReference ref : refs) {
-            remain += ref.get().remaining();
-        }
-        return remain;
-    }
-
-    public static int remaining(ByteBufferReference[] refs, int max) {
-        long remain = 0;
-        for (ByteBufferReference ref : refs) {
-            remain += ref.get().remaining();
-            if (remain > max) {
-                throw new IllegalArgumentException("too many bytes");
-            }
-        }
-        return (int) remain;
-    }
-
     public static int remaining(ByteBuffer[] refs, int max) {
         long remain = 0;
         for (ByteBuffer b : refs) {
@@ -623,7 +604,6 @@
     public static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.allocate(0);
     public static final ByteBuffer[] EMPTY_BB_ARRAY = new ByteBuffer[0];
     public static final List<ByteBuffer> EMPTY_BB_LIST = List.of();
-    public static final ByteBufferReference[] EMPTY_BBR_ARRAY = new ByteBufferReference[0];
 
     /**
      * Returns a slice of size {@code amount} from the given buffer. If the
--- a/test/jdk/java/net/httpclient/ProxyServer.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/test/jdk/java/net/httpclient/ProxyServer.java	Mon Apr 23 15:45:40 2018 +0100
@@ -73,12 +73,10 @@
      */
     public void close() throws IOException {
         if (debug) System.out.println("Proxy: closing");
-            done = true;
+        done = true;
         listener.close();
         for (Connection c : connections) {
-            if (c.running()) {
-                c.close();
-            }
+            c.close();
         }
     }
 
@@ -179,7 +177,9 @@
             return out.isAlive() || in.isAlive();
         }
 
-        public void close() throws IOException {
+        private volatile boolean closing;
+        public synchronized void close() throws IOException {
+            closing = true;
             if (debug) System.out.println("Closing connection (proxy)");
             if (serverSocket != null) serverSocket.close();
             if (clientSocket != null) clientSocket.close();
@@ -238,7 +238,13 @@
                 i++;
 
                 commonInit(dest, 80);
-                serverOut.write(buf, i, buf.length-i);
+                OutputStream sout;
+                synchronized (this) {
+                    if (closing) return;
+                    sout = serverOut;
+                }
+                // might fail if we're closing but we don't care.
+                sout.write(buf, i, buf.length-i);
                 proxyCommon();
 
             } catch (URISyntaxException e) {
@@ -246,7 +252,8 @@
             }
         }
 
-        void commonInit(String dest, int defaultPort) throws IOException {
+        synchronized void commonInit(String dest, int defaultPort) throws IOException {
+            if (closing) return;
             int port;
             String[] hostport = dest.split(":");
             if (hostport.length == 1) {
@@ -261,7 +268,8 @@
             serverIn = new BufferedInputStream(serverSocket.getInputStream());
         }
 
-        void proxyCommon() throws IOException {
+        synchronized void proxyCommon() throws IOException {
+            if (closing) return;
             out = new Thread(() -> {
                 try {
                     byte[] bb = new byte[8000];
@@ -269,6 +277,7 @@
                     while ((n = clientIn.read(bb)) != -1) {
                         serverOut.write(bb, 0, n);
                     }
+                    closing = true;
                     serverSocket.close();
                     clientSocket.close();
                 } catch (IOException e) {
@@ -284,6 +293,7 @@
                     while ((n = serverIn.read(bb)) != -1) {
                         clientOut.write(bb, 0, n);
                     }
+                    closing = true;
                     serverSocket.close();
                     clientSocket.close();
                 } catch (IOException e) {
@@ -302,7 +312,9 @@
         }
 
         void doTunnel(String dest) throws IOException {
+            if (closing) return; // no need to go further.
             commonInit(dest, 443);
+            // might fail if we're closing, but we don't care.
             clientOut.write("HTTP/1.1 200 OK\r\n\r\n".getBytes());
             proxyCommon();
         }
--- a/test/jdk/java/net/httpclient/SmallTimeout.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/test/jdk/java/net/httpclient/SmallTimeout.java	Mon Apr 23 15:45:40 2018 +0100
@@ -85,12 +85,14 @@
             ss.setReuseAddress(false);
             ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
             int port = ss.getLocalPort();
-            URI uri = new URI("http://localhost:" + port + "/");
+            URI u = new URI("http://localhost:" + port + "/");
 
             HttpRequest[] requests = new HttpRequest[TIMEOUTS.length];
 
             out.println("--- TESTING Async");
             for (int i = 0; i < TIMEOUTS.length; i++) {
+                final int n = i;
+                URI uri = new URI(u.toString() + "/r" + n);
                 requests[i] = HttpRequest.newBuilder(uri)
                                          .timeout(Duration.ofMillis(TIMEOUTS[i]))
                                          .GET()
@@ -102,24 +104,25 @@
                     .whenComplete((HttpResponse<Object> r, Throwable t) -> {
                         Throwable cause = null;
                         if (r != null) {
-                            out.println("Unexpected response: " + r);
-                            cause = new RuntimeException("Unexpected response");
+                            out.println("Unexpected response for r" + n + ": " + r);
+                            cause = new RuntimeException("Unexpected response for r" + n);
                             error = true;
                         }
                         if (t != null) {
                             if (!(t.getCause() instanceof HttpTimeoutException)) {
-                                out.println("Wrong exception type:" + t.toString());
+                                out.println("Wrong exception type for r" + n + ":" + t.toString());
                                 Throwable c = t.getCause() == null ? t : t.getCause();
                                 c.printStackTrace();
                                 cause = c;
                                 error = true;
                             } else {
-                                out.println("Caught expected timeout: " + t.getCause());
+                                out.println("Caught expected timeout for r" + n +": " + t.getCause());
                             }
                         }
                         if (t == null && r == null) {
-                            out.println("Both response and throwable are null!");
-                            cause = new RuntimeException("Both response and throwable are null!");
+                            out.println("Both response and throwable are null for r" + n + "!");
+                            cause = new RuntimeException("Both response and throwable are null for r"
+                                    + n + "!");
                             error = true;
                         }
                         queue.add(HttpResult.of(req,cause));
@@ -134,11 +137,14 @@
 
             // Repeat blocking in separate threads. Use queue to wait.
             out.println("--- TESTING Sync");
+            System.err.println("================= TESTING Sync =====================");
 
             // For running blocking response tasks
             ExecutorService executor = Executors.newCachedThreadPool();
 
             for (int i = 0; i < TIMEOUTS.length; i++) {
+                final int n = i;
+                URI uri = new URI(u.toString()+"/sync/r" + n);
                 requests[i] = HttpRequest.newBuilder(uri)
                                          .timeout(Duration.ofMillis(TIMEOUTS[i]))
                                          .GET()
@@ -148,11 +154,13 @@
                 executor.execute(() -> {
                     Throwable cause = null;
                     try {
-                        client.send(req, BodyHandlers.replacing(null));
+                        HttpResponse<?> r = client.send(req, BodyHandlers.replacing(null));
+                        out.println("Unexpected success for r" + n +": " + r);
                     } catch (HttpTimeoutException e) {
-                        out.println("Caught expected timeout: " + e);
+                        out.println("Caught expected timeout for r" + n +": " + e);
                     } catch (Throwable ee) {
                         Throwable c = ee.getCause() == null ? ee : ee.getCause();
+                        out.println("Unexpected exception for r" + n + ": " + c);
                         c.printStackTrace();
                         cause = c;
                         error = true;