diff -r 8e1ed2a15845 -r 4690a2871b44 src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed May 02 10:47:16 2018 +0200 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed May 02 02:36:17 2018 -0700 @@ -26,7 +26,6 @@ package jdk.internal.net.http; import java.io.IOException; -import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -39,6 +38,7 @@ import java.util.ArrayList; import java.util.function.Consumer; import java.util.function.Supplier; +import jdk.internal.net.http.common.BufferSupplier; import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.FlowTube; import jdk.internal.net.http.common.Logger; @@ -59,7 +59,7 @@ private final HttpClientImpl client; private final SocketChannel channel; - private final Supplier buffersSource; + private final SliceBufferSource sliceBuffersSource; private final Object lock = new Object(); private final AtomicReference errorRef = new AtomicReference<>(); private final InternalReadPublisher readPublisher; @@ -67,10 +67,11 @@ private final long id = IDS.incrementAndGet(); public SocketTube(HttpClientImpl client, SocketChannel channel, - Supplier buffersSource) { + Supplier buffersFactory) { this.client = client; this.channel = channel; - this.buffersSource = buffersSource; + this.sliceBuffersSource = new SliceBufferSource(buffersFactory); + this.readPublisher = new InternalReadPublisher(); this.writeSubscriber = new InternalWriteSubscriber(); } @@ -564,6 +565,7 @@ final InternalReadSubscription impl; final TubeSubscriber subscriber; final AtomicReference errorRef = new AtomicReference<>(); + final BufferSource bufferSource; volatile boolean subscribed; volatile boolean cancelled; volatile boolean completed; @@ -571,6 +573,9 @@ public ReadSubscription(InternalReadSubscription impl, TubeSubscriber subscriber) { this.impl = impl; + this.bufferSource = subscriber.supportsRecycling() + ? new SSLDirectBufferSource(client) + : SocketTube.this.sliceBuffersSource; this.subscriber = subscriber; } @@ -779,7 +784,7 @@ if (demand.tryDecrement()) { // we have demand. try { - List bytes = readAvailable(); + List bytes = readAvailable(current.bufferSource); if (bytes == EOF) { if (!completed) { if (debug.on()) debug.log("got read EOF"); @@ -906,6 +911,180 @@ } // ===================================================================== // + // Buffer Management // + // ===================================================================== // + + // This interface is used by readAvailable(BufferSource); + public interface BufferSource { + /** + * Returns a buffer to read data from the socket. + * + * @implNote + * Different implementation can have different strategies, as to + * which kind of buffer to return, or whether to return the same + * buffer. The only constraints are that: + * a. the buffer returned must not be null + * b. the buffer position indicates where to start reading + * c. the buffer limit indicates where to stop reading. + * d. the buffer is 'free' - that is - it is not used + * or retained by anybody else + * + * @return A buffer to read data from the socket. + */ + ByteBuffer getBuffer(); + + /** + * Appends the read-data in {@code buffer} to the list of buffer to + * be sent downstream to the subscriber. May return a new + * list, or append to the given list. + * + * @implNote + * Different implementation can have different strategies, but + * must obviously be consistent with the implementation of the + * getBuffer() method. For instance, an implementation could + * decide to add the buffer to the list and return a new buffer + * next time getBuffer() is called, or could decide to add a buffer + * slice to the list and return the same buffer (if remaining + * space is available) next time getBuffer() is called. + * + * @param list The list before adding the data. Can be null. + * @param buffer The buffer containing the data to add to the list. + * @param start The start position at which data were read. + * The current buffer position indicates the end. + * @return A possibly new list where a buffer containing the + * data read from the socket has been added. + */ + List append(List list, ByteBuffer buffer, int start); + + /** + * Returns the given unused {@code buffer}, previously obtained from + * {@code getBuffer}. + * + * @implNote This method can be used, if necessary, to return + * the unused buffer to the pull. + * + * @param buffer The unused buffer. + */ + default void returnUnused(ByteBuffer buffer) { } + } + + // An implementation of BufferSource used for unencrypted data. + // This buffer source uses heap buffers and avoids wasting memory + // by forwarding read-only buffer slices downstream. + // Buffers allocated through this source are simply GC'ed when + // they are no longer referenced. + private static final class SliceBufferSource implements BufferSource { + private final Supplier factory; + private volatile ByteBuffer current; + + public SliceBufferSource() { + this(Utils::getBuffer); + } + public SliceBufferSource(Supplier factory) { + this.factory = Objects.requireNonNull(factory); + } + + // Reuses the same buffer if some space remains available. + // Otherwise, returns a new heap buffer. + @Override + public final ByteBuffer getBuffer() { + ByteBuffer buf = current; + buf = (buf == null || !buf.hasRemaining()) + ? (current = factory.get()) : buf; + assert buf.hasRemaining(); + return buf; + } + + // Adds a read-only slice to the list, potentially returning a + // new list with that slice at the end. + @Override + public final List append(List list, ByteBuffer buf, int start) { + // creates a slice to add to the list + int limit = buf.limit(); + buf.limit(buf.position()); + buf.position(start); + ByteBuffer slice = buf.slice(); + + // restore buffer state to what it was before creating the slice + buf.position(buf.limit()); + buf.limit(limit); + + // add the buffer to the list + return SocketTube.listOf(list, slice.asReadOnlyBuffer()); + } + } + + + // An implementation of BufferSource used for encrypted data. + // This buffer source uses direct byte buffers that will be + // recycled by the SocketTube subscriber. + // + private static final class SSLDirectBufferSource implements BufferSource { + private final BufferSupplier factory; + private final HttpClientImpl client; + private ByteBuffer current; + + public SSLDirectBufferSource(HttpClientImpl client) { + this.client = Objects.requireNonNull(client); + this.factory = Objects.requireNonNull(client.getSSLBufferSupplier()); + } + + // Obtains a 'free' byte buffer from the pool, or returns + // the same buffer if nothing was read at the previous cycle. + // The subscriber will be responsible for recycling this + // buffer into the pool (see SSLFlowDelegate.Reader) + @Override + public final ByteBuffer getBuffer() { + assert client.isSelectorThread(); + ByteBuffer buf = current; + if (buf == null) { + buf = current = factory.get(); + } + assert buf.hasRemaining(); + assert buf.position() == 0; + return buf; + } + + // Adds the buffer to the list. The buffer will be later returned to the + // pool by the subscriber (see SSLFlowDelegate.Reader). + // The next buffer returned by getBuffer() will be obtained from the + // pool. It might be the same buffer or another one. + // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because + // recycling will happen in the flow before onNext returns, then the + // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though + // it's shared by all SSL connections opened on that client. + @Override + public final List append(List list, ByteBuffer buf, int start) { + assert client.isSelectorThread(); + assert buf.isDirect(); + assert start == 0; + assert current == buf; + current = null; + buf.limit(buf.position()); + buf.position(start); + // add the buffer to the list + return SocketTube.listOf(list, buf); + } + + @Override + public void returnUnused(ByteBuffer buffer) { + // if current is null, then the buffer will have been added to the + // list, through append. Otherwise, current is not null, and needs + // to be returned to prevent the buffer supplier pool from growing + // to more than MAX_BUFFERS. + assert buffer == current; + ByteBuffer buf = current; + if (buf != null) { + assert buf.position() == 0; + current = null; + // the supplier assert if buf has remaining + buf.limit(buf.position()); + factory.recycle(buf); + } + } + } + + // ===================================================================== // // Socket Channel Read/Write // // ===================================================================== // static final int MAX_BUFFERS = 3; @@ -918,11 +1097,8 @@ // is inserted into the returned buffer list, and if the current buffer // has remaining space, that space will be used to read more data when // the channel becomes readable again. - private volatile ByteBuffer current; - private List readAvailable() throws IOException { - ByteBuffer buf = current; - buf = (buf == null || !buf.hasRemaining()) - ? (current = buffersSource.get()) : buf; + private List readAvailable(BufferSource buffersSource) throws IOException { + ByteBuffer buf = buffersSource.getBuffer(); assert buf.hasRemaining(); int read; @@ -936,6 +1112,9 @@ } } catch (IOException x) { if (buf.position() == pos && list == null) { + // make sure that the buffer source will recycle + // 'buf' if needed + buffersSource.returnUnused(buf); // no bytes have been read, just throw... throw x; } else { @@ -951,6 +1130,7 @@ // returned if read == -1. If some data has already been read, // then it must be returned. -1 will be returned next time // the caller attempts to read something. + buffersSource.returnUnused(buf); if (list == null) { // nothing read - list was null - return EOF or NOTHING list = read == -1 ? EOF : NOTHING; @@ -960,39 +1140,27 @@ // check whether this buffer has still some free space available. // if so, we will keep it for the next round. - final boolean hasRemaining = buf.hasRemaining(); + list = buffersSource.append(list, buf, pos); - // creates a slice to add to the list - int limit = buf.limit(); - buf.limit(buf.position()); - buf.position(pos); - ByteBuffer slice = buf.slice(); - - // restore buffer state to what it was before creating the slice - buf.position(buf.limit()); - buf.limit(limit); - - // add the buffer to the list - list = addToList(list, slice.asReadOnlyBuffer()); if (read <= 0 || list.size() == MAX_BUFFERS) { break; } - buf = hasRemaining ? buf : (current = buffersSource.get()); + buf = buffersSource.getBuffer(); pos = buf.position(); assert buf.hasRemaining(); } return list; } - private List addToList(List list, T item) { + private static List listOf(List list, T item) { int size = list == null ? 0 : list.size(); switch (size) { case 0: return List.of(item); case 1: return List.of(list.get(0), item); case 2: return List.of(list.get(0), list.get(1), item); default: // slow path if MAX_BUFFERS > 3 - ArrayList res = new ArrayList<>(list); + List res = list instanceof ArrayList ? list : new ArrayList<>(list); res.add(item); return res; }