http-client-branch: Fixed 8192966: HttpClient should reuse TCP connection for h2c connections http-client-branch
authormichaelm
Thu, 14 Dec 2017 18:41:57 +0000
branchhttp-client-branch
changeset 55983 e4a1f0c9d4c6
parent 55982 b6ff245c0db6
child 55988 7f1e0cf933a6
http-client-branch: Fixed 8192966: HttpClient should reuse TCP connection for h2c connections
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
test/jdk/java/net/httpclient/http2/server/Http2TestServer.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Thu Dec 14 18:41:57 2017 +0000
@@ -387,8 +387,6 @@
                                             new IOException("Can't get stream 1: " + t, t));
                                 }
                             }
-                            if (!cached)
-                                s.closeConnectionOnCompletion();
                             exchImpl.released();
                             Throwable t;
                             // There's a race condition window where an external
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Thu Dec 14 18:41:57 2017 +0000
@@ -103,6 +103,7 @@
                              "handling HTTP/2 connection creation failed: %s",
                              (Object)t);
             t = Utils.getCompletionCause(t);
+            boolean secure = exchange.request().secure();
             if (t instanceof Http2Connection.ALPNException) {
                 Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
                 AbstractAsyncSSLConnection as = ee.getConnection();
@@ -110,6 +111,11 @@
                 CompletableFuture<? extends ExchangeImpl<U>> ex =
                         createHttp1Exchange(exchange, as);
                 return ex;
+            } else if (secure && c== null) {
+                DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 ");
+                CompletableFuture<? extends ExchangeImpl<U>> ex =
+                        createHttp1Exchange(exchange, null);
+                return ex;
             } else {
                 DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed "
                                   + "with unexpected exception: %s", (Object)t);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Thu Dec 14 18:41:57 2017 +0000
@@ -65,85 +65,58 @@
     /* Map key is "scheme:host:port" */
     private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
 
-    private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
-    private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
-    Collections.synchronizedMap(new HashMap<>());
-
-    private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
-        synchronized (waiting) {
-            Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
-            if (waiters == null) {
-                waiters = new HashSet<>();
-                waiting.put(key, waiters);
-            }
-            waiters.add(cf);
-        }
-    }
+    private final Set<String> failures = Collections.synchronizedSet(new HashSet<>());
 
     /**
-     * If a https request then async waits until a connection is opened.
-     * Returns null if the request is 'http' as a different (upgrade)
-     * mechanism is used.
+     * When HTTP/2 requested only. The following describes the aggregate behavior including the
+     * calling code. In all cases, the HTTP2 connection cache
+     * is checked first for a suitable connection and that is returned if available.
+     * If not, a new connection is opened, except in https case when a previous negotiate failed.
+     * In that case, we want to continue using http/1.1. When a connection is to be opened and
+     * if multiple requests are sent in parallel then each will open a new connection.
+     *
+     * If negotiation/upgrade succeeds then
+     * one connection will be put in the cache and the others will be closed
+     * after the initial request completes (not strictly necessary for h2, only for h2c)
      *
-     * Only one connection per destination is created. Blocks when opening
-     * connection, or when waiting for connection to be opened.
-     * First thread opens the connection and notifies the others when done.
+     * If negotiate/upgrade fails, then any opened connections remain open (as http/1.1)
+     * and will be used and cached in the http/1 cache. Note, this method handles the
+     * https failure case only (by completing the CF with an ALPN exception, handled externally)
+     * The h2c upgrade is handled externally also.
      *
-     * If the request is secure (https) then we open the connection here.
-     * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
-     * In latter case, when the Http2Connection is connected, offerConnection() must
-     * be called to store it.
+     * Specific CF behavior of this method.
+     * 1. completes with ALPN exception: h2 negotiate failed for first time. failure recorded.
+     * 2. completes with other exception: failure not recorded. Caller must handle
+     * 3. completes normally with null: no connection in cache for h2c or h2 failed previously
+     * 4. completes normally with connection: h2 or h2c connection in cache. Use it.
      */
     CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
         URI uri = req.uri();
         InetSocketAddress proxy = req.proxy();
         String key = Http2Connection.keyFor(uri, proxy);
 
-        synchronized (opening) {
+        synchronized (this) {
             Http2Connection connection = connections.get(key);
             if (connection != null) { // fast path if connection already exists
                 return CompletableFuture.completedFuture(connection);
             }
 
-            if (!req.secure()) {
+            if (!req.secure() || failures.contains(key)) {
+                // secure: negotiate failed before. Use http/1.1
+                // !secure: no connection available in cache. Attempt upgrade
                 return MinimalFuture.completedFuture(null);
             }
-
-            if (!opening.contains(key)) {
-                debug.log(Level.DEBUG, "Opening: %s", key);
-                opening.add(key);
-            } else {
-                CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
-                addToWaiting(key, cf);
-                return cf;
-            }
         }
         return Http2Connection
                 .createAsync(req, this)
                 .whenComplete((conn, t) -> {
-                    debug.log(Level.DEBUG,
-                            "waking up dependents with created connection");
-                    synchronized (opening) {
-                        Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
-                        debug.log(Level.DEBUG, "Opening completed: %s", key);
-                        opening.remove(key);
-                        if (t == null && conn != null)
+                    synchronized (Http2ClientImpl.this) {
+                        if (conn != null) {
                             offerConnection(conn);
-                        final Throwable cause = Utils.getCompletionCause(t);
-                        if (waiters == null) {
-                            debug.log(Level.DEBUG, "no dependent to wake up");
-                            return;
-                        } else if (cause instanceof Http2Connection.ALPNException) {
-                            waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
-                                    client.theExecutor()));
-                        } else if (cause != null) {
-                            debug.log(Level.DEBUG,
-                                    () -> "waking up dependants: failed: " + cause);
-                            waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
-                        } else  {
-                            debug.log(Level.DEBUG, "waking up dependants: succeeded");
-                            waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
-                                    client.theExecutor()));
+                        } else {
+                            Throwable cause = Utils.getCompletionCause(t);
+                            if (cause instanceof Http2Connection.ALPNException)
+                                failures.add(key);
                         }
                     }
                 });
