--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed May 02 10:47:16 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed May 02 02:36:17 2018 -0700
@@ -26,7 +26,6 @@
package jdk.internal.net.http;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
@@ -39,6 +38,7 @@
import java.util.ArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
@@ -59,7 +59,7 @@
private final HttpClientImpl client;
private final SocketChannel channel;
- private final Supplier<ByteBuffer> buffersSource;
+ private final SliceBufferSource sliceBuffersSource;
private final Object lock = new Object();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
private final InternalReadPublisher readPublisher;
@@ -67,10 +67,11 @@
private final long id = IDS.incrementAndGet();
public SocketTube(HttpClientImpl client, SocketChannel channel,
- Supplier<ByteBuffer> buffersSource) {
+ Supplier<ByteBuffer> buffersFactory) {
this.client = client;
this.channel = channel;
- this.buffersSource = buffersSource;
+ this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
+
this.readPublisher = new InternalReadPublisher();
this.writeSubscriber = new InternalWriteSubscriber();
}
@@ -564,6 +565,7 @@
final InternalReadSubscription impl;
final TubeSubscriber subscriber;
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ final BufferSource bufferSource;
volatile boolean subscribed;
volatile boolean cancelled;
volatile boolean completed;
@@ -571,6 +573,9 @@
public ReadSubscription(InternalReadSubscription impl,
TubeSubscriber subscriber) {
this.impl = impl;
+ this.bufferSource = subscriber.supportsRecycling()
+ ? new SSLDirectBufferSource(client)
+ : SocketTube.this.sliceBuffersSource;
this.subscriber = subscriber;
}
@@ -779,7 +784,7 @@
if (demand.tryDecrement()) {
// we have demand.
try {
- List<ByteBuffer> bytes = readAvailable();
+ List<ByteBuffer> bytes = readAvailable(current.bufferSource);
if (bytes == EOF) {
if (!completed) {
if (debug.on()) debug.log("got read EOF");
@@ -906,6 +911,180 @@
}
// ===================================================================== //
+ // Buffer Management //
+ // ===================================================================== //
+
+ // This interface is used by readAvailable(BufferSource);
+ public interface BufferSource {
+ /**
+ * Returns a buffer to read data from the socket.
+ *
+ * @implNote
+ * Different implementation can have different strategies, as to
+ * which kind of buffer to return, or whether to return the same
+ * buffer. The only constraints are that:
+ * a. the buffer returned must not be null
+ * b. the buffer position indicates where to start reading
+ * c. the buffer limit indicates where to stop reading.
+ * d. the buffer is 'free' - that is - it is not used
+ * or retained by anybody else
+ *
+ * @return A buffer to read data from the socket.
+ */
+ ByteBuffer getBuffer();
+
+ /**
+ * Appends the read-data in {@code buffer} to the list of buffer to
+ * be sent downstream to the subscriber. May return a new
+ * list, or append to the given list.
+ *
+ * @implNote
+ * Different implementation can have different strategies, but
+ * must obviously be consistent with the implementation of the
+ * getBuffer() method. For instance, an implementation could
+ * decide to add the buffer to the list and return a new buffer
+ * next time getBuffer() is called, or could decide to add a buffer
+ * slice to the list and return the same buffer (if remaining
+ * space is available) next time getBuffer() is called.
+ *
+ * @param list The list before adding the data. Can be null.
+ * @param buffer The buffer containing the data to add to the list.
+ * @param start The start position at which data were read.
+ * The current buffer position indicates the end.
+ * @return A possibly new list where a buffer containing the
+ * data read from the socket has been added.
+ */
+ List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);
+
+ /**
+ * Returns the given unused {@code buffer}, previously obtained from
+ * {@code getBuffer}.
+ *
+ * @implNote This method can be used, if necessary, to return
+ * the unused buffer to the pull.
+ *
+ * @param buffer The unused buffer.
+ */
+ default void returnUnused(ByteBuffer buffer) { }
+ }
+
+ // An implementation of BufferSource used for unencrypted data.
+ // This buffer source uses heap buffers and avoids wasting memory
+ // by forwarding read-only buffer slices downstream.
+ // Buffers allocated through this source are simply GC'ed when
+ // they are no longer referenced.
+ private static final class SliceBufferSource implements BufferSource {
+ private final Supplier<ByteBuffer> factory;
+ private volatile ByteBuffer current;
+
+ public SliceBufferSource() {
+ this(Utils::getBuffer);
+ }
+ public SliceBufferSource(Supplier<ByteBuffer> factory) {
+ this.factory = Objects.requireNonNull(factory);
+ }
+
+ // Reuses the same buffer if some space remains available.
+ // Otherwise, returns a new heap buffer.
+ @Override
+ public final ByteBuffer getBuffer() {
+ ByteBuffer buf = current;
+ buf = (buf == null || !buf.hasRemaining())
+ ? (current = factory.get()) : buf;
+ assert buf.hasRemaining();
+ return buf;
+ }
+
+ // Adds a read-only slice to the list, potentially returning a
+ // new list with that slice at the end.
+ @Override
+ public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+ // creates a slice to add to the list
+ int limit = buf.limit();
+ buf.limit(buf.position());
+ buf.position(start);
+ ByteBuffer slice = buf.slice();
+
+ // restore buffer state to what it was before creating the slice
+ buf.position(buf.limit());
+ buf.limit(limit);
+
+ // add the buffer to the list
+ return SocketTube.listOf(list, slice.asReadOnlyBuffer());
+ }
+ }
+
+
+ // An implementation of BufferSource used for encrypted data.
+ // This buffer source uses direct byte buffers that will be
+ // recycled by the SocketTube subscriber.
+ //
+ private static final class SSLDirectBufferSource implements BufferSource {
+ private final BufferSupplier factory;
+ private final HttpClientImpl client;
+ private ByteBuffer current;
+
+ public SSLDirectBufferSource(HttpClientImpl client) {
+ this.client = Objects.requireNonNull(client);
+ this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
+ }
+
+ // Obtains a 'free' byte buffer from the pool, or returns
+ // the same buffer if nothing was read at the previous cycle.
+ // The subscriber will be responsible for recycling this
+ // buffer into the pool (see SSLFlowDelegate.Reader)
+ @Override
+ public final ByteBuffer getBuffer() {
+ assert client.isSelectorThread();
+ ByteBuffer buf = current;
+ if (buf == null) {
+ buf = current = factory.get();
+ }
+ assert buf.hasRemaining();
+ assert buf.position() == 0;
+ return buf;
+ }
+
+ // Adds the buffer to the list. The buffer will be later returned to the
+ // pool by the subscriber (see SSLFlowDelegate.Reader).
+ // The next buffer returned by getBuffer() will be obtained from the
+ // pool. It might be the same buffer or another one.
+ // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
+ // recycling will happen in the flow before onNext returns, then the
+ // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
+ // it's shared by all SSL connections opened on that client.
+ @Override
+ public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+ assert client.isSelectorThread();
+ assert buf.isDirect();
+ assert start == 0;
+ assert current == buf;
+ current = null;
+ buf.limit(buf.position());
+ buf.position(start);
+ // add the buffer to the list
+ return SocketTube.listOf(list, buf);
+ }
+
+ @Override
+ public void returnUnused(ByteBuffer buffer) {
+ // if current is null, then the buffer will have been added to the
+ // list, through append. Otherwise, current is not null, and needs
+ // to be returned to prevent the buffer supplier pool from growing
+ // to more than MAX_BUFFERS.
+ assert buffer == current;
+ ByteBuffer buf = current;
+ if (buf != null) {
+ assert buf.position() == 0;
+ current = null;
+ // the supplier assert if buf has remaining
+ buf.limit(buf.position());
+ factory.recycle(buf);
+ }
+ }
+ }
+
+ // ===================================================================== //
// Socket Channel Read/Write //
// ===================================================================== //
static final int MAX_BUFFERS = 3;
@@ -918,11 +1097,8 @@
// is inserted into the returned buffer list, and if the current buffer
// has remaining space, that space will be used to read more data when
// the channel becomes readable again.
- private volatile ByteBuffer current;
- private List<ByteBuffer> readAvailable() throws IOException {
- ByteBuffer buf = current;
- buf = (buf == null || !buf.hasRemaining())
- ? (current = buffersSource.get()) : buf;
+ private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
+ ByteBuffer buf = buffersSource.getBuffer();
assert buf.hasRemaining();
int read;
@@ -936,6 +1112,9 @@
}
} catch (IOException x) {
if (buf.position() == pos && list == null) {
+ // make sure that the buffer source will recycle
+ // 'buf' if needed
+ buffersSource.returnUnused(buf);
// no bytes have been read, just throw...
throw x;
} else {
@@ -951,6 +1130,7 @@
// returned if read == -1. If some data has already been read,
// then it must be returned. -1 will be returned next time
// the caller attempts to read something.
+ buffersSource.returnUnused(buf);
if (list == null) {
// nothing read - list was null - return EOF or NOTHING
list = read == -1 ? EOF : NOTHING;
@@ -960,39 +1140,27 @@
// check whether this buffer has still some free space available.
// if so, we will keep it for the next round.
- final boolean hasRemaining = buf.hasRemaining();
+ list = buffersSource.append(list, buf, pos);
- // creates a slice to add to the list
- int limit = buf.limit();
- buf.limit(buf.position());
- buf.position(pos);
- ByteBuffer slice = buf.slice();
-
- // restore buffer state to what it was before creating the slice
- buf.position(buf.limit());
- buf.limit(limit);
-
- // add the buffer to the list
- list = addToList(list, slice.asReadOnlyBuffer());
if (read <= 0 || list.size() == MAX_BUFFERS) {
break;
}
- buf = hasRemaining ? buf : (current = buffersSource.get());
+ buf = buffersSource.getBuffer();
pos = buf.position();
assert buf.hasRemaining();
}
return list;
}
- private <T> List<T> addToList(List<T> list, T item) {
+ private static <T> List<T> listOf(List<T> list, T item) {
int size = list == null ? 0 : list.size();
switch (size) {
case 0: return List.of(item);
case 1: return List.of(list.get(0), item);
case 2: return List.of(list.get(0), list.get(1), item);
default: // slow path if MAX_BUFFERS > 3
- ArrayList<T> res = new ArrayList<>(list);
+ List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);
res.add(item);
return res;
}