http-client-branch: unused byte buffers must be returned to the pool before exiting from SocketTube::readAvailable + minor test fixes
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Wed Apr 25 19:51:58 2018 +0100
@@ -379,11 +379,13 @@
subscriber.onError(t);
} finally {
cf.completeExceptionally(t);
- connection.close();
}
}
} finally {
bodyReader.onComplete(t);
+ if (t != null) {
+ connection.close();
+ }
}
}));
CompletableFuture<State> bodyReaderCF = bodyReader.completion();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Wed Apr 25 19:51:58 2018 +0100
@@ -1198,7 +1198,7 @@
private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
private final HttpClientImpl client;
private final Logger debug;
- int tail; // no need for volatile: only accessed in SM thread.
+ private int tail, count; // no need for volatile: only accessed in SM thread.
public SSLDirectBufferSupplier(HttpClientImpl client) {
this.client = Objects.requireNonNull(client);
this.debug = client.debug;
@@ -1208,18 +1208,20 @@
@Override
public ByteBuffer get() {
assert client.isSelectorThread();
- ByteBuffer buf = tail == 0 ? null : pool[--tail];
- if (buf == null) {
+ assert tail <= POOL_SIZE : "allocate tail is " + tail;
+ ByteBuffer buf;
+ if (tail == 0) {
if (debug.on()) {
// should not appear more than SocketTube.MAX_BUFFERS
debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
}
+ assert count++ < POOL_SIZE : "trying to allocate more than "
+ + POOL_SIZE + " buffers";
buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
} else {
- // if (debug.on()) { // this trace is mostly noise.
- // debug.log("ByteBuffer.recycle(%d)", buf.remaining());
- // }
- assert buf == pool[tail];
+ assert tail > 0 : "non positive tail value: " + tail;
+ tail--;
+ buf = pool[tail];
pool[tail] = null;
}
assert buf.isDirect();
@@ -1237,9 +1239,15 @@
assert client.isSelectorThread();
assert buffer.isDirect();
assert !buffer.hasRemaining();
+ assert tail < POOL_SIZE : "recycle tail is " + tail;
+ assert tail >= 0;
buffer.position(0);
buffer.limit(buffer.capacity());
- pool[tail++] = buffer;
+ // don't fail if assertions are off. we have asserted above.
+ if (tail < POOL_SIZE) {
+ pool[tail] = buffer;
+ tail++;
+ }
assert tail <= POOL_SIZE;
assert tail > 0;
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Apr 25 19:51:58 2018 +0100
@@ -784,7 +784,7 @@
if (demand.tryDecrement()) {
// we have demand.
try {
- List<ByteBuffer> bytes = readAvailable(subscription.bufferSource);
+ List<ByteBuffer> bytes = readAvailable(current.bufferSource);
if (bytes == EOF) {
if (!completed) {
if (debug.on()) debug.log("got read EOF");
@@ -918,6 +918,8 @@
public interface BufferSource {
/**
* Returns a buffer to read data from the socket.
+ *
+ * @implNote
* Different implementation can have different strategies, as to
* which kind of buffer to return, or whether to return the same
* buffer. The only constraints are that
@@ -926,6 +928,7 @@
* c. the buffer limit indicates where to stop reading.
* d. the buffer is 'free' - that is - it is not used
* or retained by anybody else
+ *
* @return A buffer to read data from the socket.
*/
ByteBuffer getBuffer();
@@ -935,6 +938,7 @@
* be sent downstream to the subscriber. May return a new
* list, or append to the given list.
*
+ * @implNote
* Different implementation can have different strategies, but
* must obviously be consistent with the implementation of the
* getBuffer() method. For instance, an implementation could
@@ -950,7 +954,18 @@
* @return A possibly new list where a buffer containing the
* data read from the socket has been added.
*/
- List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start);
+ List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);
+
+ /**
+ * Called when a buffer obtained from {@code getBuffer} will not
+ * be used because no data has been read.
+ *
+ * @implNote This method can be used, if necessary, to return
+ * the unused buffer to the pull.
+ *
+ * @param buffer The unused buffer.
+ */
+ default void returnUnused(ByteBuffer buffer) { }
}
// An implementation of BufferSource used for unencrypted data.
@@ -958,7 +973,7 @@
// by forwarding read only buffer slices downstream.
// Buffers allocated through this source are simply GC'ed when
// they are no longer referenced.
- static final class SliceBufferSource implements BufferSource {
+ private static final class SliceBufferSource implements BufferSource {
private final Supplier<ByteBuffer> factory;
private volatile ByteBuffer current;
public SliceBufferSource() {
@@ -1003,10 +1018,10 @@
// This buffer source use direct byte buffers that will be
// recycled by the SocketTube subscriber.
//
- static final class SSLDirectBufferSource implements BufferSource {
- private final Supplier<ByteBuffer> factory;
+ private static final class SSLDirectBufferSource implements BufferSource {
+ private final BufferSupplier factory;
private final HttpClientImpl client;
- private volatile ByteBuffer current;
+ private ByteBuffer current;
public SSLDirectBufferSource(HttpClientImpl client) {
this.client = Objects.requireNonNull(client);
@@ -1049,6 +1064,23 @@
// add the buffer to the list
return SocketTube.listOf(list, buf);
}
+
+ @Override
+ public void returnUnused(ByteBuffer buffer) {
+ // if current is not null it will not be added to the
+ // list. We need to recycle it now to prevent
+ // the buffer supplier pool to grow over more than
+ // MAX_BUFFERS.
+ assert buffer == current;
+ ByteBuffer buf = current;
+ if (buf != null) {
+ assert buf.position() == 0;
+ current = null;
+ // the supplier assert if buf has remaining
+ buf.limit(buf.position());
+ factory.recycle(buf);
+ }
+ }
}
// ===================================================================== //
@@ -1079,6 +1111,9 @@
}
} catch (IOException x) {
if (buf.position() == pos && list == null) {
+ // make sure that the buffer source will recycle
+ // 'buf' if needed
+ buffersSource.returnUnused(buf);
// no bytes have been read, just throw...
throw x;
} else {
@@ -1094,6 +1129,7 @@
// returned if read == -1. If some data has already been read,
// then it must be returned. -1 will be returned next time
// the caller attempts to read something.
+ buffersSource.returnUnused(buf);
if (list == null) {
// nothing read - list was null - return EOF or NOTHING
list = read == -1 ? EOF : NOTHING;
@@ -1103,7 +1139,6 @@
// check whether this buffer has still some free space available.
// if so, we will keep it for the next round.
- final boolean hasRemaining = buf.hasRemaining();
list = buffersSource.append(list, buf, pos);
if (read <= 0 || list.size() == MAX_BUFFERS) {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed Apr 25 19:51:58 2018 +0100
@@ -197,9 +197,11 @@
sb.append("SSL: id ").append(id);
sb.append(" HS state: " + states(handshakeState));
sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
- sb.append(" LL : ");
- for (String s: stateList) {
- sb.append(s).append(" ");
+ if (stateList != null) {
+ sb.append(" LL : ");
+ for (String s : stateList) {
+ sb.append(s).append(" ");
+ }
}
sb.append("\r\n");
sb.append("Reader:: ").append(reader.toString());
@@ -813,13 +815,16 @@
}
final AtomicInteger handshakeState;
- final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
+ final ConcurrentLinkedQueue<String> stateList =
+ debug.on() ? new ConcurrentLinkedQueue<>() : null;
private boolean doHandshake(EngineResult r, int caller) {
// unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
- stateList.add(r.handshakeStatus().toString());
- stateList.add(Integer.toString(caller));
+ if (stateList != null && debug.on()) {
+ stateList.add(r.handshakeStatus().toString());
+ stateList.add(Integer.toString(caller));
+ }
switch (r.handshakeStatus()) {
case NEED_TASK:
int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
--- a/test/jdk/java/net/httpclient/DependentPromiseActionsTest.java Tue Apr 24 19:45:20 2018 +0100
+++ b/test/jdk/java/net/httpclient/DependentPromiseActionsTest.java Wed Apr 25 19:51:58 2018 +0100
@@ -177,6 +177,8 @@
};
}
+ enum SubscriberType {EAGER, LAZZY}
+
static final class SemaphoreStallerSupplier
implements Supplier<SemaphoreStaller> {
@Override
@@ -291,7 +293,8 @@
String test = format("testAsStringAsync(%s, %b, %s)",
uri, sameClient, stallers);
testDependent(test, uri, sameClient, BodyHandlers::ofString,
- this::finish, this::extractString, stallers);
+ this::finish, this::extractString, stallers,
+ SubscriberType.EAGER);
}
@Test(dataProvider = "variants")
@@ -303,7 +306,8 @@
String test = format("testAsLinesAsync(%s, %b, %s)",
uri, sameClient, stallers);
testDependent(test, uri, sameClient, BodyHandlers::ofLines,
- this::finish, this::extractStream, stallers);
+ this::finish, this::extractStream, stallers,
+ SubscriberType.LAZZY);
}
@Test(dataProvider = "variants")
@@ -315,19 +319,22 @@
String test = format("testAsInputStreamAsync(%s, %b, %s)",
uri, sameClient, stallers);
testDependent(test, uri, sameClient, BodyHandlers::ofInputStream,
- this::finish, this::extractInputStream, stallers);
+ this::finish, this::extractInputStream, stallers,
+ SubscriberType.LAZZY);
}
private <T,U> void testDependent(String name, String uri, boolean sameClient,
Supplier<BodyHandler<T>> handlers,
Finisher finisher,
Extractor<T> extractor,
- Supplier<Staller> stallers)
+ Supplier<Staller> stallers,
+ SubscriberType subscriberType)
throws Exception
{
out.printf("%n%s%s%n", now(), name);
try {
- testDependent(uri, sameClient, handlers, finisher, extractor, stallers);
+ testDependent(uri, sameClient, handlers, finisher,
+ extractor, stallers, subscriberType);
} catch (Error | Exception x) {
FAILURES.putIfAbsent(name, x);
throw x;
@@ -338,7 +345,8 @@
Supplier<BodyHandler<T>> handlers,
Finisher finisher,
Extractor<T> extractor,
- Supplier<Staller> stallers)
+ Supplier<Staller> stallers,
+ SubscriberType subscriberType)
throws Exception
{
HttpClient client = null;
@@ -355,7 +363,7 @@
System.out.println("try stalling in " + where);
CompletableFuture<HttpResponse<T>> responseCF =
client.sendAsync(req, handler, promiseHandler);
- assert !responseCF.isDone();
+ assert subscriberType == SubscriberType.LAZZY || !responseCF.isDone();
finisher.finish(where, responseCF, promiseHandler, extractor);
}
}