--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Mon Apr 23 15:45:40 2018 +0100
@@ -26,7 +26,6 @@
package jdk.internal.net.http;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
@@ -39,6 +38,7 @@
import java.util.ArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
@@ -59,7 +59,7 @@
private final HttpClientImpl client;
private final SocketChannel channel;
- private final Supplier<ByteBuffer> buffersSource;
+ private final SliceBufferSource sliceBuffersSource;
private final Object lock = new Object();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
private final InternalReadPublisher readPublisher;
@@ -67,10 +67,11 @@
private final long id = IDS.incrementAndGet();
public SocketTube(HttpClientImpl client, SocketChannel channel,
- Supplier<ByteBuffer> buffersSource) {
+ Supplier<ByteBuffer> buffersFactory) {
this.client = client;
this.channel = channel;
- this.buffersSource = buffersSource;
+ this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
+
this.readPublisher = new InternalReadPublisher();
this.writeSubscriber = new InternalWriteSubscriber();
}
@@ -564,6 +565,7 @@
final InternalReadSubscription impl;
final TubeSubscriber subscriber;
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ final BufferSource bufferSource;
volatile boolean subscribed;
volatile boolean cancelled;
volatile boolean completed;
@@ -571,6 +573,9 @@
public ReadSubscription(InternalReadSubscription impl,
TubeSubscriber subscriber) {
this.impl = impl;
+ this.bufferSource = subscriber.supportsRecycling()
+ ? new SSLDirectBufferSource(client)
+ : SocketTube.this.sliceBuffersSource;
this.subscriber = subscriber;
}
@@ -779,7 +784,7 @@
if (demand.tryDecrement()) {
// we have demand.
try {
- List<ByteBuffer> bytes = readAvailable();
+ List<ByteBuffer> bytes = readAvailable(subscription.bufferSource);
if (bytes == EOF) {
if (!completed) {
if (debug.on()) debug.log("got read EOF");
@@ -906,6 +911,147 @@
}
// ===================================================================== //
+ // Buffer Management //
+ // ===================================================================== //
+
+ // This interface is used by readAvailable(BufferSource);
+ public interface BufferSource {
+ /**
+ * Returns a buffer to read data from the socket.
+ * Different implementation can have different strategies, as to
+ * which kind of buffer to return, or whether to return the same
+ * buffer. The only constraints are that
+ * a. the buffer returned must not be null
+ * b. the buffer position indicates where to start reading
+ * c. the buffer limit indicates where to stop reading.
+ * d. the buffer is 'free' - that is - it is not used
+ * or retained by anybody else
+ * @return A buffer to read data from the socket.
+ */
+ ByteBuffer getBuffer();
+
+ /**
+ * Append the data read into the buffer to the list of buffer to
+ * be sent downstream to the subscriber. May return a new
+ * list, or append to the given list.
+ *
+ * Different implementation can have different strategies, but
+ * must obviously be consistent with the implementation of the
+ * getBuffer() method. For instance, an implementation could
+ * decide to add the buffer to the list and return a new buffer
+ * next time getBuffer() is called, or could decide to add a buffer
+ * slice to the list and return the same buffer (if remaining
+ * space is available) next time getBuffer() is called.
+ *
+ * @param list The list before adding the data. Can be null.
+ * @param buffer The buffer containing the data to add to the list.
+ * @param start The start position at which data were read.
+ * The current buffer position indicates the end.
+ * @return A possibly new list where a buffer containing the
+ * data read from the socket has been added.
+ */
+ List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start);
+ }
+
+ // An implementation of BufferSource used for unencrypted data.
+ // This buffer source uses heap buffers and avoids wasting memory
+ // by forwarding read only buffer slices downstream.
+ // Buffers allocated through this source are simply GC'ed when
+ // they are no longer referenced.
+ static final class SliceBufferSource implements BufferSource {
+ private final Supplier<ByteBuffer> factory;
+ private volatile ByteBuffer current;
+ public SliceBufferSource() {
+ this(Utils::getBuffer);
+ }
+ public SliceBufferSource(Supplier<ByteBuffer> factory) {
+ this.factory = Objects.requireNonNull(factory);
+ }
+
+ // reuse the same buffer if some space remains available.
+ // otherwise, returns a new heap buffer.
+ @Override
+ public final ByteBuffer getBuffer() {
+ ByteBuffer buf = current;
+ buf = (buf == null || !buf.hasRemaining())
+ ? (current = factory.get()) : buf;
+ assert buf.hasRemaining();
+ return buf;
+ }
+
+ // Adds a read only slice to the list, potentially returning a
+ // new list with with that slice at the end.
+ @Override
+ public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+ // creates a slice to add to the list
+ int limit = buf.limit();
+ buf.limit(buf.position());
+ buf.position(start);
+ ByteBuffer slice = buf.slice();
+
+ // restore buffer state to what it was before creating the slice
+ buf.position(buf.limit());
+ buf.limit(limit);
+
+ // add the buffer to the list
+ return SocketTube.listOf(list, slice.asReadOnlyBuffer());
+ }
+ }
+
+
+ // An implementation of BufferSource used for encrypted data.
+ // This buffer source use direct byte buffers that will be
+ // recycled by the SocketTube subscriber.
+ //
+ static final class SSLDirectBufferSource implements BufferSource {
+ private final Supplier<ByteBuffer> factory;
+ private final HttpClientImpl client;
+ private volatile ByteBuffer current;
+
+ public SSLDirectBufferSource(HttpClientImpl client) {
+ this.client = Objects.requireNonNull(client);
+ this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
+ }
+
+ // Obtain a 'free' byte buffer from the pool, or return
+ // the same buffer if nothing was read at the previous cycle.
+ // The subscriber will be responsible for recycling this
+ // buffer into the pool (see SSLFlowDelegate.Reader)
+ @Override
+ public final ByteBuffer getBuffer() {
+ assert client.isSelectorThread();
+ ByteBuffer buf = current;
+ if (buf == null) {
+ buf = current = factory.get();
+ }
+ assert buf.hasRemaining();
+ assert buf.position() == 0;
+ return buf;
+ }
+
+ // Adds the buffer to the list. The buffer will be later returned to the
+ // pool by the subscriber (see SSLFlowDelegate.Reader).
+ // The next buffer returned by getBuffer() will be obtained from the
+ // pool. It might be the same buffer or another one.
+ // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
+ // recycling will happen in the flow before onNext returns, then the
+ // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
+ // it's shared by all SSL connections opened on that client.
+ @Override
+ public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+ assert client.isSelectorThread();
+ assert buf.isDirect();
+ assert start == 0;
+ assert current == buf;
+ current = null;
+ buf.limit(buf.position());
+ buf.position(start);
+ // add the buffer to the list
+ return SocketTube.listOf(list, buf);
+ }
+ }
+
+ // ===================================================================== //
// Socket Channel Read/Write //
// ===================================================================== //
static final int MAX_BUFFERS = 3;
@@ -918,11 +1064,8 @@
// is inserted into the returned buffer list, and if the current buffer
// has remaining space, that space will be used to read more data when
// the channel becomes readable again.
- private volatile ByteBuffer current;
- private List<ByteBuffer> readAvailable() throws IOException {
- ByteBuffer buf = current;
- buf = (buf == null || !buf.hasRemaining())
- ? (current = buffersSource.get()) : buf;
+ private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
+ ByteBuffer buf = buffersSource.getBuffer();
assert buf.hasRemaining();
int read;
@@ -961,31 +1104,20 @@
// check whether this buffer has still some free space available.
// if so, we will keep it for the next round.
final boolean hasRemaining = buf.hasRemaining();
-
- // creates a slice to add to the list
- int limit = buf.limit();
- buf.limit(buf.position());
- buf.position(pos);
- ByteBuffer slice = buf.slice();
+ list = buffersSource.append(list, buf, pos);
- // restore buffer state to what it was before creating the slice
- buf.position(buf.limit());
- buf.limit(limit);
-
- // add the buffer to the list
- list = addToList(list, slice.asReadOnlyBuffer());
if (read <= 0 || list.size() == MAX_BUFFERS) {
break;
}
- buf = hasRemaining ? buf : (current = buffersSource.get());
+ buf = buffersSource.getBuffer();
pos = buf.position();
assert buf.hasRemaining();
}
return list;
}
- private <T> List<T> addToList(List<T> list, T item) {
+ private static <T> List<T> listOf(List<T> list, T item) {
int size = list == null ? 0 : list.size();
switch (size) {
case 0: return List.of(item);