diff -r b583caf69b39 -r fe2bf7b369b8 src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Apr 19 16:47:52 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Mon Apr 23 15:45:40 2018 +0100 @@ -26,7 +26,6 @@ package jdk.internal.net.http; import java.io.IOException; -import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -39,6 +38,7 @@ import java.util.ArrayList; import java.util.function.Consumer; import java.util.function.Supplier; +import jdk.internal.net.http.common.BufferSupplier; import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.FlowTube; import jdk.internal.net.http.common.Logger; @@ -59,7 +59,7 @@ private final HttpClientImpl client; private final SocketChannel channel; - private final Supplier buffersSource; + private final SliceBufferSource sliceBuffersSource; private final Object lock = new Object(); private final AtomicReference errorRef = new AtomicReference<>(); private final InternalReadPublisher readPublisher; @@ -67,10 +67,11 @@ private final long id = IDS.incrementAndGet(); public SocketTube(HttpClientImpl client, SocketChannel channel, - Supplier buffersSource) { + Supplier buffersFactory) { this.client = client; this.channel = channel; - this.buffersSource = buffersSource; + this.sliceBuffersSource = new SliceBufferSource(buffersFactory); + this.readPublisher = new InternalReadPublisher(); this.writeSubscriber = new InternalWriteSubscriber(); } @@ -564,6 +565,7 @@ final InternalReadSubscription impl; final TubeSubscriber subscriber; final AtomicReference errorRef = new AtomicReference<>(); + final BufferSource bufferSource; volatile boolean subscribed; volatile boolean cancelled; volatile boolean completed; @@ -571,6 +573,9 @@ public ReadSubscription(InternalReadSubscription impl, TubeSubscriber subscriber) { this.impl = impl; + this.bufferSource = subscriber.supportsRecycling() + ? new SSLDirectBufferSource(client) + : SocketTube.this.sliceBuffersSource; this.subscriber = subscriber; } @@ -779,7 +784,7 @@ if (demand.tryDecrement()) { // we have demand. try { - List bytes = readAvailable(); + List bytes = readAvailable(subscription.bufferSource); if (bytes == EOF) { if (!completed) { if (debug.on()) debug.log("got read EOF"); @@ -906,6 +911,147 @@ } // ===================================================================== // + // Buffer Management // + // ===================================================================== // + + // This interface is used by readAvailable(BufferSource); + public interface BufferSource { + /** + * Returns a buffer to read data from the socket. + * Different implementation can have different strategies, as to + * which kind of buffer to return, or whether to return the same + * buffer. The only constraints are that + * a. the buffer returned must not be null + * b. the buffer position indicates where to start reading + * c. the buffer limit indicates where to stop reading. + * d. the buffer is 'free' - that is - it is not used + * or retained by anybody else + * @return A buffer to read data from the socket. + */ + ByteBuffer getBuffer(); + + /** + * Append the data read into the buffer to the list of buffer to + * be sent downstream to the subscriber. May return a new + * list, or append to the given list. + * + * Different implementation can have different strategies, but + * must obviously be consistent with the implementation of the + * getBuffer() method. For instance, an implementation could + * decide to add the buffer to the list and return a new buffer + * next time getBuffer() is called, or could decide to add a buffer + * slice to the list and return the same buffer (if remaining + * space is available) next time getBuffer() is called. + * + * @param list The list before adding the data. Can be null. + * @param buffer The buffer containing the data to add to the list. + * @param start The start position at which data were read. + * The current buffer position indicates the end. + * @return A possibly new list where a buffer containing the + * data read from the socket has been added. + */ + List append(List list, ByteBuffer buffer, int start); + } + + // An implementation of BufferSource used for unencrypted data. + // This buffer source uses heap buffers and avoids wasting memory + // by forwarding read only buffer slices downstream. + // Buffers allocated through this source are simply GC'ed when + // they are no longer referenced. + static final class SliceBufferSource implements BufferSource { + private final Supplier factory; + private volatile ByteBuffer current; + public SliceBufferSource() { + this(Utils::getBuffer); + } + public SliceBufferSource(Supplier factory) { + this.factory = Objects.requireNonNull(factory); + } + + // reuse the same buffer if some space remains available. + // otherwise, returns a new heap buffer. + @Override + public final ByteBuffer getBuffer() { + ByteBuffer buf = current; + buf = (buf == null || !buf.hasRemaining()) + ? (current = factory.get()) : buf; + assert buf.hasRemaining(); + return buf; + } + + // Adds a read only slice to the list, potentially returning a + // new list with with that slice at the end. + @Override + public final List append(List list, ByteBuffer buf, int start) { + // creates a slice to add to the list + int limit = buf.limit(); + buf.limit(buf.position()); + buf.position(start); + ByteBuffer slice = buf.slice(); + + // restore buffer state to what it was before creating the slice + buf.position(buf.limit()); + buf.limit(limit); + + // add the buffer to the list + return SocketTube.listOf(list, slice.asReadOnlyBuffer()); + } + } + + + // An implementation of BufferSource used for encrypted data. + // This buffer source use direct byte buffers that will be + // recycled by the SocketTube subscriber. + // + static final class SSLDirectBufferSource implements BufferSource { + private final Supplier factory; + private final HttpClientImpl client; + private volatile ByteBuffer current; + + public SSLDirectBufferSource(HttpClientImpl client) { + this.client = Objects.requireNonNull(client); + this.factory = Objects.requireNonNull(client.getSSLBufferSupplier()); + } + + // Obtain a 'free' byte buffer from the pool, or return + // the same buffer if nothing was read at the previous cycle. + // The subscriber will be responsible for recycling this + // buffer into the pool (see SSLFlowDelegate.Reader) + @Override + public final ByteBuffer getBuffer() { + assert client.isSelectorThread(); + ByteBuffer buf = current; + if (buf == null) { + buf = current = factory.get(); + } + assert buf.hasRemaining(); + assert buf.position() == 0; + return buf; + } + + // Adds the buffer to the list. The buffer will be later returned to the + // pool by the subscriber (see SSLFlowDelegate.Reader). + // The next buffer returned by getBuffer() will be obtained from the + // pool. It might be the same buffer or another one. + // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because + // recycling will happen in the flow before onNext returns, then the + // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though + // it's shared by all SSL connections opened on that client. + @Override + public final List append(List list, ByteBuffer buf, int start) { + assert client.isSelectorThread(); + assert buf.isDirect(); + assert start == 0; + assert current == buf; + current = null; + buf.limit(buf.position()); + buf.position(start); + // add the buffer to the list + return SocketTube.listOf(list, buf); + } + } + + // ===================================================================== // // Socket Channel Read/Write // // ===================================================================== // static final int MAX_BUFFERS = 3; @@ -918,11 +1064,8 @@ // is inserted into the returned buffer list, and if the current buffer // has remaining space, that space will be used to read more data when // the channel becomes readable again. - private volatile ByteBuffer current; - private List readAvailable() throws IOException { - ByteBuffer buf = current; - buf = (buf == null || !buf.hasRemaining()) - ? (current = buffersSource.get()) : buf; + private List readAvailable(BufferSource buffersSource) throws IOException { + ByteBuffer buf = buffersSource.getBuffer(); assert buf.hasRemaining(); int read; @@ -961,31 +1104,20 @@ // check whether this buffer has still some free space available. // if so, we will keep it for the next round. final boolean hasRemaining = buf.hasRemaining(); - - // creates a slice to add to the list - int limit = buf.limit(); - buf.limit(buf.position()); - buf.position(pos); - ByteBuffer slice = buf.slice(); + list = buffersSource.append(list, buf, pos); - // restore buffer state to what it was before creating the slice - buf.position(buf.limit()); - buf.limit(limit); - - // add the buffer to the list - list = addToList(list, slice.asReadOnlyBuffer()); if (read <= 0 || list.size() == MAX_BUFFERS) { break; } - buf = hasRemaining ? buf : (current = buffersSource.get()); + buf = buffersSource.getBuffer(); pos = buf.position(); assert buf.hasRemaining(); } return list; } - private List addToList(List list, T item) { + private static List listOf(List list, T item) { int size = list == null ? 0 : list.size(); switch (size) { case 0: return List.of(item);