Merge
authormichaelm
Tue, 19 Dec 2017 16:12:55 +0000
changeset 48377 4ffa14468cd1
parent 48376 41ae5c69b09c (diff)
parent 48375 5471388067cf (current diff)
child 48378 e8e8db4f8194
Merge
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Tue Dec 19 16:12:55 2017 +0000
@@ -373,13 +373,16 @@
                                                  client.client2(),
                                                  this, e::drainLeftOverBytes)
                         .thenCompose((Http2Connection c) -> {
-                            c.putConnection();
+                            boolean cached = c.offerConnection();
                             Stream<T> s = c.getStream(1);
+
                             if (s == null) {
                                 // s can be null if an exception occurred
                                 // asynchronously while sending the preface.
                                 Throwable t = c.getRecordedCause();
                                 if (t != null) {
+                                    if (!cached)
+                                        c.close();
                                     return MinimalFuture.failedFuture(
                                             new IOException("Can't get stream 1: " + t, t));
                                 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Tue Dec 19 16:12:55 2017 +0000
@@ -98,6 +98,7 @@
                        HttpConnection connection)
     {
         DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result");
+        boolean secure = exchange.request().secure();
         if (t != null) {
             DEBUG_LOGGER.log(Level.DEBUG,
                              "handling HTTP/2 connection creation failed: %s",
@@ -116,6 +117,12 @@
                 return CompletableFuture.failedFuture(t);
             }
         }
+        if (secure && c== null) {
+            DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 ");
+            CompletableFuture<? extends ExchangeImpl<U>> ex =
+                    createHttp1Exchange(exchange, null);
+            return ex;
+        }
         if (c == null) {
             // no existing connection. Send request with HTTP 1 and then
             // upgrade if successful
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Tue Dec 19 16:12:55 2017 +0000
@@ -65,96 +65,78 @@
     /* Map key is "scheme:host:port" */
     private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
 
-    private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
-    private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
-    Collections.synchronizedMap(new HashMap<>());
-
-    private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
-        synchronized (waiting) {
-            Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
-            if (waiters == null) {
-                waiters = new HashSet<>();
-                waiting.put(key, waiters);
-            }
-            waiters.add(cf);
-        }
-    }
+    private final Set<String> failures = Collections.synchronizedSet(new HashSet<>());
 
     /**
-     * If a https request then async waits until a connection is opened.
-     * Returns null if the request is 'http' as a different (upgrade)
-     * mechanism is used.
+     * When HTTP/2 requested only. The following describes the aggregate behavior including the
+     * calling code. In all cases, the HTTP2 connection cache
+     * is checked first for a suitable connection and that is returned if available.
+     * If not, a new connection is opened, except in https case when a previous negotiate failed.
+     * In that case, we want to continue using http/1.1. When a connection is to be opened and
+     * if multiple requests are sent in parallel then each will open a new connection.
+     *
+     * If negotiation/upgrade succeeds then
+     * one connection will be put in the cache and the others will be closed
+     * after the initial request completes (not strictly necessary for h2, only for h2c)
      *
-     * Only one connection per destination is created. Blocks when opening
-     * connection, or when waiting for connection to be opened.
-     * First thread opens the connection and notifies the others when done.
+     * If negotiate/upgrade fails, then any opened connections remain open (as http/1.1)
+     * and will be used and cached in the http/1 cache. Note, this method handles the
+     * https failure case only (by completing the CF with an ALPN exception, handled externally)
+     * The h2c upgrade is handled externally also.
      *
-     * If the request is secure (https) then we open the connection here.
-     * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
-     * In latter case, when the Http2Connection is connected, putConnection() must
-     * be called to store it.
+     * Specific CF behavior of this method.
+     * 1. completes with ALPN exception: h2 negotiate failed for first time. failure recorded.
+     * 2. completes with other exception: failure not recorded. Caller must handle
+     * 3. completes normally with null: no connection in cache for h2c or h2 failed previously
+     * 4. completes normally with connection: h2 or h2c connection in cache. Use it.
      */
     CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
         URI uri = req.uri();
         InetSocketAddress proxy = req.proxy();
         String key = Http2Connection.keyFor(uri, proxy);
 
-        synchronized (opening) {
+        synchronized (this) {
             Http2Connection connection = connections.get(key);
             if (connection != null) { // fast path if connection already exists
                 return CompletableFuture.completedFuture(connection);
             }
 
-            if (!req.secure()) {
+            if (!req.secure() || failures.contains(key)) {
+                // secure: negotiate failed before. Use http/1.1
+                // !secure: no connection available in cache. Attempt upgrade
                 return MinimalFuture.completedFuture(null);
             }
-
-            if (!opening.contains(key)) {
-                debug.log(Level.DEBUG, "Opening: %s", key);
-                opening.add(key);
-            } else {
-                CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
-                addToWaiting(key, cf);
-                return cf;
-            }
         }
         return Http2Connection
                 .createAsync(req, this)
                 .whenComplete((conn, t) -> {
-                    debug.log(Level.DEBUG,
-                            "waking up dependents with created connection");
-                    synchronized (opening) {
-                        Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
-                        debug.log(Level.DEBUG, "Opening completed: %s", key);
-                        opening.remove(key);
-                        if (t == null && conn != null)
-                            putConnection(conn);
-                        final Throwable cause = Utils.getCompletionCause(t);
-                        if (waiters == null) {
-                            debug.log(Level.DEBUG, "no dependent to wake up");
-                            return;
-                        } else if (cause instanceof Http2Connection.ALPNException) {
-                            waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
-                                    client.theExecutor()));
-                        } else if (cause != null) {
-                            debug.log(Level.DEBUG,
-                                    () -> "waking up dependants: failed: " + cause);
-                            waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
-                        } else  {
-                            debug.log(Level.DEBUG, "waking up dependants: succeeded");
-                            waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
-                                    client.theExecutor()));
+                    synchronized (Http2ClientImpl.this) {
+                        if (conn != null) {
+                            offerConnection(conn);
+                        } else {
+                            Throwable cause = Utils.getCompletionCause(t);
+                            if (cause instanceof Http2Connection.ALPNException)
+                                failures.add(key);
                         }
                     }
                 });
     }
 
     /*
-     * TODO: If there isn't a connection to the same destination, then
-     * store it. If there is already a connection, then close it
+     * Cache the given connection, if no connection to the same
+     * destination exists. If one exists, then we let the initial stream
+     * complete but allow it to close itself upon completion.
+     * This situation should not arise with https because the request
+     * has not been sent as part of the initial alpn negotiation
      */
-    void putConnection(Http2Connection c) {
-        connections.put(c.key(), c);
+    boolean offerConnection(Http2Connection c) {
+        String key = c.key();
+        Http2Connection c1 = connections.putIfAbsent(key, c);
+        if (c1 != null) {
+            c.setSingleStream(true);
+            return false;
+        }
+        return true;
     }
 
     void deleteConnection(Http2Connection c) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Tue Dec 19 16:12:55 2017 +0000
@@ -116,6 +116,8 @@
                   Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
 
+    private boolean singleStream; // used only for stream 1, then closed
+
     /*
      *  ByteBuffer pooling strategy for HTTP/2 protocol:
      *
@@ -202,7 +204,6 @@
                 prefaceSent = true;
             }
         }
-
     }
 
     volatile boolean closed;
@@ -397,6 +398,14 @@
         return aconn.getALPN().thenCompose(checkAlpnCF);
     }
 
+    synchronized boolean singleStream() {
+        return singleStream;
+    }
+
+    synchronized void setSingleStream(boolean use) {
+        singleStream = use;
+    }
+
     static String keyFor(HttpConnection connection) {
         boolean isProxy = connection.isProxied();
         boolean isSecure = connection.isSecure();
@@ -429,6 +438,10 @@
     // P indicates proxy
     // Eg: "S:H:foo.com:80"
     static String keyString(boolean secure, boolean proxy, String host, int port) {
+        if (secure && port == -1)
+            port = 443;
+        else if (!secure && port == -1)
+            port = 80;
         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
     }
 
@@ -436,8 +449,8 @@
         return this.key;
     }
 
-    void putConnection() {
-        client2.putConnection(this);
+    boolean offerConnection() {
+        return client2.offerConnection(this);
     }
 
     private HttpPublisher publisher() {
@@ -464,6 +477,7 @@
     }
 
     void close() {
+        Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
         GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
         // TODO: set last stream. For now zero ok.
         sendFrame(f);
@@ -680,7 +694,12 @@
             // corresponding entry in the window controller.
             windowController.removeStream(streamid);
         }
+        if (singleStream() && streams.isEmpty()) {
+            // should be only 1 stream, but there might be more if server push
+            close();
+        }
     }
+
     /**
      * Increments this connection's send Window by the amount in the given frame.
      */
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Tue Dec 19 16:12:55 2017 +0000
@@ -84,7 +84,7 @@
                 boolean finished = chan.finishConnect();
                 assert finished : "Expected channel to be connected";
                 debug.log(Level.DEBUG,
-                          "ConnectEvent: connect finished: %s", finished);
+                          "ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress());
                 connected = true;
                 // complete async since the event runs on the SelectorManager thread
                 cf.completeAsync(() -> null, client().theExecutor());
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Tue Dec 19 16:12:55 2017 +0000
@@ -401,8 +401,8 @@
             pusher.onSubscribe(this.sub = sub);
             try {
                 if (contentLength == 0) {
+                    onFinished.run();
                     pusher.onComplete();
-                    onFinished.run();
                     onComplete.accept(null);
                 }
             } catch (Throwable t) {
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Tue Dec 19 21:35:30 2017 +0530
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Tue Dec 19 16:12:55 2017 +0000
@@ -170,7 +170,7 @@
         return new ServerSocket(port);
     }
 
-    public void stop() {
+    public synchronized void stop() {
         // TODO: clean shutdown GoAway
         stopping = true;
         System.err.printf("Server stopping %d connections\n", connections.size());
@@ -205,6 +205,15 @@
         return serverName;
     }
 
+    private synchronized void putConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+        if (!stopping)
+            connections.put(addr, c);
+    }
+
+    private synchronized void removeConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+        connections.remove(addr, c);
+    }
+
     /**
      * Starts a thread which waits for incoming connections.
      */
@@ -216,7 +225,7 @@
                     InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
                     Http2TestServerConnection c =
                             new Http2TestServerConnection(this, socket, exchangeSupplier);
-                    connections.put(addr, c);
+                    putConnection(addr, c);
                     try {
                         c.run();
                     } catch (Throwable e) {
@@ -224,7 +233,7 @@
                         // the connection might not have been closed
                         // and if so then the client might wait
                         // forever.
-                        connections.remove(addr, c);
+                        removeConnection(addr, c);
                         c.close(ErrorFrame.PROTOCOL_ERROR);
                         System.err.println("TestServer: start exception: " + e);
                         //throw e;