src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Sun Nov 05 17:05:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Sun Nov 05 17:32:13 2017 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,17 +25,18 @@
 
 package jdk.incubator.http;
 
-import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
+import java.util.concurrent.CompletableFuture;
+import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 import jdk.incubator.http.internal.frame.SettingsFrame;
 import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
@@ -49,6 +50,10 @@
  */
 class Http2ClientImpl {
 
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    final static System.Logger debug =
+            Utils.getDebugLogger("Http2ClientImpl"::toString, DEBUG);
+
     private final HttpClientImpl client;
 
     Http2ClientImpl(HttpClientImpl client) {
@@ -59,13 +64,26 @@
     private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
 
     private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
+    private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
+    Collections.synchronizedMap(new HashMap<>());
+
+    private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
+        synchronized (waiting) {
+            Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
+            if (waiters == null) {
+                waiters = new HashSet<>();
+                waiting.put(key, waiters);
+            }
+            waiters.add(cf);
+        }
+    }
 
     boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
         return connections.containsKey(Http2Connection.keyFor(uri,proxy));
     }
 
     /**
-     * If a https request then blocks and waits until a connection is opened.
+     * If a https request then async waits until a connection is opened.
      * Returns null if the request is 'http' as a different (upgrade)
      * mechanism is used.
      *
@@ -78,50 +96,59 @@
      * In latter case, when the Http2Connection is connected, putConnection() must
      * be called to store it.
      */
-    Http2Connection getConnectionFor(HttpRequestImpl req)
-            throws IOException, InterruptedException {
+    CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
         URI uri = req.uri();
         InetSocketAddress proxy = req.proxy(client);
         String key = Http2Connection.keyFor(uri, proxy);
-        Http2Connection connection = connections.get(key);
-        if (connection != null) { // fast path if connection already exists
-            return connection;
-        }
+
         synchronized (opening) {
-            while ((connection = connections.get(key)) == null) {
-                if (!req.secure()) {
-                    return null;
-                }
-                if (!opening.contains(key)) {
-                    opening.add(key);
-                    break;
-                } else {
-                    opening.wait();
-                }
+            Http2Connection connection = connections.get(key);
+            if (connection != null) { // fast path if connection already exists
+                return CompletableFuture.completedFuture(connection);
+            }
+
+            if (!req.secure()) {
+                return MinimalFuture.completedFuture(null);
+            }
+
+            if (!opening.contains(key)) {
+                debug.log(Level.DEBUG, "Opening: %s", key);
+                opening.add(key);
+            } else {
+                CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
+                addToWaiting(key, cf);
+                return cf;
             }
         }
-        if (connection != null) {
-            return connection;
-        }
-        // we are opening the connection here blocking until it is done.
-        try {
-            connection = new Http2Connection(req, this);
-        } catch (Throwable t) {
-            synchronized (opening) {
-                opening.remove(key);
-                opening.notifyAll();
-            }
-            throw t;
-        }
-        synchronized (opening) {
-            connections.put(key, connection);
-            opening.remove(key);
-            opening.notifyAll();
-        }
-        return connection;
+        return Http2Connection
+                .createAsync(req, this)
+                .whenComplete((conn, t) -> {
+                    debug.log(Level.DEBUG,
+                            "waking up dependents with created connection");
+                    synchronized (opening) {
+                        Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
+                        debug.log(Level.DEBUG, "Opening completed: %s", key);
+                        opening.remove(key);
+                        final Throwable cause = Utils.getCompletionCause(t);
+                        if (waiters == null) {
+                            debug.log(Level.DEBUG, "no dependent to wake up");
+                            return;
+                        } else if (cause instanceof Http2Connection.ALPNException) {
+                            waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
+                                    client.theExecutor()));
+                        } else if (cause != null) {
+                            debug.log(Level.DEBUG,
+                                    () -> "waking up dependants: failed: " + cause);
+                            waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
+                        } else  {
+                            debug.log(Level.DEBUG, "waking up dependants: succeeded");
+                            waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
+                                    client.theExecutor()));
+                        }
+                    }
+                });
     }
 
-
     /*
      * TODO: If there isn't a connection to the same destination, then
      * store it. If there is already a connection, then close it
@@ -134,6 +161,16 @@
         connections.remove(c.key());
     }
 
+    void stop() {
+        debug.log(Level.DEBUG, "stopping");
+        connections.values().stream().forEach(this::close);
+        connections.clear();
+    }
+
+    private void close(Http2Connection h2c) {
+        try { h2c.close(); } catch (Throwable t) {}
+    }
+
     HttpClientImpl client() {
         return client;
     }