8193356: HttpClient should create a new connection after available stream ids are used up. http-client-branch
authormichaelm
Tue, 01 May 2018 22:37:29 +0100
branchhttp-client-branch
changeset 56505 3b4b23c3758e
parent 56504 dd13fb9f1603
child 56506 487a640f283c
8193356: HttpClient should create a new connection after available stream ids are used up. Reviewed-by: chegar, dfuchs
src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
test/jdk/java/net/httpclient/http2/server/Http2TestServer.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java	Tue May 01 19:14:46 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java	Tue May 01 22:37:29 2018 +0100
@@ -95,10 +95,10 @@
         synchronized (this) {
             Http2Connection connection = connections.get(key);
             if (connection != null) {
-                if (connection.closed) {
+                if (connection.closed || !connection.reserveStream(true)) {
                     if (debug.on())
-                        debug.log("removing found closed connection: %s", connection);
-                    connections.remove(key);
+                        debug.log("removing found closed or closing connection: %s", connection);
+                    deleteConnection(connection);
                 } else {
                     // fast path if connection already exists
                     if (debug.on())
@@ -138,9 +138,9 @@
      */
     boolean offerConnection(Http2Connection c) {
         if (debug.on()) debug.log("offering to the connection pool: %s", c);
-        if (c.closed) {
+        if (c.closed || c.finalStream()) {
             if (debug.on())
-                debug.log("skipping offered closed connection: %s", c);
+                debug.log("skipping offered closed or closing connection: %s", c);
             return false;
         }
 
@@ -148,7 +148,7 @@
         synchronized(this) {
             Http2Connection c1 = connections.putIfAbsent(key, c);
             if (c1 != null) {
-                c.setSingleStream(true);
+                c.setFinalStream();
                 if (debug.on())
                     debug.log("existing entry in connection pool for %s", key);
                 return false;
@@ -163,9 +163,12 @@
         if (debug.on())
             debug.log("removing from the connection pool: %s", c);
         synchronized (this) {
-            connections.remove(c.key());
-            if (debug.on())
-                debug.log("removed from the connection pool: %s", c);
+            Http2Connection c1 = connections.get(c.key());
+            if (c1 != null && c1.equals(c)) {
+                connections.remove(c.key());
+                if (debug.on())
+                    debug.log("removed from the connection pool: %s", c);
+            }
         }
     }
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Tue May 01 19:14:46 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Tue May 01 22:37:29 2018 +0100
@@ -121,7 +121,20 @@
             Utils.getHpackLogger(this::dbgString, Utils.DEBUG_HPACK);
     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
 
-    private boolean singleStream; // used only for stream 1, then closed
+    static private final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647
+    static private final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646
+
+    /** 
+     * Flag set when no more streams to be opened on this connection. Two cases where it is used.
+     *
+     * 1. two connections to the same server were opened concurrently, in which case one of them will be put in the cache, 
+     *    and the second will expire when all its opened streams (which usually should be a single client stream + possibly 
+     *    some additional push-promise server streams) complete.
+     * 2. A cached connection reaches its maximum number of streams (~ 2^31-1) either server / or client allocated, 
+     *    in which case it will be taken out of the cache - allowing a new connection to replace it. It will expire when all 
+     *    its still open streams (which could be many) eventually complete.
+     */
+    private boolean finalStream;
 
     /*
      * ByteBuffer pooling strategy for HTTP/2 protocol.
@@ -236,6 +249,12 @@
     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
     private int nextstreamid;
     private int nextPushStream = 2;
+    // actual stream ids are not allocated until the Headers frame is ready
+    // to be sent. The following two fields are updated as soon as a stream
+    // is created and assigned to a connection. They are checked before
+    // assigning a stream to a connection.
+    private int lastReservedClientStreamid = 1;
+    private int lastReservedServerStreamid = 0;
     private final Encoder hpackOut;
     private final Decoder hpackIn;
     final SettingsFrame clientSettings;
@@ -381,6 +400,29 @@
         return client2.client();
     }
 
+    // call these before assigning a request/stream to a connection
+    // if false returned then a new Http2Connection is required
+    // if true, the the stream may be assigned to this connection
+    synchronized boolean reserveStream(boolean clientInitiated) {
+        if (finalStream) {
+            return false;
+        }
+        if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
+            setFinalStream();
+            client2.deleteConnection(this);
+            return false;
+        } else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) {
+            setFinalStream();
+            client2.deleteConnection(this);
+            return false;
+        }
+        if (clientInitiated)
+            lastReservedClientStreamid+=2;
+        else
+            lastReservedServerStreamid+=2;
+        return true;
+    }
+
     /**
      * Throws an IOException if h2 was not negotiated
      */
@@ -430,12 +472,16 @@
                 .thenCompose(checkAlpnCF);
     }
 
-    synchronized boolean singleStream() {
-        return singleStream;
+    synchronized boolean finalStream() {
+        return finalStream;
     }
 
-    synchronized void setSingleStream(boolean use) {
-        singleStream = use;
+    /**
+     * Mark this connection so no more streams created on it and it will close when
+     * all are complete.
+     */
+    synchronized void setFinalStream() {
+        finalStream = true;
     }
 
     static String keyFor(HttpConnection connection) {
@@ -709,6 +755,9 @@
         if (promisedStreamid != nextPushStream) {
             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
             return;
+        } else if (!reserveStream(false)) {
+            resetStream(promisedStreamid, ResetFrame.REFUSED_STREAM);
+            return;
         } else {
             nextPushStream += 2;
         }
@@ -768,7 +817,7 @@
             // corresponding entry in the window controller.
             windowController.removeStream(streamid);
         }
-        if (singleStream() && streams.isEmpty()) {
+        if (finalStream() && streams.isEmpty()) {
             // should be only 1 stream, but there might be more if server push
             close();
         }
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Tue May 01 19:14:46 2018 +0100
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Tue May 01 22:37:29 2018 +0100
@@ -67,11 +67,11 @@
     }
 
     public Http2TestServer(String serverName, boolean secure, int port) throws Exception {
-        this(serverName, secure, port, getDefaultExecutor(), null);
+        this(serverName, secure, port, getDefaultExecutor(), 50, null);
     }
 
     public Http2TestServer(boolean secure, int port) throws Exception {
-        this(null, secure, port, getDefaultExecutor(), null);
+        this(null, secure, port, getDefaultExecutor(), 50, null);
     }
 
     public InetSocketAddress getAddress() {
@@ -85,19 +85,29 @@
 
     public Http2TestServer(boolean secure,
                            SSLContext context) throws Exception {
-        this(null, secure, 0, null, context);
+        this(null, secure, 0, null, 50, context);
     }
 
     public Http2TestServer(String serverName, boolean secure,
                            SSLContext context) throws Exception {
-        this(serverName, secure, 0, null, context);
+        this(serverName, secure, 0, null, 50, context);
     }
 
     public Http2TestServer(boolean secure,
                            int port,
                            ExecutorService exec,
                            SSLContext context) throws Exception {
-        this(null, secure, port, exec, context);
+        this(null, secure, port, exec, 50, context);
+    }
+
+    public Http2TestServer(String serverName,
+                           boolean secure,
+                           int port,
+                           ExecutorService exec,
+                           SSLContext context)
+        throws Exception
+    {
+        this(serverName, secure, port, exec, 50, context);
     }
 
     /**
@@ -109,20 +119,22 @@
      * @param secure https or http
      * @param port listen port
      * @param exec executor service (cached thread pool is used if null)
+     * @param backlog the server socket backlog
      * @param context the SSLContext used when secure is true
      */
     public Http2TestServer(String serverName,
                            boolean secure,
                            int port,
                            ExecutorService exec,
+                           int backlog,
                            SSLContext context)
         throws Exception
     {
         this.serverName = serverName;
         if (secure) {
-            server = initSecure(port);
+            server = initSecure(port, backlog);
         } else {
-            server = initPlaintext(port);
+            server = initPlaintext(port, backlog);
         }
         this.secure = secure;
         this.exec = exec == null ? getDefaultExecutor() : exec;
@@ -171,10 +183,10 @@
         return handler;
     }
 
-    final ServerSocket initPlaintext(int port) throws Exception {
+    final ServerSocket initPlaintext(int port, int backlog) throws Exception {
         ServerSocket ss = new ServerSocket();
         ss.setReuseAddress(false);
-        ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+        ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
         return ss;
     }
 
@@ -192,7 +204,7 @@
     }
 
 
-    final ServerSocket initSecure(int port) throws Exception {
+    final ServerSocket initSecure(int port, int backlog) throws Exception {
         ServerSocketFactory fac;
         if (sslContext != null) {
             fac = sslContext.getServerSocketFactory();
@@ -201,7 +213,7 @@
         }
         SSLServerSocket se = (SSLServerSocket) fac.createServerSocket();
         se.setReuseAddress(false);
-        se.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+        se.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
         SSLParameters sslp = se.getSSLParameters();
         sslp.setApplicationProtocols(new String[]{"h2"});
         sslp.setEndpointIdentificationAlgorithm("HTTPS");