--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Wed Aug 08 15:51:08 2018 -0700
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Thu Aug 09 11:23:12 2018 +0100
@@ -26,6 +26,7 @@
package jdk.internal.net.http;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
@@ -34,6 +35,7 @@
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Log;
@@ -53,9 +55,52 @@
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private volatile boolean connected;
private boolean closed;
+ private volatile ConnectTimerEvent connectTimerEvent; // may be null
// should be volatile to provide proper synchronization(visibility) action
+ /**
+ * Returns a ConnectTimerEvent iff there is a connect timeout duration,
+ * otherwise null.
+ */
+ private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,
+ CompletableFuture<Void> cf) {
+ Duration duration = client().connectTimeout().orElse(null);
+ if (duration != null) {
+ ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf);
+ return cte;
+ }
+ return null;
+ }
+
+ final class ConnectTimerEvent extends TimeoutEvent {
+ private final CompletableFuture<Void> cf;
+ private final Exchange<?> exchange;
+
+ ConnectTimerEvent(Duration duration,
+ Exchange<?> exchange,
+ CompletableFuture<Void> cf) {
+ super(duration);
+ this.exchange = exchange;
+ this.cf = cf;
+ }
+
+ @Override
+ public void handle() {
+ if (debug.on()) {
+ debug.log("HTTP connect timed out");
+ }
+ ConnectException ce = new ConnectException("HTTP connect timed out");
+ exchange.multi.cancel(ce);
+ client().theExecutor().execute(() -> cf.completeExceptionally(ce));
+ }
+
+ @Override
+ public String toString() {
+ return "ConnectTimerEvent, " + super.toString();
+ }
+ }
+
final class ConnectEvent extends AsyncEvent {
private final CompletableFuture<Void> cf;
@@ -85,7 +130,6 @@
if (debug.on())
debug.log("ConnectEvent: connect finished: %s Local addr: %s",
finished, chan.getLocalAddress());
- connected = true;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> null, client().theExecutor());
} catch (Throwable e) {
@@ -103,12 +147,20 @@
}
@Override
- public CompletableFuture<Void> connectAsync() {
+ public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
assert !connected : "Already connected";
assert !chan.isBlocking() : "Unexpected blocking channel";
- boolean finished = false;
+ boolean finished;
+
+ connectTimerEvent = newConnectTimer(exchange, cf);
+ if (connectTimerEvent != null) {
+ if (debug.on())
+ debug.log("registering connect timer: " + connectTimerEvent);
+ client().registerTimer(connectTimerEvent);
+ }
+
PrivilegedExceptionAction<Boolean> pa =
() -> chan.connect(Utils.resolveAddress(address));
try {
@@ -118,7 +170,6 @@
}
if (finished) {
if (debug.on()) debug.log("connect finished without blocking");
- connected = true;
cf.complete(null);
} else {
if (debug.on()) debug.log("registering connect event");
@@ -137,6 +188,16 @@
}
@Override
+ public CompletableFuture<Void> finishConnect() {
+ assert connected == false;
+ if (debug.on()) debug.log("finishConnect, setting connected=true");
+ connected = true;
+ if (connectTimerEvent != null)
+ client().cancelTimer(connectTimerEvent);
+ return MinimalFuture.completedFuture(null);
+ }
+
+ @Override
SocketChannel channel() {
return chan;
}
@@ -210,6 +271,8 @@
Log.logTrace("Closing: " + toString());
if (debug.on())
debug.log("Closing channel: " + client().debugInterestOps(chan));
+ if (connectTimerEvent != null)
+ client().cancelTimer(connectTimerEvent);
chan.close();
tube.signalClosed();
} catch (IOException e) {