# HG changeset patch # User dfuchs # Date 1524494740 -3600 # Node ID fe2bf7b369b84b6cbda185adc95b9efb53a17a21 # Parent b583caf69b3960b7c7d4f8c484de44539fecb2ec http-client-branch: use direct buffer pool for reading off SSL encrypted buffers from the socket + minor test fixes. diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Mon Apr 23 15:45:40 2018 +0100 @@ -61,6 +61,7 @@ // create the SSLTube wrapping the SocketTube, with the given engine flow = new SSLTube(engine, client().theExecutor(), + client().getSSLBufferSupplier()::recycle, plainConnection.getConnectionFlow()); return null; } ); } diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Mon Apr 23 15:45:40 2018 +0100 @@ -65,6 +65,7 @@ // create the SSLTube wrapping the SocketTube, with the given engine flow = new SSLTube(engine, client().theExecutor(), + client().getSSLBufferSupplier()::recycle, plainConnection.getConnectionFlow()); return null;} ); } diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Mon Apr 23 15:45:40 2018 +0100 @@ -131,22 +131,38 @@ * in case of SSL connection. * * 1. Outgoing frames encoded to ByteBuffers. - * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc) + * Outgoing ByteBuffers are created with required size and frequently small (except DataFrames, etc) * At this place no pools at all. All outgoing buffers should be collected by GC. * * 2. Incoming ByteBuffers (decoded to frames). * Here, total elimination of BB pool is not a good idea. * We don't know how many bytes we will receive through network. - * So here we allocate buffer of reasonable size. The following life of the BB: - * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses) - * BB is returned to pool, - * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method. - * Such BB is never returned to pool and will be GCed. - * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and - * the buffer could be release to pool. + * + * So here is a strategy we could try to implement: + * We allocate buffer of reasonable size. The following life of the BB: + * - If all frames decoded from the BB are other than DataFrame andHeaderFrame + * (and HeaderFrame subclasses) BB is returned to pool, + * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer + * obtained by slice() method. Such BB is never returned to pool and will be GCed. + * - If we decoded HeadersFrame from the BB. Then header decoding is performed + * inside processFrame method and the buffer could be release to pool. * - * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool, - * because of we can't predict size encrypted packets. + * At this moment we do not implement this strategy. + * Instead we only use a pool for recycling SSL encrypted buffers read from + * the socket (see 3). + * + * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers are to/from + * the pool, because of we can't predict size encrypted packets. + * + * At the moment we only recycle encrypted buffers read from the socket, and we have + * a pool of maximum 3 (SocketTube.MAX_BUFFERS = 3) direct buffers which are shared by + * all connections on a given client. + * This pool is used by all SSL connections - whether HTTP/1.1 or HTTP/2, but only + * for SSL encrypted buffers that circulate between the SocketTube publisher and + * the SSLFlowDelegate Reader. Limiting the pool to this particular segment allows + * us to use direct buffer and avoid one more copy. + * See HttpClientImpl.SSLDirectBufferSupplier, SocketTube.SSLDirectBufferSource, and + * SSLTube.recycler. * */ diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Mon Apr 23 15:45:40 2018 +0100 @@ -28,12 +28,12 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; import java.io.IOException; -import java.lang.System.Logger.Level; import java.lang.ref.Reference; import java.lang.ref.WeakReference; import java.net.Authenticator; import java.net.CookieHandler; import java.net.ProxySelector; +import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; @@ -47,7 +47,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -57,6 +56,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -70,6 +70,7 @@ import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.PushPromiseHandler; import java.net.http.WebSocket; +import jdk.internal.net.http.common.BufferSupplier; import jdk.internal.net.http.common.Log; import jdk.internal.net.http.common.Logger; import jdk.internal.net.http.common.Pair; @@ -138,6 +139,13 @@ private final long id; private final String dbgTag; + // The SSL DirectBuffer Supplier provides the ability to recycle + // buffers used between the socket reader and the SSLEngine, or + // more precisely between the SocketTube publisher and the + // SSLFlowDelegate reader. + private final SSLDirectBufferSupplier sslBufferSupplier + = new SSLDirectBufferSupplier(this); + // This reference is used to keep track of the facade HttpClient // that was returned to the application code. // It makes it possible to know when the application no longer @@ -1164,4 +1172,70 @@ 0 // only set the size if > 0 ); } + + // Optimization for reading SSL encrypted data + // -------------------------------------------- + + // Returns a BufferSupplier that can be used for reading + // encrypted bytes of the socket. These buffers can then + // be recycled by the SSLFlowDelegate::Reader after their + // content has been copied in the SSLFlowDelegate::Reader + // readBuf. + // Because allocating, reading, copying, and recycling + // all happen in the SelectorManager thread, + // then this BufferSupplier can be shared between all + // the SSL connections managed by this client. + BufferSupplier getSSLBufferSupplier() { + return sslBufferSupplier; + } + + // An implementation of BufferSupplier that manage a pool of + // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that + // are used for reading encrypted bytes off the socket before + // copying before unwrapping. + private static final class SSLDirectBufferSupplier implements BufferSupplier { + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final HttpClientImpl client; + private final Logger debug; + public SSLDirectBufferSupplier(HttpClientImpl client) { + this.client = Objects.requireNonNull(client); + this.debug = client.debug; + } + + // get a buffer from the pool, or allocate a new one if needed. + @Override + public ByteBuffer get() { + assert client.isSelectorThread(); + ByteBuffer buf = queue.poll(); + if (buf == null) { + if (debug.on()) { + // should not appear more than SocketTube.MAX_BUFFERS + debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE); + } + buf = ByteBuffer.allocateDirect(Utils.BUFSIZE); + } else { + // if (debug.on()) { // this trace is mostly noise. + // debug.log("ByteBuffer.recycle(%d)", buf.remaining()); + // } + } + assert buf.isDirect(); + assert buf.position() == 0; + assert buf.hasRemaining(); + assert buf.limit() == Utils.BUFSIZE; + return buf; + } + + // return the buffer to the pool + @Override + public void recycle(ByteBuffer buffer) { + assert client.isSelectorThread(); + assert buffer.isDirect(); + assert !buffer.hasRemaining(); + buffer.position(0); + buffer.limit(buffer.capacity()); + queue.offer(buffer); + assert queue.size() <= SocketTube.MAX_BUFFERS; + } + } + } diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Mon Apr 23 15:45:40 2018 +0100 @@ -26,7 +26,6 @@ package jdk.internal.net.http; import java.io.IOException; -import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -39,6 +38,7 @@ import java.util.ArrayList; import java.util.function.Consumer; import java.util.function.Supplier; +import jdk.internal.net.http.common.BufferSupplier; import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.FlowTube; import jdk.internal.net.http.common.Logger; @@ -59,7 +59,7 @@ private final HttpClientImpl client; private final SocketChannel channel; - private final Supplier buffersSource; + private final SliceBufferSource sliceBuffersSource; private final Object lock = new Object(); private final AtomicReference errorRef = new AtomicReference<>(); private final InternalReadPublisher readPublisher; @@ -67,10 +67,11 @@ private final long id = IDS.incrementAndGet(); public SocketTube(HttpClientImpl client, SocketChannel channel, - Supplier buffersSource) { + Supplier buffersFactory) { this.client = client; this.channel = channel; - this.buffersSource = buffersSource; + this.sliceBuffersSource = new SliceBufferSource(buffersFactory); + this.readPublisher = new InternalReadPublisher(); this.writeSubscriber = new InternalWriteSubscriber(); } @@ -564,6 +565,7 @@ final InternalReadSubscription impl; final TubeSubscriber subscriber; final AtomicReference errorRef = new AtomicReference<>(); + final BufferSource bufferSource; volatile boolean subscribed; volatile boolean cancelled; volatile boolean completed; @@ -571,6 +573,9 @@ public ReadSubscription(InternalReadSubscription impl, TubeSubscriber subscriber) { this.impl = impl; + this.bufferSource = subscriber.supportsRecycling() + ? new SSLDirectBufferSource(client) + : SocketTube.this.sliceBuffersSource; this.subscriber = subscriber; } @@ -779,7 +784,7 @@ if (demand.tryDecrement()) { // we have demand. try { - List bytes = readAvailable(); + List bytes = readAvailable(subscription.bufferSource); if (bytes == EOF) { if (!completed) { if (debug.on()) debug.log("got read EOF"); @@ -906,6 +911,147 @@ } // ===================================================================== // + // Buffer Management // + // ===================================================================== // + + // This interface is used by readAvailable(BufferSource); + public interface BufferSource { + /** + * Returns a buffer to read data from the socket. + * Different implementation can have different strategies, as to + * which kind of buffer to return, or whether to return the same + * buffer. The only constraints are that + * a. the buffer returned must not be null + * b. the buffer position indicates where to start reading + * c. the buffer limit indicates where to stop reading. + * d. the buffer is 'free' - that is - it is not used + * or retained by anybody else + * @return A buffer to read data from the socket. + */ + ByteBuffer getBuffer(); + + /** + * Append the data read into the buffer to the list of buffer to + * be sent downstream to the subscriber. May return a new + * list, or append to the given list. + * + * Different implementation can have different strategies, but + * must obviously be consistent with the implementation of the + * getBuffer() method. For instance, an implementation could + * decide to add the buffer to the list and return a new buffer + * next time getBuffer() is called, or could decide to add a buffer + * slice to the list and return the same buffer (if remaining + * space is available) next time getBuffer() is called. + * + * @param list The list before adding the data. Can be null. + * @param buffer The buffer containing the data to add to the list. + * @param start The start position at which data were read. + * The current buffer position indicates the end. + * @return A possibly new list where a buffer containing the + * data read from the socket has been added. + */ + List append(List list, ByteBuffer buffer, int start); + } + + // An implementation of BufferSource used for unencrypted data. + // This buffer source uses heap buffers and avoids wasting memory + // by forwarding read only buffer slices downstream. + // Buffers allocated through this source are simply GC'ed when + // they are no longer referenced. + static final class SliceBufferSource implements BufferSource { + private final Supplier factory; + private volatile ByteBuffer current; + public SliceBufferSource() { + this(Utils::getBuffer); + } + public SliceBufferSource(Supplier factory) { + this.factory = Objects.requireNonNull(factory); + } + + // reuse the same buffer if some space remains available. + // otherwise, returns a new heap buffer. + @Override + public final ByteBuffer getBuffer() { + ByteBuffer buf = current; + buf = (buf == null || !buf.hasRemaining()) + ? (current = factory.get()) : buf; + assert buf.hasRemaining(); + return buf; + } + + // Adds a read only slice to the list, potentially returning a + // new list with with that slice at the end. + @Override + public final List append(List list, ByteBuffer buf, int start) { + // creates a slice to add to the list + int limit = buf.limit(); + buf.limit(buf.position()); + buf.position(start); + ByteBuffer slice = buf.slice(); + + // restore buffer state to what it was before creating the slice + buf.position(buf.limit()); + buf.limit(limit); + + // add the buffer to the list + return SocketTube.listOf(list, slice.asReadOnlyBuffer()); + } + } + + + // An implementation of BufferSource used for encrypted data. + // This buffer source use direct byte buffers that will be + // recycled by the SocketTube subscriber. + // + static final class SSLDirectBufferSource implements BufferSource { + private final Supplier factory; + private final HttpClientImpl client; + private volatile ByteBuffer current; + + public SSLDirectBufferSource(HttpClientImpl client) { + this.client = Objects.requireNonNull(client); + this.factory = Objects.requireNonNull(client.getSSLBufferSupplier()); + } + + // Obtain a 'free' byte buffer from the pool, or return + // the same buffer if nothing was read at the previous cycle. + // The subscriber will be responsible for recycling this + // buffer into the pool (see SSLFlowDelegate.Reader) + @Override + public final ByteBuffer getBuffer() { + assert client.isSelectorThread(); + ByteBuffer buf = current; + if (buf == null) { + buf = current = factory.get(); + } + assert buf.hasRemaining(); + assert buf.position() == 0; + return buf; + } + + // Adds the buffer to the list. The buffer will be later returned to the + // pool by the subscriber (see SSLFlowDelegate.Reader). + // The next buffer returned by getBuffer() will be obtained from the + // pool. It might be the same buffer or another one. + // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because + // recycling will happen in the flow before onNext returns, then the + // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though + // it's shared by all SSL connections opened on that client. + @Override + public final List append(List list, ByteBuffer buf, int start) { + assert client.isSelectorThread(); + assert buf.isDirect(); + assert start == 0; + assert current == buf; + current = null; + buf.limit(buf.position()); + buf.position(start); + // add the buffer to the list + return SocketTube.listOf(list, buf); + } + } + + // ===================================================================== // // Socket Channel Read/Write // // ===================================================================== // static final int MAX_BUFFERS = 3; @@ -918,11 +1064,8 @@ // is inserted into the returned buffer list, and if the current buffer // has remaining space, that space will be used to read more data when // the channel becomes readable again. - private volatile ByteBuffer current; - private List readAvailable() throws IOException { - ByteBuffer buf = current; - buf = (buf == null || !buf.hasRemaining()) - ? (current = buffersSource.get()) : buf; + private List readAvailable(BufferSource buffersSource) throws IOException { + ByteBuffer buf = buffersSource.getBuffer(); assert buf.hasRemaining(); int read; @@ -961,31 +1104,20 @@ // check whether this buffer has still some free space available. // if so, we will keep it for the next round. final boolean hasRemaining = buf.hasRemaining(); - - // creates a slice to add to the list - int limit = buf.limit(); - buf.limit(buf.position()); - buf.position(pos); - ByteBuffer slice = buf.slice(); + list = buffersSource.append(list, buf, pos); - // restore buffer state to what it was before creating the slice - buf.position(buf.limit()); - buf.limit(limit); - - // add the buffer to the list - list = addToList(list, slice.asReadOnlyBuffer()); if (read <= 0 || list.size() == MAX_BUFFERS) { break; } - buf = hasRemaining ? buf : (current = buffersSource.get()); + buf = buffersSource.getBuffer(); pos = buf.position(); assert buf.hasRemaining(); } return list; } - private List addToList(List list, T item) { + private static List listOf(List list, T item) { int size = list == null ? 0 : list.size(); switch (size) { case 0: return List.of(item); diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/common/BufferSupplier.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/BufferSupplier.java Mon Apr 23 15:45:40 2018 +0100 @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package jdk.internal.net.http.common; + +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Supplier; + +/** + * This interface allows to recycle buffers used for SSL decryption. + * Buffers that are used for reading SSL encrypted data are typically + * very short lived, as it is necessary to aggregate their content + * before calling SSLEngine::unwrap. + * Because both reading and copying happen in the SelectorManager + * thread, then it makes it possible to pool these buffers and + * recycle them for the next socket read, instead of simply + * letting them be GC'ed. That also makes it possible to use + * direct byte buffers, and avoid another layer of copying by + * the SocketChannel implementation. + * + * The HttpClientImpl has an implementation of this interface + * that allows to reuse the same 3 direct buffers for reading + * off SSL encrypted data from the socket. + * The BufferSupplier::get method is called by SocketTube + * (see SocketTube.SSLDirectBufferSource) and BufferSupplier::recycle + * is called by SSLFlowDelegate.Reader. + **/ +public interface BufferSupplier extends Supplier { + /** + * Returns a buffer to read encrypted data off the socket. + * @return a buffer to read encrypted data off the socket. + */ + ByteBuffer get(); + + /** + * Returns a buffer to the pool. + * + * @param buffer This must be a buffer previously obtained + * by calling BufferSupplier::get. The caller must + * not touch the buffer after returning it to + * the pool. + */ + void recycle(ByteBuffer buffer); +} + diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferPool.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferPool.java Thu Apr 19 16:47:52 2018 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.internal.net.http.common; - -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * The class provides reuse of ByteBuffers. - * It is supposed that all requested buffers have the same size for a long period of time. - * That is why there is no any logic splitting buffers into different buckets (by size). It's unnecessary. - * - * At the same moment it is allowed to change requested buffers size (all smaller buffers will be discarded). - * It may be needed for example, if after rehandshaking netPacketBufferSize was changed. - */ -public class ByteBufferPool { - - private final java.util.Queue pool = new ConcurrentLinkedQueue<>(); - - public ByteBufferPool() { - } - - public ByteBufferReference get(int size) { - ByteBuffer buffer; - while ((buffer = pool.poll()) != null) { - if (buffer.capacity() >= size) { - return ByteBufferReference.of(buffer, this); - } - } - return ByteBufferReference.of(ByteBuffer.allocate(size), this); - } - - public void release(ByteBuffer buffer) { - buffer.clear(); - pool.offer(buffer); - } - -} diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferReference.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferReference.java Thu Apr 19 16:47:52 2018 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.internal.net.http.common; - -import java.nio.ByteBuffer; -import java.util.Objects; -import java.util.function.Supplier; - -public class ByteBufferReference implements Supplier { - - private ByteBuffer buffer; - private final ByteBufferPool pool; - - public static ByteBufferReference of(ByteBuffer buffer) { - return of(buffer, null); - } - - public static ByteBufferReference of(ByteBuffer buffer, ByteBufferPool pool) { - Objects.requireNonNull(buffer); - return new ByteBufferReference(buffer, pool); - } - - public static ByteBuffer[] toBuffers(ByteBufferReference... refs) { - ByteBuffer[] bufs = new ByteBuffer[refs.length]; - for (int i = 0; i < refs.length; i++) { - bufs[i] = refs[i].get(); - } - return bufs; - } - - public static ByteBufferReference[] toReferences(ByteBuffer... buffers) { - ByteBufferReference[] refs = new ByteBufferReference[buffers.length]; - for (int i = 0; i < buffers.length; i++) { - refs[i] = of(buffers[i]); - } - return refs; - } - - - public static void clear(ByteBufferReference[] refs) { - for(ByteBufferReference ref : refs) { - ref.clear(); - } - } - - private ByteBufferReference(ByteBuffer buffer, ByteBufferPool pool) { - this.buffer = buffer; - this.pool = pool; - } - - @Override - public ByteBuffer get() { - ByteBuffer buf = this.buffer; - assert buf!=null : "getting ByteBuffer after clearance"; - return buf; - } - - public void clear() { - ByteBuffer buf = this.buffer; - assert buf!=null : "double ByteBuffer clearance"; - this.buffer = null; - if (pool != null) { - pool.release(buf); - } - } -} diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java Mon Apr 23 15:45:40 2018 +0100 @@ -63,6 +63,8 @@ */ default void dropSubscription() { } + default boolean supportsRecycling() { return false; } + } /** diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Mon Apr 23 15:45:40 2018 +0100 @@ -45,6 +45,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; /** * Implements SSL using two SubscriberWrappers. @@ -96,6 +97,7 @@ volatile boolean close_notify_received; final CompletableFuture readerCF; final CompletableFuture writerCF; + final Consumer recycler; static AtomicInteger scount = new AtomicInteger(1); final int id; @@ -109,8 +111,23 @@ Subscriber> downReader, Subscriber> downWriter) { + this(engine, exec, null, downReader, downWriter); + } + + /** + * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each + * Flow.Subscriber requires an associated {@link CompletableFuture} + * for errors that need to be signaled from downstream to upstream. + */ + public SSLFlowDelegate(SSLEngine engine, + Executor exec, + Consumer recycler, + Subscriber> downReader, + Subscriber> downWriter) + { this.id = scount.getAndIncrement(); this.tubeName = String.valueOf(downWriter); + this.recycler = recycler; this.reader = new Reader(); this.writer = new Writer(); this.engine = engine; @@ -212,7 +229,7 @@ * Upstream subscription strategy is to try and keep no more than * TARGET_BUFSIZE bytes in readBuf */ - class Reader extends SubscriberWrapper { + class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { // Maximum record size is 16k. // Because SocketTube can feeds us up to 3 16K buffers, // then setting this size to 16K means that the readBuf @@ -237,6 +254,11 @@ readBuf.limit(0); // keep in read mode } + @Override + public boolean supportsRecycling() { + return recycler != null; + } + protected SchedulingAction enterScheduling() { return enterReadScheduling(); } @@ -292,6 +314,11 @@ reallocReadBuf(); readBuf.put(buf); readBuf.flip(); + // should be safe to call inside lock + // since the only implementation + // offers the buffer to an unbounded queue. + // WARNING: do not touch buf after this point! + if (recycler != null) recycler.accept(buf); } if (complete) { this.completing = complete; diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Mon Apr 23 15:45:40 2018 +0100 @@ -77,6 +77,13 @@ private volatile boolean finished; public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) { + this(engine, executor, null, tube); + } + + public SSLTube(SSLEngine engine, + Executor executor, + Consumer recycler, + FlowTube tube) { Objects.requireNonNull(engine); Objects.requireNonNull(executor); this.tube = Objects.requireNonNull(tube); @@ -85,15 +92,17 @@ this.engine = engine; sslDelegate = new SSLTubeFlowDelegate(engine, executor, + recycler, readSubscriber, tube); } final class SSLTubeFlowDelegate extends SSLFlowDelegate { SSLTubeFlowDelegate(SSLEngine engine, Executor executor, + Consumer recycler, SSLSubscriberWrapper readSubscriber, FlowTube tube) { - super(engine, executor, readSubscriber, tube); + super(engine, executor, recycler, readSubscriber, tube); } protected SchedulingAction enterReadScheduling() { readSubscriber.processPendingSubscriber(); diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Mon Apr 23 15:45:40 2018 +0100 @@ -581,25 +581,6 @@ return (int) remain; } - public static long remaining(ByteBufferReference[] refs) { - long remain = 0; - for (ByteBufferReference ref : refs) { - remain += ref.get().remaining(); - } - return remain; - } - - public static int remaining(ByteBufferReference[] refs, int max) { - long remain = 0; - for (ByteBufferReference ref : refs) { - remain += ref.get().remaining(); - if (remain > max) { - throw new IllegalArgumentException("too many bytes"); - } - } - return (int) remain; - } - public static int remaining(ByteBuffer[] refs, int max) { long remain = 0; for (ByteBuffer b : refs) { @@ -623,7 +604,6 @@ public static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.allocate(0); public static final ByteBuffer[] EMPTY_BB_ARRAY = new ByteBuffer[0]; public static final List EMPTY_BB_LIST = List.of(); - public static final ByteBufferReference[] EMPTY_BBR_ARRAY = new ByteBufferReference[0]; /** * Returns a slice of size {@code amount} from the given buffer. If the diff -r b583caf69b39 -r fe2bf7b369b8 test/jdk/java/net/httpclient/ProxyServer.java --- a/test/jdk/java/net/httpclient/ProxyServer.java Thu Apr 19 16:47:52 2018 +0100 +++ b/test/jdk/java/net/httpclient/ProxyServer.java Mon Apr 23 15:45:40 2018 +0100 @@ -73,12 +73,10 @@ */ public void close() throws IOException { if (debug) System.out.println("Proxy: closing"); - done = true; + done = true; listener.close(); for (Connection c : connections) { - if (c.running()) { - c.close(); - } + c.close(); } } @@ -179,7 +177,9 @@ return out.isAlive() || in.isAlive(); } - public void close() throws IOException { + private volatile boolean closing; + public synchronized void close() throws IOException { + closing = true; if (debug) System.out.println("Closing connection (proxy)"); if (serverSocket != null) serverSocket.close(); if (clientSocket != null) clientSocket.close(); @@ -238,7 +238,13 @@ i++; commonInit(dest, 80); - serverOut.write(buf, i, buf.length-i); + OutputStream sout; + synchronized (this) { + if (closing) return; + sout = serverOut; + } + // might fail if we're closing but we don't care. + sout.write(buf, i, buf.length-i); proxyCommon(); } catch (URISyntaxException e) { @@ -246,7 +252,8 @@ } } - void commonInit(String dest, int defaultPort) throws IOException { + synchronized void commonInit(String dest, int defaultPort) throws IOException { + if (closing) return; int port; String[] hostport = dest.split(":"); if (hostport.length == 1) { @@ -261,7 +268,8 @@ serverIn = new BufferedInputStream(serverSocket.getInputStream()); } - void proxyCommon() throws IOException { + synchronized void proxyCommon() throws IOException { + if (closing) return; out = new Thread(() -> { try { byte[] bb = new byte[8000]; @@ -269,6 +277,7 @@ while ((n = clientIn.read(bb)) != -1) { serverOut.write(bb, 0, n); } + closing = true; serverSocket.close(); clientSocket.close(); } catch (IOException e) { @@ -284,6 +293,7 @@ while ((n = serverIn.read(bb)) != -1) { clientOut.write(bb, 0, n); } + closing = true; serverSocket.close(); clientSocket.close(); } catch (IOException e) { @@ -302,7 +312,9 @@ } void doTunnel(String dest) throws IOException { + if (closing) return; // no need to go further. commonInit(dest, 443); + // might fail if we're closing, but we don't care. clientOut.write("HTTP/1.1 200 OK\r\n\r\n".getBytes()); proxyCommon(); } diff -r b583caf69b39 -r fe2bf7b369b8 test/jdk/java/net/httpclient/SmallTimeout.java --- a/test/jdk/java/net/httpclient/SmallTimeout.java Thu Apr 19 16:47:52 2018 +0100 +++ b/test/jdk/java/net/httpclient/SmallTimeout.java Mon Apr 23 15:45:40 2018 +0100 @@ -85,12 +85,14 @@ ss.setReuseAddress(false); ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); int port = ss.getLocalPort(); - URI uri = new URI("http://localhost:" + port + "/"); + URI u = new URI("http://localhost:" + port + "/"); HttpRequest[] requests = new HttpRequest[TIMEOUTS.length]; out.println("--- TESTING Async"); for (int i = 0; i < TIMEOUTS.length; i++) { + final int n = i; + URI uri = new URI(u.toString() + "/r" + n); requests[i] = HttpRequest.newBuilder(uri) .timeout(Duration.ofMillis(TIMEOUTS[i])) .GET() @@ -102,24 +104,25 @@ .whenComplete((HttpResponse r, Throwable t) -> { Throwable cause = null; if (r != null) { - out.println("Unexpected response: " + r); - cause = new RuntimeException("Unexpected response"); + out.println("Unexpected response for r" + n + ": " + r); + cause = new RuntimeException("Unexpected response for r" + n); error = true; } if (t != null) { if (!(t.getCause() instanceof HttpTimeoutException)) { - out.println("Wrong exception type:" + t.toString()); + out.println("Wrong exception type for r" + n + ":" + t.toString()); Throwable c = t.getCause() == null ? t : t.getCause(); c.printStackTrace(); cause = c; error = true; } else { - out.println("Caught expected timeout: " + t.getCause()); + out.println("Caught expected timeout for r" + n +": " + t.getCause()); } } if (t == null && r == null) { - out.println("Both response and throwable are null!"); - cause = new RuntimeException("Both response and throwable are null!"); + out.println("Both response and throwable are null for r" + n + "!"); + cause = new RuntimeException("Both response and throwable are null for r" + + n + "!"); error = true; } queue.add(HttpResult.of(req,cause)); @@ -134,11 +137,14 @@ // Repeat blocking in separate threads. Use queue to wait. out.println("--- TESTING Sync"); + System.err.println("================= TESTING Sync ====================="); // For running blocking response tasks ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < TIMEOUTS.length; i++) { + final int n = i; + URI uri = new URI(u.toString()+"/sync/r" + n); requests[i] = HttpRequest.newBuilder(uri) .timeout(Duration.ofMillis(TIMEOUTS[i])) .GET() @@ -148,11 +154,13 @@ executor.execute(() -> { Throwable cause = null; try { - client.send(req, BodyHandlers.replacing(null)); + HttpResponse r = client.send(req, BodyHandlers.replacing(null)); + out.println("Unexpected success for r" + n +": " + r); } catch (HttpTimeoutException e) { - out.println("Caught expected timeout: " + e); + out.println("Caught expected timeout for r" + n +": " + e); } catch (Throwable ee) { Throwable c = ee.getCause() == null ? ee : ee.getCause(); + out.println("Unexpected exception for r" + n + ": " + c); c.printStackTrace(); cause = c; error = true;