--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Tue Dec 19 16:12:55 2017 +0000
@@ -373,13 +373,16 @@
client.client2(),
this, e::drainLeftOverBytes)
.thenCompose((Http2Connection c) -> {
- c.putConnection();
+ boolean cached = c.offerConnection();
Stream<T> s = c.getStream(1);
+
if (s == null) {
// s can be null if an exception occurred
// asynchronously while sending the preface.
Throwable t = c.getRecordedCause();
if (t != null) {
+ if (!cached)
+ c.close();
return MinimalFuture.failedFuture(
new IOException("Can't get stream 1: " + t, t));
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Tue Dec 19 16:12:55 2017 +0000
@@ -98,6 +98,7 @@
HttpConnection connection)
{
DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result");
+ boolean secure = exchange.request().secure();
if (t != null) {
DEBUG_LOGGER.log(Level.DEBUG,
"handling HTTP/2 connection creation failed: %s",
@@ -116,6 +117,12 @@
return CompletableFuture.failedFuture(t);
}
}
+ if (secure && c== null) {
+ DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 ");
+ CompletableFuture<? extends ExchangeImpl<U>> ex =
+ createHttp1Exchange(exchange, null);
+ return ex;
+ }
if (c == null) {
// no existing connection. Send request with HTTP 1 and then
// upgrade if successful
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Tue Dec 19 16:12:55 2017 +0000
@@ -65,96 +65,78 @@
/* Map key is "scheme:host:port" */
private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
- private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
- private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
- Collections.synchronizedMap(new HashMap<>());
-
- private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
- synchronized (waiting) {
- Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
- if (waiters == null) {
- waiters = new HashSet<>();
- waiting.put(key, waiters);
- }
- waiters.add(cf);
- }
- }
+ private final Set<String> failures = Collections.synchronizedSet(new HashSet<>());
/**
- * If a https request then async waits until a connection is opened.
- * Returns null if the request is 'http' as a different (upgrade)
- * mechanism is used.
+ * When HTTP/2 requested only. The following describes the aggregate behavior including the
+ * calling code. In all cases, the HTTP2 connection cache
+ * is checked first for a suitable connection and that is returned if available.
+ * If not, a new connection is opened, except in https case when a previous negotiate failed.
+ * In that case, we want to continue using http/1.1. When a connection is to be opened and
+ * if multiple requests are sent in parallel then each will open a new connection.
+ *
+ * If negotiation/upgrade succeeds then
+ * one connection will be put in the cache and the others will be closed
+ * after the initial request completes (not strictly necessary for h2, only for h2c)
*
- * Only one connection per destination is created. Blocks when opening
- * connection, or when waiting for connection to be opened.
- * First thread opens the connection and notifies the others when done.
+ * If negotiate/upgrade fails, then any opened connections remain open (as http/1.1)
+ * and will be used and cached in the http/1 cache. Note, this method handles the
+ * https failure case only (by completing the CF with an ALPN exception, handled externally)
+ * The h2c upgrade is handled externally also.
*
- * If the request is secure (https) then we open the connection here.
- * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
- * In latter case, when the Http2Connection is connected, putConnection() must
- * be called to store it.
+ * Specific CF behavior of this method.
+ * 1. completes with ALPN exception: h2 negotiate failed for first time. failure recorded.
+ * 2. completes with other exception: failure not recorded. Caller must handle
+ * 3. completes normally with null: no connection in cache for h2c or h2 failed previously
+ * 4. completes normally with connection: h2 or h2c connection in cache. Use it.
*/
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
URI uri = req.uri();
InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
- synchronized (opening) {
+ synchronized (this) {
Http2Connection connection = connections.get(key);
if (connection != null) { // fast path if connection already exists
return CompletableFuture.completedFuture(connection);
}
- if (!req.secure()) {
+ if (!req.secure() || failures.contains(key)) {
+ // secure: negotiate failed before. Use http/1.1
+ // !secure: no connection available in cache. Attempt upgrade
return MinimalFuture.completedFuture(null);
}
-
- if (!opening.contains(key)) {
- debug.log(Level.DEBUG, "Opening: %s", key);
- opening.add(key);
- } else {
- CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
- addToWaiting(key, cf);
- return cf;
- }
}
return Http2Connection
.createAsync(req, this)
.whenComplete((conn, t) -> {
- debug.log(Level.DEBUG,
- "waking up dependents with created connection");
- synchronized (opening) {
- Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
- debug.log(Level.DEBUG, "Opening completed: %s", key);
- opening.remove(key);
- if (t == null && conn != null)
- putConnection(conn);
- final Throwable cause = Utils.getCompletionCause(t);
- if (waiters == null) {
- debug.log(Level.DEBUG, "no dependent to wake up");
- return;
- } else if (cause instanceof Http2Connection.ALPNException) {
- waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
- client.theExecutor()));
- } else if (cause != null) {
- debug.log(Level.DEBUG,
- () -> "waking up dependants: failed: " + cause);
- waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
- } else {
- debug.log(Level.DEBUG, "waking up dependants: succeeded");
- waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
- client.theExecutor()));
+ synchronized (Http2ClientImpl.this) {
+ if (conn != null) {
+ offerConnection(conn);
+ } else {
+ Throwable cause = Utils.getCompletionCause(t);
+ if (cause instanceof Http2Connection.ALPNException)
+ failures.add(key);
}
}
});
}
/*
- * TODO: If there isn't a connection to the same destination, then
- * store it. If there is already a connection, then close it
+ * Cache the given connection, if no connection to the same
+ * destination exists. If one exists, then we let the initial stream
+ * complete but allow it to close itself upon completion.
+ * This situation should not arise with https because the request
+ * has not been sent as part of the initial alpn negotiation
*/
- void putConnection(Http2Connection c) {
- connections.put(c.key(), c);
+ boolean offerConnection(Http2Connection c) {
+ String key = c.key();
+ Http2Connection c1 = connections.putIfAbsent(key, c);
+ if (c1 != null) {
+ c.setSingleStream(true);
+ return false;
+ }
+ return true;
}
void deleteConnection(Http2Connection c) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Tue Dec 19 16:12:55 2017 +0000
@@ -116,6 +116,8 @@
Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
+ private boolean singleStream; // used only for stream 1, then closed
+
/*
* ByteBuffer pooling strategy for HTTP/2 protocol:
*
@@ -202,7 +204,6 @@
prefaceSent = true;
}
}
-
}
volatile boolean closed;
@@ -397,6 +398,14 @@
return aconn.getALPN().thenCompose(checkAlpnCF);
}
+ synchronized boolean singleStream() {
+ return singleStream;
+ }
+
+ synchronized void setSingleStream(boolean use) {
+ singleStream = use;
+ }
+
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
@@ -429,6 +438,10 @@
// P indicates proxy
// Eg: "S:H:foo.com:80"
static String keyString(boolean secure, boolean proxy, String host, int port) {
+ if (secure && port == -1)
+ port = 443;
+ else if (!secure && port == -1)
+ port = 80;
return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
}
@@ -436,8 +449,8 @@
return this.key;
}
- void putConnection() {
- client2.putConnection(this);
+ boolean offerConnection() {
+ return client2.offerConnection(this);
}
private HttpPublisher publisher() {
@@ -464,6 +477,7 @@
}
void close() {
+ Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
// TODO: set last stream. For now zero ok.
sendFrame(f);
@@ -680,7 +694,12 @@
// corresponding entry in the window controller.
windowController.removeStream(streamid);
}
+ if (singleStream() && streams.isEmpty()) {
+ // should be only 1 stream, but there might be more if server push
+ close();
+ }
}
+
/**
* Increments this connection's send Window by the amount in the given frame.
*/
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Tue Dec 19 16:12:55 2017 +0000
@@ -84,7 +84,7 @@
boolean finished = chan.finishConnect();
assert finished : "Expected channel to be connected";
debug.log(Level.DEBUG,
- "ConnectEvent: connect finished: %s", finished);
+ "ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress());
connected = true;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> null, client().theExecutor());
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Tue Dec 19 21:35:30 2017 +0530
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Tue Dec 19 16:12:55 2017 +0000
@@ -401,8 +401,8 @@
pusher.onSubscribe(this.sub = sub);
try {
if (contentLength == 0) {
+ onFinished.run();
pusher.onComplete();
- onFinished.run();
onComplete.accept(null);
}
} catch (Throwable t) {
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Tue Dec 19 21:35:30 2017 +0530
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Tue Dec 19 16:12:55 2017 +0000
@@ -170,7 +170,7 @@
return new ServerSocket(port);
}
- public void stop() {
+ public synchronized void stop() {
// TODO: clean shutdown GoAway
stopping = true;
System.err.printf("Server stopping %d connections\n", connections.size());
@@ -205,6 +205,15 @@
return serverName;
}
+ private synchronized void putConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+ if (!stopping)
+ connections.put(addr, c);
+ }
+
+ private synchronized void removeConnection(InetSocketAddress addr, Http2TestServerConnection c) {
+ connections.remove(addr, c);
+ }
+
/**
* Starts a thread which waits for incoming connections.
*/
@@ -216,7 +225,7 @@
InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
Http2TestServerConnection c =
new Http2TestServerConnection(this, socket, exchangeSupplier);
- connections.put(addr, c);
+ putConnection(addr, c);
try {
c.run();
} catch (Throwable e) {
@@ -224,7 +233,7 @@
// the connection might not have been closed
// and if so then the client might wait
// forever.
- connections.remove(addr, c);
+ removeConnection(addr, c);
c.close(ErrorFrame.PROTOCOL_ERROR);
System.err.println("TestServer: start exception: " + e);
//throw e;