8193356: HttpClient should create a new connection after available stream ids are used up.
Reviewed-by: chegar, dfuchs
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java Tue May 01 19:14:46 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java Tue May 01 22:37:29 2018 +0100
@@ -95,10 +95,10 @@
synchronized (this) {
Http2Connection connection = connections.get(key);
if (connection != null) {
- if (connection.closed) {
+ if (connection.closed || !connection.reserveStream(true)) {
if (debug.on())
- debug.log("removing found closed connection: %s", connection);
- connections.remove(key);
+ debug.log("removing found closed or closing connection: %s", connection);
+ deleteConnection(connection);
} else {
// fast path if connection already exists
if (debug.on())
@@ -138,9 +138,9 @@
*/
boolean offerConnection(Http2Connection c) {
if (debug.on()) debug.log("offering to the connection pool: %s", c);
- if (c.closed) {
+ if (c.closed || c.finalStream()) {
if (debug.on())
- debug.log("skipping offered closed connection: %s", c);
+ debug.log("skipping offered closed or closing connection: %s", c);
return false;
}
@@ -148,7 +148,7 @@
synchronized(this) {
Http2Connection c1 = connections.putIfAbsent(key, c);
if (c1 != null) {
- c.setSingleStream(true);
+ c.setFinalStream();
if (debug.on())
debug.log("existing entry in connection pool for %s", key);
return false;
@@ -163,9 +163,12 @@
if (debug.on())
debug.log("removing from the connection pool: %s", c);
synchronized (this) {
- connections.remove(c.key());
- if (debug.on())
- debug.log("removed from the connection pool: %s", c);
+ Http2Connection c1 = connections.get(c.key());
+ if (c1 != null && c1.equals(c)) {
+ connections.remove(c.key());
+ if (debug.on())
+ debug.log("removed from the connection pool: %s", c);
+ }
}
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue May 01 19:14:46 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue May 01 22:37:29 2018 +0100
@@ -121,7 +121,20 @@
Utils.getHpackLogger(this::dbgString, Utils.DEBUG_HPACK);
static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
- private boolean singleStream; // used only for stream 1, then closed
+ static private final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647
+ static private final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646
+
+ /**
+ * Flag set when no more streams to be opened on this connection. Two cases where it is used.
+ *
+ * 1. two connections to the same server were opened concurrently, in which case one of them will be put in the cache,
+ * and the second will expire when all its opened streams (which usually should be a single client stream + possibly
+ * some additional push-promise server streams) complete.
+ * 2. A cached connection reaches its maximum number of streams (~ 2^31-1) either server / or client allocated,
+ * in which case it will be taken out of the cache - allowing a new connection to replace it. It will expire when all
+ * its still open streams (which could be many) eventually complete.
+ */
+ private boolean finalStream;
/*
* ByteBuffer pooling strategy for HTTP/2 protocol.
@@ -236,6 +249,12 @@
private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
private int nextstreamid;
private int nextPushStream = 2;
+ // actual stream ids are not allocated until the Headers frame is ready
+ // to be sent. The following two fields are updated as soon as a stream
+ // is created and assigned to a connection. They are checked before
+ // assigning a stream to a connection.
+ private int lastReservedClientStreamid = 1;
+ private int lastReservedServerStreamid = 0;
private final Encoder hpackOut;
private final Decoder hpackIn;
final SettingsFrame clientSettings;
@@ -381,6 +400,29 @@
return client2.client();
}
+ // call these before assigning a request/stream to a connection
+ // if false returned then a new Http2Connection is required
+ // if true, the the stream may be assigned to this connection
+ synchronized boolean reserveStream(boolean clientInitiated) {
+ if (finalStream) {
+ return false;
+ }
+ if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
+ setFinalStream();
+ client2.deleteConnection(this);
+ return false;
+ } else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) {
+ setFinalStream();
+ client2.deleteConnection(this);
+ return false;
+ }
+ if (clientInitiated)
+ lastReservedClientStreamid+=2;
+ else
+ lastReservedServerStreamid+=2;
+ return true;
+ }
+
/**
* Throws an IOException if h2 was not negotiated
*/
@@ -430,12 +472,16 @@
.thenCompose(checkAlpnCF);
}
- synchronized boolean singleStream() {
- return singleStream;
+ synchronized boolean finalStream() {
+ return finalStream;
}
- synchronized void setSingleStream(boolean use) {
- singleStream = use;
+ /**
+ * Mark this connection so no more streams created on it and it will close when
+ * all are complete.
+ */
+ synchronized void setFinalStream() {
+ finalStream = true;
}
static String keyFor(HttpConnection connection) {
@@ -709,6 +755,9 @@
if (promisedStreamid != nextPushStream) {
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
return;
+ } else if (!reserveStream(false)) {
+ resetStream(promisedStreamid, ResetFrame.REFUSED_STREAM);
+ return;
} else {
nextPushStream += 2;
}
@@ -768,7 +817,7 @@
// corresponding entry in the window controller.
windowController.removeStream(streamid);
}
- if (singleStream() && streams.isEmpty()) {
+ if (finalStream() && streams.isEmpty()) {
// should be only 1 stream, but there might be more if server push
close();
}
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Tue May 01 19:14:46 2018 +0100
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Tue May 01 22:37:29 2018 +0100
@@ -67,11 +67,11 @@
}
public Http2TestServer(String serverName, boolean secure, int port) throws Exception {
- this(serverName, secure, port, getDefaultExecutor(), null);
+ this(serverName, secure, port, getDefaultExecutor(), 50, null);
}
public Http2TestServer(boolean secure, int port) throws Exception {
- this(null, secure, port, getDefaultExecutor(), null);
+ this(null, secure, port, getDefaultExecutor(), 50, null);
}
public InetSocketAddress getAddress() {
@@ -85,19 +85,29 @@
public Http2TestServer(boolean secure,
SSLContext context) throws Exception {
- this(null, secure, 0, null, context);
+ this(null, secure, 0, null, 50, context);
}
public Http2TestServer(String serverName, boolean secure,
SSLContext context) throws Exception {
- this(serverName, secure, 0, null, context);
+ this(serverName, secure, 0, null, 50, context);
}
public Http2TestServer(boolean secure,
int port,
ExecutorService exec,
SSLContext context) throws Exception {
- this(null, secure, port, exec, context);
+ this(null, secure, port, exec, 50, context);
+ }
+
+ public Http2TestServer(String serverName,
+ boolean secure,
+ int port,
+ ExecutorService exec,
+ SSLContext context)
+ throws Exception
+ {
+ this(serverName, secure, port, exec, 50, context);
}
/**
@@ -109,20 +119,22 @@
* @param secure https or http
* @param port listen port
* @param exec executor service (cached thread pool is used if null)
+ * @param backlog the server socket backlog
* @param context the SSLContext used when secure is true
*/
public Http2TestServer(String serverName,
boolean secure,
int port,
ExecutorService exec,
+ int backlog,
SSLContext context)
throws Exception
{
this.serverName = serverName;
if (secure) {
- server = initSecure(port);
+ server = initSecure(port, backlog);
} else {
- server = initPlaintext(port);
+ server = initPlaintext(port, backlog);
}
this.secure = secure;
this.exec = exec == null ? getDefaultExecutor() : exec;
@@ -171,10 +183,10 @@
return handler;
}
- final ServerSocket initPlaintext(int port) throws Exception {
+ final ServerSocket initPlaintext(int port, int backlog) throws Exception {
ServerSocket ss = new ServerSocket();
ss.setReuseAddress(false);
- ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+ ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
return ss;
}
@@ -192,7 +204,7 @@
}
- final ServerSocket initSecure(int port) throws Exception {
+ final ServerSocket initSecure(int port, int backlog) throws Exception {
ServerSocketFactory fac;
if (sslContext != null) {
fac = sslContext.getServerSocketFactory();
@@ -201,7 +213,7 @@
}
SSLServerSocket se = (SSLServerSocket) fac.createServerSocket();
se.setReuseAddress(false);
- se.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+ se.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
SSLParameters sslp = se.getSSLParameters();
sslp.setApplicationProtocols(new String[]{"h2"});
sslp.setEndpointIdentificationAlgorithm("HTTPS");