--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Fri Nov 03 10:01:08 2017 -0700
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Wed Dec 06 11:11:59 2017 -0800
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,17 +25,18 @@
package jdk.incubator.http;
-import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
+import java.util.concurrent.CompletableFuture;
+import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.frame.SettingsFrame;
import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
@@ -49,6 +50,10 @@
*/
class Http2ClientImpl {
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ final static System.Logger debug =
+ Utils.getDebugLogger("Http2ClientImpl"::toString, DEBUG);
+
private final HttpClientImpl client;
Http2ClientImpl(HttpClientImpl client) {
@@ -59,13 +64,26 @@
private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
+ private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
+ Collections.synchronizedMap(new HashMap<>());
- boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
- return connections.containsKey(Http2Connection.keyFor(uri,proxy));
+ private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
+ synchronized (waiting) {
+ Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
+ if (waiters == null) {
+ waiters = new HashSet<>();
+ waiting.put(key, waiters);
+ }
+ waiters.add(cf);
+ }
}
+// boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
+// return connections.containsKey(Http2Connection.keyFor(uri,proxy));
+// }
+
/**
- * If a https request then blocks and waits until a connection is opened.
+ * If a https request then async waits until a connection is opened.
* Returns null if the request is 'http' as a different (upgrade)
* mechanism is used.
*
@@ -78,50 +96,61 @@
* In latter case, when the Http2Connection is connected, putConnection() must
* be called to store it.
*/
- Http2Connection getConnectionFor(HttpRequestImpl req)
- throws IOException, InterruptedException {
+ CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
URI uri = req.uri();
- InetSocketAddress proxy = req.proxy(client);
+ InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
- Http2Connection connection = connections.get(key);
- if (connection != null) { // fast path if connection already exists
- return connection;
- }
+
synchronized (opening) {
- while ((connection = connections.get(key)) == null) {
- if (!req.secure()) {
- return null;
- }
- if (!opening.contains(key)) {
- opening.add(key);
- break;
- } else {
- opening.wait();
- }
+ Http2Connection connection = connections.get(key);
+ if (connection != null) { // fast path if connection already exists
+ return CompletableFuture.completedFuture(connection);
+ }
+
+ if (!req.secure()) {
+ return MinimalFuture.completedFuture(null);
+ }
+
+ if (!opening.contains(key)) {
+ debug.log(Level.DEBUG, "Opening: %s", key);
+ opening.add(key);
+ } else {
+ CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
+ addToWaiting(key, cf);
+ return cf;
}
}
- if (connection != null) {
- return connection;
- }
- // we are opening the connection here blocking until it is done.
- try {
- connection = new Http2Connection(req, this);
- } catch (Throwable t) {
- synchronized (opening) {
- opening.remove(key);
- opening.notifyAll();
- }
- throw t;
- }
- synchronized (opening) {
- connections.put(key, connection);
- opening.remove(key);
- opening.notifyAll();
- }
- return connection;
+ return Http2Connection
+ .createAsync(req, this)
+ .whenComplete((conn, t) -> {
+ debug.log(Level.DEBUG,
+ "waking up dependents with created connection");
+ synchronized (opening) {
+ Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
+ debug.log(Level.DEBUG, "Opening completed: %s", key);
+ opening.remove(key);
+ if (t == null && conn != null)
+ putConnection(conn);
+ final Throwable cause = Utils.getCompletionCause(t);
+ if (waiters == null) {
+ debug.log(Level.DEBUG, "no dependent to wake up");
+ return;
+ } else if (cause instanceof Http2Connection.ALPNException) {
+ waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
+ client.theExecutor()));
+ } else if (cause != null) {
+ debug.log(Level.DEBUG,
+ () -> "waking up dependants: failed: " + cause);
+ waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
+ } else {
+ debug.log(Level.DEBUG, "waking up dependants: succeeded");
+ waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
+ client.theExecutor()));
+ }
+ }
+ });
}
-
/*
* TODO: If there isn't a connection to the same destination, then
* store it. If there is already a connection, then close it
@@ -134,6 +163,16 @@
connections.remove(c.key());
}
+ void stop() {
+ debug.log(Level.DEBUG, "stopping");
+ connections.values().forEach(this::close);
+ connections.clear();
+ }
+
+ private void close(Http2Connection h2c) {
+ try { h2c.close(); } catch (Throwable t) {}
+ }
+
HttpClientImpl client() {
return client;
}