http-client-branch: Fixed 8192966: HttpClient should reuse TCP connection for h2c connections
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Thu Dec 14 18:41:57 2017 +0000
@@ -387,8 +387,6 @@
new IOException("Can't get stream 1: " + t, t));
}
}
- if (!cached)
- s.closeConnectionOnCompletion();
exchImpl.released();
Throwable t;
// There's a race condition window where an external
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Thu Dec 14 18:41:57 2017 +0000
@@ -103,6 +103,7 @@
"handling HTTP/2 connection creation failed: %s",
(Object)t);
t = Utils.getCompletionCause(t);
+ boolean secure = exchange.request().secure();
if (t instanceof Http2Connection.ALPNException) {
Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
AbstractAsyncSSLConnection as = ee.getConnection();
@@ -110,6 +111,11 @@
CompletableFuture<? extends ExchangeImpl<U>> ex =
createHttp1Exchange(exchange, as);
return ex;
+ } else if (secure && c== null) {
+ DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 ");
+ CompletableFuture<? extends ExchangeImpl<U>> ex =
+ createHttp1Exchange(exchange, null);
+ return ex;
} else {
DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed "
+ "with unexpected exception: %s", (Object)t);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Thu Dec 14 18:41:57 2017 +0000
@@ -65,85 +65,58 @@
/* Map key is "scheme:host:port" */
private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
- private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
- private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
- Collections.synchronizedMap(new HashMap<>());
-
- private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
- synchronized (waiting) {
- Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
- if (waiters == null) {
- waiters = new HashSet<>();
- waiting.put(key, waiters);
- }
- waiters.add(cf);
- }
- }
+ private final Set<String> failures = Collections.synchronizedSet(new HashSet<>());
/**
- * If a https request then async waits until a connection is opened.
- * Returns null if the request is 'http' as a different (upgrade)
- * mechanism is used.
+ * When HTTP/2 requested only. The following describes the aggregate behavior including the
+ * calling code. In all cases, the HTTP2 connection cache
+ * is checked first for a suitable connection and that is returned if available.
+ * If not, a new connection is opened, except in https case when a previous negotiate failed.
+ * In that case, we want to continue using http/1.1. When a connection is to be opened and
+ * if multiple requests are sent in parallel then each will open a new connection.
+ *
+ * If negotiation/upgrade succeeds then
+ * one connection will be put in the cache and the others will be closed
+ * after the initial request completes (not strictly necessary for h2, only for h2c)
*
- * Only one connection per destination is created. Blocks when opening
- * connection, or when waiting for connection to be opened.
- * First thread opens the connection and notifies the others when done.
+ * If negotiate/upgrade fails, then any opened connections remain open (as http/1.1)
+ * and will be used and cached in the http/1 cache. Note, this method handles the
+ * https failure case only (by completing the CF with an ALPN exception, handled externally)
+ * The h2c upgrade is handled externally also.
*
- * If the request is secure (https) then we open the connection here.
- * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
- * In latter case, when the Http2Connection is connected, offerConnection() must
- * be called to store it.
+ * Specific CF behavior of this method.
+ * 1. completes with ALPN exception: h2 negotiate failed for first time. failure recorded.
+ * 2. completes with other exception: failure not recorded. Caller must handle
+ * 3. completes normally with null: no connection in cache for h2c or h2 failed previously
+ * 4. completes normally with connection: h2 or h2c connection in cache. Use it.
*/
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
URI uri = req.uri();
InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
- synchronized (opening) {
+ synchronized (this) {
Http2Connection connection = connections.get(key);
if (connection != null) { // fast path if connection already exists
return CompletableFuture.completedFuture(connection);
}
- if (!req.secure()) {
+ if (!req.secure() || failures.contains(key)) {
+ // secure: negotiate failed before. Use http/1.1
+ // !secure: no connection available in cache. Attempt upgrade
return MinimalFuture.completedFuture(null);
}
-
- if (!opening.contains(key)) {
- debug.log(Level.DEBUG, "Opening: %s", key);
- opening.add(key);
- } else {
- CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
- addToWaiting(key, cf);
- return cf;
- }
}
return Http2Connection
.createAsync(req, this)
.whenComplete((conn, t) -> {
- debug.log(Level.DEBUG,
- "waking up dependents with created connection");
- synchronized (opening) {
- Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
- debug.log(Level.DEBUG, "Opening completed: %s", key);
- opening.remove(key);
- if (t == null && conn != null)
+ synchronized (Http2ClientImpl.this) {
+ if (conn != null) {
offerConnection(conn);
- final Throwable cause = Utils.getCompletionCause(t);
- if (waiters == null) {
- debug.log(Level.DEBUG, "no dependent to wake up");
- return;
- } else if (cause instanceof Http2Connection.ALPNException) {
- waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
- client.theExecutor()));
- } else if (cause != null) {
- debug.log(Level.DEBUG,
- () -> "waking up dependants: failed: " + cause);
- waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
- } else {
- debug.log(Level.DEBUG, "waking up dependants: succeeded");
- waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
- client.theExecutor()));
+ } else {
+ Throwable cause = Utils.getCompletionCause(t);
+ if (cause instanceof Http2Connection.ALPNException)
+ failures.add(key);
}
}
});
@@ -159,7 +132,11 @@
boolean offerConnection(Http2Connection c) {
String key = c.key();
Http2Connection c1 = connections.putIfAbsent(key, c);
- return c1 == null;
+ if (c1 != null) {
+ c.setSingleStream(true);
+ return false;
+ }
+ return true;
}
void deleteConnection(Http2Connection c) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Thu Dec 14 18:41:57 2017 +0000
@@ -116,6 +116,8 @@
Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
+ private boolean singleStream; // used only for stream 1, then closed
+
/*
* ByteBuffer pooling strategy for HTTP/2 protocol:
*
@@ -202,7 +204,6 @@
prefaceSent = true;
}
}
-
}
volatile boolean closed;
@@ -397,6 +398,14 @@
return aconn.getALPN().thenCompose(checkAlpnCF);
}
+ synchronized boolean singleStream() {
+ return singleStream;
+ }
+
+ synchronized void setSingleStream(boolean use) {
+ singleStream = use;
+ }
+
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
@@ -468,6 +477,7 @@
}
void close() {
+ Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
// TODO: set last stream. For now zero ok.
sendFrame(f);
@@ -684,7 +694,12 @@
// corresponding entry in the window controller.
windowController.removeStream(streamid);
}
+ if (singleStream() && streams.isEmpty()) {
+ // should be only 1 stream, but there might be more if server push
+ close();
+ }
}
+
/**
* Increments this connection's send Window by the amount in the given frame.
*/
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Thu Dec 14 18:41:57 2017 +0000
@@ -401,8 +401,8 @@
pusher.onSubscribe(this.sub = sub);
try {
if (contentLength == 0) {
+ onFinished.run();
pusher.onComplete();
- onFinished.run();
onComplete.accept(null);
}
} catch (Throwable t) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Thu Dec 14 12:28:32 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Thu Dec 14 18:41:57 2017 +0000
@@ -109,7 +109,6 @@
final Http2Connection connection;
final HttpRequestImpl request;
final DecodingCallback rspHeadersConsumer;
- volatile boolean closeConnectionOnCompletion;
HttpHeadersImpl responseHeaders;
final HttpHeadersImpl requestPseudoHeaders;
volatile HttpResponse.BodySubscriber<T> responseSubscriber;
@@ -197,17 +196,6 @@
}
}
- // call this anywhere the stream is terminated
- void checkConnectionClosure() {
- if (closeConnectionOnCompletion) {
- connection.close();
- }
- }
-
- void closeConnectionOnCompletion() {
- closeConnectionOnCompletion = true;
- }
-
// Callback invoked after the Response BodySubscriber has consumed the
// buffers contained in a DataFrame.
// Returns true if END_STREAM is reached, false otherwise.
@@ -948,7 +936,6 @@
} catch (IOException ex) {
Log.logError(ex);
}
- checkConnectionClosure();
}
// This method doesn't send any frame
@@ -961,7 +948,6 @@
Log.logTrace("Closing stream {0}", streamid);
connection.closeStream(streamid);
Log.logTrace("Stream {0} closed", streamid);
- checkConnectionClosure();
}
static class PushedStream<U,T> extends Stream<T> {
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Thu Dec 14 12:28:32 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Thu Dec 14 18:41:57 2017 +0000
@@ -170,7 +170,7 @@
return new ServerSocket(port);
}
- public void stop() {
+ public synchronized void stop() {
// TODO: clean shutdown GoAway
stopping = true;
System.err.printf("Server stopping %d connections\n", connections.size());
@@ -205,6 +205,15 @@
return serverName;
}
+ private synchronized void putConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+ if (!stopping)
+ connections.put(addr, c);
+ }
+
+ private synchronized void removeConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+ connections.remove(addr, c);
+ }
+
/**
* Starts a thread which waits for incoming connections.
*/
@@ -216,7 +225,7 @@
InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
Http2TestServerConnection c =
new Http2TestServerConnection(this, socket, exchangeSupplier);
- connections.put(addr, c);
+ putConnection(addr, c);
try {
c.run();
} catch (Throwable e) {
@@ -224,7 +233,7 @@
// the connection might not have been closed
// and if so then the client might wait
// forever.
- connections.remove(addr, c);
+ removeConnection(addr, c);
c.close(ErrorFrame.PROTOCOL_ERROR);
System.err.println("TestServer: start exception: " + e);
//throw e;