@@ -159,7 +132,11 @@
     boolean offerConnection(Http2Connection c) {
         String key = c.key();
         Http2Connection c1 = connections.putIfAbsent(key, c);
-        return c1 == null;
+        if (c1 != null) {
+            c.setSingleStream(true);
+            return false;
+        }
+        return true;
     }
 
     void deleteConnection(Http2Connection c) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Thu Dec 14 18:41:57 2017 +0000
@@ -116,6 +116,8 @@
                   Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
 
+    private boolean singleStream; // used only for stream 1, then closed
+
     /*
      *  ByteBuffer pooling strategy for HTTP/2 protocol:
      *
@@ -202,7 +204,6 @@
                 prefaceSent = true;
             }
         }
-
     }
 
     volatile boolean closed;
@@ -397,6 +398,14 @@
         return aconn.getALPN().thenCompose(checkAlpnCF);
     }
 
+    synchronized boolean singleStream() {
+        return singleStream;
+    }
+
+    synchronized void setSingleStream(boolean use) {
+        singleStream = use;
+    }
+
     static String keyFor(HttpConnection connection) {
         boolean isProxy = connection.isProxied();
         boolean isSecure = connection.isSecure();
@@ -468,6 +477,7 @@
     }
 
     void close() {
+        Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
         GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
         // TODO: set last stream. For now zero ok.
         sendFrame(f);
@@ -684,7 +694,12 @@
             // corresponding entry in the window controller.
             windowController.removeStream(streamid);
         }
+        if (singleStream() && streams.isEmpty()) {
+            // should be only 1 stream, but there might be more if server push
+            close();
+        }
     }
+
     /**
      * Increments this connection's send Window by the amount in the given frame.
      */
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Thu Dec 14 18:41:57 2017 +0000
@@ -401,8 +401,8 @@
             pusher.onSubscribe(this.sub = sub);
             try {
                 if (contentLength == 0) {
+                    onFinished.run();
                     pusher.onComplete();
-                    onFinished.run();
                     onComplete.accept(null);
                 }
             } catch (Throwable t) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Thu Dec 14 18:41:57 2017 +0000
@@ -109,7 +109,6 @@
     final Http2Connection connection;
     final HttpRequestImpl request;
     final DecodingCallback rspHeadersConsumer;
-    volatile boolean closeConnectionOnCompletion;
     HttpHeadersImpl responseHeaders;
     final HttpHeadersImpl requestPseudoHeaders;
     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
@@ -197,17 +196,6 @@
         }
     }
 
-    // call this anywhere the stream is terminated
-    void checkConnectionClosure() {
-        if (closeConnectionOnCompletion) {
-            connection.close();
-        }
-    }
-
-    void closeConnectionOnCompletion() {
-        closeConnectionOnCompletion = true;
-    }
-
     // Callback invoked after the Response BodySubscriber has consumed the
     // buffers contained in a DataFrame.
     // Returns true if END_STREAM is reached, false otherwise.
@@ -948,7 +936,6 @@
         } catch (IOException ex) {
             Log.logError(ex);
         }
-        checkConnectionClosure();
     }
 
     // This method doesn't send any frame
@@ -961,7 +948,6 @@
         Log.logTrace("Closing stream {0}", streamid);
         connection.closeStream(streamid);
         Log.logTrace("Stream {0} closed", streamid);
-        checkConnectionClosure();
     }
 
     static class PushedStream<U,T> extends Stream<T> {
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Thu Dec 14 12:28:32 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Thu Dec 14 18:41:57 2017 +0000
@@ -170,7 +170,7 @@
         return new ServerSocket(port);
     }
 
-    public void stop() {
+    public synchronized void stop() {
         // TODO: clean shutdown GoAway
         stopping = true;
         System.err.printf("Server stopping %d connections\n", connections.size());
@@ -205,6 +205,15 @@
         return serverName;
     }
 
+    private synchronized void putConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+        if (!stopping)
+            connections.put(addr, c);
+    }
+
+    private synchronized void removeConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+        connections.remove(addr, c);
+    }
+
     /**
      * Starts a thread which waits for incoming connections.
      */
@@ -216,7 +225,7 @@
                     InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
                     Http2TestServerConnection c =
                             new Http2TestServerConnection(this, socket, exchangeSupplier);
-                    connections.put(addr, c);
+                    putConnection(addr, c);
                     try {
                         c.run();
                     } catch (Throwable e) {
@@ -224,7 +233,7 @@
                         // the connection might not have been closed
                         // and if so then the client might wait
                         // forever.
-                        connections.remove(addr, c);
+                        removeConnection(addr, c);
                         c.close(ErrorFrame.PROTOCOL_ERROR);
                         System.err.println("TestServer: start exception: " + e);
                         //throw e;