src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java
changeset 51364 31d9e82b2e64
parent 50681 4254bed3c09d
child 53387 c9622e15ba29
child 56868 67c7659ecda5
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Wed Aug 08 15:51:08 2018 -0700
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Thu Aug 09 11:23:12 2018 +0100
@@ -26,6 +26,7 @@
 package jdk.internal.net.http;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.channels.SelectableChannel;
@@ -34,6 +35,7 @@
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import jdk.internal.net.http.common.FlowTube;
 import jdk.internal.net.http.common.Log;
@@ -53,9 +55,52 @@
     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
     private volatile boolean connected;
     private boolean closed;
+    private volatile ConnectTimerEvent connectTimerEvent;  // may be null
 
     // should be volatile to provide proper synchronization(visibility) action
 
+    /**
+     * Returns a ConnectTimerEvent iff there is a connect timeout duration,
+     * otherwise null.
+     */
+    private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,
+                                              CompletableFuture<Void> cf) {
+        Duration duration = client().connectTimeout().orElse(null);
+        if (duration != null) {
+            ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf);
+            return cte;
+        }
+        return null;
+    }
+
+    final class ConnectTimerEvent extends TimeoutEvent {
+        private final CompletableFuture<Void> cf;
+        private final Exchange<?> exchange;
+
+        ConnectTimerEvent(Duration duration,
+                          Exchange<?> exchange,
+                          CompletableFuture<Void> cf) {
+            super(duration);
+            this.exchange = exchange;
+            this.cf = cf;
+        }
+
+        @Override
+        public void handle() {
+            if (debug.on()) {
+                debug.log("HTTP connect timed out");
+            }
+            ConnectException ce = new ConnectException("HTTP connect timed out");
+            exchange.multi.cancel(ce);
+            client().theExecutor().execute(() -> cf.completeExceptionally(ce));
+        }
+
+        @Override
+        public String toString() {
+            return "ConnectTimerEvent, " + super.toString();
+        }
+    }
+
     final class ConnectEvent extends AsyncEvent {
         private final CompletableFuture<Void> cf;
 
@@ -85,7 +130,6 @@
                 if (debug.on())
                     debug.log("ConnectEvent: connect finished: %s Local addr: %s",
                               finished, chan.getLocalAddress());
-                connected = true;
                 // complete async since the event runs on the SelectorManager thread
                 cf.completeAsync(() -> null, client().theExecutor());
             } catch (Throwable e) {
@@ -103,12 +147,20 @@
     }
 
     @Override
-    public CompletableFuture<Void> connectAsync() {
+    public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
         CompletableFuture<Void> cf = new MinimalFuture<>();
         try {
             assert !connected : "Already connected";
             assert !chan.isBlocking() : "Unexpected blocking channel";
-            boolean finished = false;
+            boolean finished;
+
+            connectTimerEvent = newConnectTimer(exchange, cf);
+            if (connectTimerEvent != null) {
+                if (debug.on())
+                    debug.log("registering connect timer: " + connectTimerEvent);
+                client().registerTimer(connectTimerEvent);
+            }
+
             PrivilegedExceptionAction<Boolean> pa =
                     () -> chan.connect(Utils.resolveAddress(address));
             try {
@@ -118,7 +170,6 @@
             }
             if (finished) {
                 if (debug.on()) debug.log("connect finished without blocking");
-                connected = true;
                 cf.complete(null);
             } else {
                 if (debug.on()) debug.log("registering connect event");
@@ -137,6 +188,16 @@
     }
 
     @Override
+    public CompletableFuture<Void> finishConnect() {
+        assert connected == false;
+        if (debug.on()) debug.log("finishConnect, setting connected=true");
+        connected = true;
+        if (connectTimerEvent != null)
+            client().cancelTimer(connectTimerEvent);
+        return MinimalFuture.completedFuture(null);
+    }
+
+    @Override
     SocketChannel channel() {
         return chan;
     }
@@ -210,6 +271,8 @@
             Log.logTrace("Closing: " + toString());
             if (debug.on())
                 debug.log("Closing channel: " + client().debugInterestOps(chan));
+            if (connectTimerEvent != null)
+                client().cancelTimer(connectTimerEvent);
             chan.close();
             tube.signalClosed();
         } catch (IOException e) {