src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
changeset 48083 b1c1b4ef4be2
parent 47216 71c04702a3d5
child 48263 a559b7cd1dea
child 55973 4d9b002587db
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java	Fri Nov 03 10:01:08 2017 -0700
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java	Wed Dec 06 11:11:59 2017 -0800
@@ -28,33 +28,45 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.lang.ref.WeakReference;
 import java.net.Authenticator;
-import java.net.CookieManager;
+import java.net.CookieHandler;
 import java.net.ProxySelector;
-import java.net.URI;
+import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
+import java.security.AccessControlContext;
+import java.security.AccessController;
 import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedAction;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.MultiSubscriber;
 import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.Pair;
 import jdk.incubator.http.internal.common.Utils;
 import jdk.incubator.http.internal.websocket.BuilderImpl;
+import jdk.internal.misc.InnocuousThread;
 
 /**
  * Client implementation. Contains all configuration information and also
@@ -63,46 +75,138 @@
  */
 class HttpClientImpl extends HttpClient {
 
+    static final boolean DEBUG = Utils.DEBUG;  // Revisit: temporary dev flag.
+    static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG;  // Revisit: temporary dev flag.
+    static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag.
+    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+    final System.Logger  debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
+    final System.Logger  debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
+    static final AtomicLong CLIENT_IDS = new AtomicLong();
+
     // Define the default factory as a static inner class
     // that embeds all the necessary logic to avoid
     // the risk of using a lambda that might keep a reference on the
     // HttpClient instance from which it was created (helps with
     // heapdump analysis).
     private static final class DefaultThreadFactory implements ThreadFactory {
-        private DefaultThreadFactory() {}
+        private final String namePrefix;
+        private final AtomicInteger nextId = new AtomicInteger();
+
+        DefaultThreadFactory(long clientID) {
+            namePrefix = "HttpClient-" + clientID + "-Worker-";
+        }
+
         @Override
         public Thread newThread(Runnable r) {
-            Thread t = new Thread(null, r, "HttpClient_worker", 0, true);
+            String name = namePrefix + nextId.getAndIncrement();
+            Thread t;
+            if (System.getSecurityManager() == null) {
+                t = new Thread(null, r, name, 0, false);
+            } else {
+                t = InnocuousThread.newThread(name, r);
+            }
             t.setDaemon(true);
             return t;
         }
-        static final ThreadFactory INSTANCE = new DefaultThreadFactory();
     }
 
-    private final CookieManager cookieManager;
+    private final CookieHandler cookieHandler;
     private final Redirect followRedirects;
+    private final Optional<ProxySelector> userProxySelector;
     private final ProxySelector proxySelector;
     private final Authenticator authenticator;
     private final Version version;
     private final ConnectionPool connections;
     private final Executor executor;
+    private final boolean isDefaultExecutor;
     // Security parameters
     private final SSLContext sslContext;
     private final SSLParameters sslParams;
     private final SelectorManager selmgr;
     private final FilterFactory filters;
     private final Http2ClientImpl client2;
+    private final long id;
+    private final String dbgTag;
+
+    // This reference is used to keep track of the facade HttpClient
+    // that was returned to the application code.
+    // It makes it possible to know when the application no longer
+    // holds any reference to the HttpClient.
+    // Unfortunately, this information is not enough to know when
+    // to exit the SelectorManager thread. Because of the asynchronous
+    // nature of the API, we also need to wait until all pending operations
+    // have completed.
+    private final WeakReference<HttpClientFacade> facadeRef;
+
+    // This counter keeps track of the number of operations pending
+    // on the HttpClient. The SelectorManager thread will wait
+    // until there are no longer any pending operations and the
+    // facadeRef is cleared before exiting.
+    //
+    // The pendingOperationCount is incremented every time a send/sendAsync
+    // operation is invoked on the HttpClient, and is decremented when
+    // the HttpResponse<T> object is returned to the user.
+    // However, at this point, the body may not have been fully read yet.
+    // This is the case when the response T is implemented as a streaming
+    // subscriber (such as an InputStream).
+    //
+    // To take care of this issue the pendingOperationCount will additionally
+    // be incremented/decremented in the following cases:
+    //
+    // 1. For HTTP/2  it is incremented when a stream is added to the
+    //    Http2Connection streams map, and decreased when the stream is removed
+    //    from the map. This should also take care of push promises.
+    // 2. For WebSocket the count is increased when creating a
+    //    DetachedConnectionChannel for the socket, and decreased
+    //    when the the channel is closed.
+    //    In addition, the HttpClient facade is passed to the WebSocket builder,
+    //    (instead of the client implementation delegate).
+    // 3. For HTTP/1.1 the count is incremented before starting to parse the body
+    //    response, and decremented when the parser has reached the end of the
+    //    response body flow.
+    //
+    // This should ensure that the selector manager thread remains alive until
+    // the response has been fully received or the web socket is closed.
+    private final AtomicLong pendingOperationCount = new AtomicLong();
+    private final AtomicLong pendingWebSocketCount = new AtomicLong();
+    private final AtomicLong pendingHttpRequestCount = new AtomicLong();
 
     /** A Set of, deadline first, ordered timeout events. */
     private final TreeSet<TimeoutEvent> timeouts;
 
-    public static HttpClientImpl create(HttpClientBuilderImpl builder) {
-        HttpClientImpl impl = new HttpClientImpl(builder);
-        impl.start();
-        return impl;
+    /**
+     * This is a bit tricky:
+     * 1. an HttpClientFacade has a final HttpClientImpl field.
+     * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
+     *    where the referent is the facade created for that instance.
+     * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
+     *    constructor, because it would be only weakly referenced and could
+     *    be GC'ed before we can return it.
+     * The solution is to use an instance of SingleFacadeFactory which will
+     * allow the caller of new HttpClientImpl(...) to retrieve the facade
+     * after the HttpClientImpl has been created.
+     */
+    private static final class SingleFacadeFactory {
+        HttpClientFacade facade;
+        HttpClientFacade createFacade(HttpClientImpl impl) {
+            assert facade == null;
+            return (facade = new HttpClientFacade(impl));
+        }
     }
 
-    private HttpClientImpl(HttpClientBuilderImpl builder) {
+    static HttpClientFacade create(HttpClientBuilderImpl builder) {
+        SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
+        HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
+        impl.start();
+        assert facadeFactory.facade != null;
+        assert impl.facadeRef.get() == facadeFactory.facade;
+        return facadeFactory.facade;
+    }
+
+    private HttpClientImpl(HttpClientBuilderImpl builder,
+                           SingleFacadeFactory facadeFactory) {
+        id = CLIENT_IDS.incrementAndGet();
+        dbgTag = "HttpClientImpl(" + id +")";
         if (builder.sslContext == null) {
             try {
                 sslContext = SSLContext.getDefault();
@@ -114,16 +218,23 @@
         }
         Executor ex = builder.executor;
         if (ex == null) {
-            ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE);
+            ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
+            isDefaultExecutor = true;
         } else {
             ex = builder.executor;
+            isDefaultExecutor = false;
         }
+        facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
         client2 = new Http2ClientImpl(this);
         executor = ex;
-        cookieManager = builder.cookieManager;
+        cookieHandler = builder.cookieHandler;
         followRedirects = builder.followRedirects == null ?
                 Redirect.NEVER : builder.followRedirects;
-        this.proxySelector = builder.proxy;
+        this.userProxySelector = Optional.ofNullable(builder.proxy);
+        this.proxySelector = userProxySelector
+                .orElseGet(HttpClientImpl::getDefaultProxySelector);
+        debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)",
+                this.proxySelector, userProxySelector.isPresent());
         authenticator = builder.authenticator;
         if (builder.version == null) {
             version = HttpClient.Version.HTTP_2;
@@ -135,7 +246,7 @@
         } else {
             sslParams = builder.sslParams;
         }
-        connections = new ConnectionPool();
+        connections = new ConnectionPool(id);
         connections.start();
         timeouts = new TreeSet<>();
         try {
@@ -147,30 +258,102 @@
         selmgr.setDaemon(true);
         filters = new FilterFactory();
         initFilters();
+        assert facadeRef.get() != null;
     }
 
     private void start() {
         selmgr.start();
     }
 
+    // Called from the SelectorManager thread, just before exiting.
+    // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
+    // that may be still lingering there are properly closed (and their
+    // possibly still opened SocketChannel released).
+    private void stop() {
+        // Clears HTTP/1.1 cache and close its connections
+        connections.stop();
+        // Clears HTTP/2 cache and close its connections.
+        client2.stop();
+    }
+
     private static SSLParameters getDefaultParams(SSLContext ctx) {
         SSLParameters params = ctx.getSupportedSSLParameters();
         params.setProtocols(new String[]{"TLSv1.2"});
         return params;
     }
 
+    private static ProxySelector getDefaultProxySelector() {
+        PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
+        return AccessController.doPrivileged(action);
+    }
+
+    // Returns the facade that was returned to the application code.
+    // May be null if that facade is no longer referenced.
+    final HttpClientFacade facade() {
+        return facadeRef.get();
+    }
+
+    // Increments the pendingOperationCount.
+    final long reference() {
+        pendingHttpRequestCount.incrementAndGet();
+        return pendingOperationCount.incrementAndGet();
+    }
+
+    // Decrements the pendingOperationCount.
+    final long unreference() {
+        final long count = pendingOperationCount.decrementAndGet();
+        final long httpCount = pendingHttpRequestCount.decrementAndGet();
+        final long webSocketCount = pendingWebSocketCount.get();
+        if (count == 0 && facade() == null) {
+            selmgr.wakeupSelector();
+        }
+        assert httpCount >= 0 : "count of HTTP operations < 0";
+        assert webSocketCount >= 0 : "count of WS operations < 0";
+        assert count >= 0 : "count of pending operations < 0";
+        return count;
+    }
+
+    // Increments the pendingOperationCount.
+    final long webSocketOpen() {
+        pendingWebSocketCount.incrementAndGet();
+        return pendingOperationCount.incrementAndGet();
+    }
+
+    // Decrements the pendingOperationCount.
+    final long webSocketClose() {
+        final long count = pendingOperationCount.decrementAndGet();
+        final long webSocketCount = pendingWebSocketCount.decrementAndGet();
+        final long httpCount = pendingHttpRequestCount.get();
+        if (count == 0 && facade() == null) {
+            selmgr.wakeupSelector();
+        }
+        assert httpCount >= 0 : "count of HTTP operations < 0";
+        assert webSocketCount >= 0 : "count of WS operations < 0";
+        assert count >= 0 : "count of pending operations < 0";
+        return count;
+    }
+
+    // Returns the pendingOperationCount.
+    final long referenceCount() {
+        return pendingOperationCount.get();
+    }
+
+    // Called by the SelectorManager thread to figure out whether it's time
+    // to terminate.
+    final boolean isReferenced() {
+        HttpClient facade = facade();
+        return facade != null || referenceCount() > 0;
+    }
+
     /**
-     * Wait for activity on given exchange (assuming blocking = false).
-     * It's a no-op if blocking = true. In particular, the following occurs
-     * in the SelectorManager thread.
+     * Wait for activity on given exchange.
+     * The following occurs in the SelectorManager thread.
      *
-     *  1) mark the connection non-blocking
-     *  2) add to selector
-     *  3) If selector fires for this exchange then
-     *  4)   - mark connection as blocking
-     *  5)   - call AsyncEvent.handle()
+     *  1) add to selector
+     *  2) If selector fires for this exchange then
+     *     call AsyncEvent.handle()
      *
-     * If exchange needs to block again, then call registerEvent() again
+     * If exchange needs to change interest ops, then call registerEvent() again.
      */
     void registerEvent(AsyncEvent exchange) throws IOException {
         selmgr.register(exchange);
@@ -184,131 +367,196 @@
         selmgr.cancel(s);
     }
 
+    /**
+     * Allows an AsyncEvent to modify its interestOps.
+     * @param event The modified event.
+     */
+    void eventUpdated(AsyncEvent event) throws ClosedChannelException {
+        assert !(event instanceof AsyncTriggerEvent);
+        selmgr.eventUpdated(event);
+    }
+
+    boolean isSelectorThread() {
+        return Thread.currentThread() == selmgr;
+    }
 
     Http2ClientImpl client2() {
         return client2;
     }
 
-    /*
-    @Override
-    public ByteBuffer getBuffer() {
-        return pool.getBuffer();
-    }
-
-    // SSL buffers are larger. Manage separately
-
-    int size = 16 * 1024;
-
-    ByteBuffer getSSLBuffer() {
-        return ByteBuffer.allocate(size);
-    }
-
-    /**
-     * Return a new buffer that's a bit bigger than the given one
-     *
-     * @param buf
-     * @return
-     *
-    ByteBuffer reallocSSLBuffer(ByteBuffer buf) {
-        size = buf.capacity() * 12 / 10; // 20% bigger
-        return ByteBuffer.allocate(size);
-    }
-
-    synchronized void returnSSLBuffer(ByteBuffer buf) {
-        if (buf.capacity() >= size)
-           sslBuffers.add(0, buf);
+    private void debugCompleted(String tag, long startNanos, HttpRequest req) {
+        if (debugelapsed.isLoggable(Level.DEBUG)) {
+            debugelapsed.log(Level.DEBUG, () -> tag + " elapsed "
+                    + (System.nanoTime() - startNanos)/1000_000L
+                    + " millis for " + req.method()
+                    + " to " + req.uri());
+        }
     }
 
     @Override
-    public void returnBuffer(ByteBuffer buffer) {
-        pool.returnBuffer(buffer);
-    }
-    */
-
-    @Override
     public <T> HttpResponse<T>
-    send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
+    send(HttpRequest req, BodyHandler<T> responseHandler)
         throws IOException, InterruptedException
     {
-        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
-        return mex.response();
+        try {
+            return sendAsync(req, responseHandler).get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof Error)
+                throw (Error)t;
+            if (t instanceof RuntimeException)
+                throw (RuntimeException)t;
+            else if (t instanceof IOException)
+                throw Utils.getIOException(t);
+            else
+                throw new InternalError("Unexpected exception", t);
+        }
     }
 
     @Override
     public <T> CompletableFuture<HttpResponse<T>>
-    sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
+    sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
     {
-        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
-        return mex.responseAsync()
-                  .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
+        AccessControlContext acc = null;
+        if (System.getSecurityManager() != null)
+            acc = AccessController.getContext();
+
+        // Clone the, possibly untrusted, HttpRequest
+        HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
+        if (requestImpl.method().equals("CONNECT"))
+            throw new IllegalArgumentException("Unsupported method CONNECT");
+
+        long start = DEBUGELAPSED ? System.nanoTime() : 0;
+        reference();
+        try {
+            debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest);
+
+            MultiExchange<Void,T> mex = new MultiExchange<>(userRequest,
+                                                            requestImpl,
+                                                            this,
+                                                            responseHandler,
+                                                            acc);
+            CompletableFuture<HttpResponse<T>> res =
+                    mex.responseAsync().whenComplete((b,t) -> unreference());
+            if (DEBUGELAPSED) {
+                res = res.whenComplete(
+                        (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
+            }
+            // makes sure that any dependent actions happen in the executor
+            if (acc != null) {
+                res.whenCompleteAsync((r, t) -> { /* do nothing */},
+                                      new PrivilegedExecutor(executor, acc));
+            }
+
+            return res;
+        } catch(Throwable t) {
+            unreference();
+            debugCompleted("ClientImpl (async)", start, userRequest);
+            throw t;
+        }
     }
 
     @Override
     public <U, T> CompletableFuture<U>
-    sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) {
-        MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler);
-        return mex.multiResponseAsync();
-    }
+    sendAsync(HttpRequest userRequest, MultiSubscriber<U, T> responseHandler) {
+        AccessControlContext acc = null;
+        if (System.getSecurityManager() != null)
+            acc = AccessController.getContext();
 
-    // new impl. Should get rid of above
-    /*
-    static class BufferPool implements BufferHandler {
+        // Clone the, possibly untrusted, HttpRequest
+        HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
+        if (requestImpl.method().equals("CONNECT"))
+            throw new IllegalArgumentException("Unsupported method CONNECT");
 
-        final LinkedList<ByteBuffer> freelist = new LinkedList<>();
+        long start = DEBUGELAPSED ? System.nanoTime() : 0;
+        reference();
+        try {
+            debugelapsed.log(Level.DEBUG, "ClientImpl (async) send multi %s", userRequest);
 
-        @Override
-        public synchronized ByteBuffer getBuffer() {
-            ByteBuffer buf;
+            MultiExchange<U,T> mex = new MultiExchange<>(userRequest,
+                                                         requestImpl,
+                                                         this,
+                                                         responseHandler,
+                                                         acc);
+            CompletableFuture<U> res = mex.multiResponseAsync()
+                      .whenComplete((b,t) -> unreference());
+            if (DEBUGELAPSED) {
+                res = res.whenComplete(
+                        (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
+            }
+            // makes sure that any dependent actions happen in the executor
+            if (acc != null) {
+                res.whenCompleteAsync((r, t) -> { /* do nothing */},
+                                      new PrivilegedExecutor(executor, acc));
+            }
 
-            while (!freelist.isEmpty()) {
-                buf = freelist.removeFirst();
-                buf.clear();
-                return buf;
-            }
-            return ByteBuffer.allocate(BUFSIZE);
-        }
-
-        @Override
-        public synchronized void returnBuffer(ByteBuffer buffer) {
-            assert buffer.capacity() > 0;
-            freelist.add(buffer);
+            return res;
+        } catch(Throwable t) {
+            unreference();
+            debugCompleted("ClientImpl (async)", start, userRequest);
+            throw t;
         }
     }
 
-    static BufferPool pool = new BufferPool();
-
-    static BufferHandler pool() {
-        return pool;
-    }
-*/
     // Main loop for this client's selector
     private final static class SelectorManager extends Thread {
 
-        private static final long NODEADLINE = 3000L;
+        // For testing purposes we have an internal System property that
+        // can control the frequency at which the selector manager will wake
+        // up when there are no pending operations.
+        // Increasing the frequency (shorter delays) might allow the selector
+        // to observe that the facade is no longer referenced and might allow
+        // the selector thread to terminate more timely - for when nothing is
+        // ongoing it will only check for that condition every NODEADLINE ms.
+        // To avoid misuse of the property, the delay that can be specified
+        // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
+        // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
+        // The property is -Djdk.httpclient.internal.selector.timeout=<millis>
+        private static final int MIN_NODEADLINE = 1000; // ms
+        private static final int MAX_NODEADLINE = 1000 * 1200; // ms
+        private static final int DEF_NODEADLINE = 3000; // ms
+        private static final long NODEADLINE; // default is DEF_NODEADLINE ms
+        static {
+            // ensure NODEADLINE is initialized with some valid value.
+            long deadline =  Utils.getIntegerNetProperty(
+                "jdk.httpclient.internal.selector.timeout",
+                DEF_NODEADLINE); // millis
+            if (deadline <= 0) deadline = DEF_NODEADLINE;
+            deadline = Math.max(deadline, MIN_NODEADLINE);
+            NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
+        }
+
         private final Selector selector;
         private volatile boolean closed;
-        private final List<AsyncEvent> readyList;
         private final List<AsyncEvent> registrations;
-
-        // Uses a weak reference to the HttpClient owning this
-        // selector: a strong reference prevents its garbage
-        // collection while the thread is running.
-        // We want the thread to exit gracefully when the
-        // HttpClient that owns it gets GC'ed.
-        WeakReference<HttpClientImpl> ownerRef;
+        private final System.Logger debug;
+        private final System.Logger debugtimeout;
+        HttpClientImpl owner;
+        ConnectionPool pool;
 
         SelectorManager(HttpClientImpl ref) throws IOException {
-            super(null, null, "SelectorManager", 0, false);
-            ownerRef = new WeakReference<>(ref);
-            readyList = new ArrayList<>();
+            super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false);
+            owner = ref;
+            debug = ref.debug;
+            debugtimeout = ref.debugtimeout;
+            pool = ref.connectionPool();
             registrations = new ArrayList<>();
             selector = Selector.open();
         }
 
+        void eventUpdated(AsyncEvent e) throws ClosedChannelException {
+            if (Thread.currentThread() == this) {
+                SelectionKey key = e.channel().keyFor(selector);
+                SelectorAttachment sa = (SelectorAttachment) key.attachment();
+                if (sa != null) sa.register(e);
+            } else {
+                register(e);
+            }
+        }
+
         // This returns immediately. So caller not allowed to send/receive
         // on connection.
-
-        synchronized void register(AsyncEvent e) throws IOException {
+        synchronized void register(AsyncEvent e) {
             registrations.add(e);
             selector.wakeup();
         }
@@ -326,23 +574,34 @@
         }
 
         synchronized void shutdown() {
+            debug.log(Level.DEBUG, "SelectorManager shutting down");
             closed = true;
             try {
                 selector.close();
-            } catch (IOException ignored) { }
+            } catch (IOException ignored) {
+            } finally {
+                owner.stop();
+            }
         }
 
         @Override
         public void run() {
+            List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
+            List<AsyncEvent> readyList = new ArrayList<>();
             try {
                 while (!Thread.currentThread().isInterrupted()) {
-                    HttpClientImpl client;
                     synchronized (this) {
-                        for (AsyncEvent exchange : registrations) {
-                            SelectableChannel c = exchange.channel();
+                        assert errorList.isEmpty();
+                        assert readyList.isEmpty();
+                        for (AsyncEvent event : registrations) {
+                            if (event instanceof AsyncTriggerEvent) {
+                                readyList.add(event);
+                                continue;
+                            }
+                            SelectableChannel chan = event.channel();
+                            SelectionKey key = null;
                             try {
-                                c.configureBlocking(false);
-                                SelectionKey key = c.keyFor(selector);
+                                key = chan.keyFor(selector);
                                 SelectorAttachment sa;
                                 if (key == null || !key.isValid()) {
                                     if (key != null) {
@@ -351,91 +610,163 @@
                                         // before registering the new event.
                                         selector.selectNow();
                                     }
-                                    sa = new SelectorAttachment(c, selector);
+                                    sa = new SelectorAttachment(chan, selector);
                                 } else {
                                     sa = (SelectorAttachment) key.attachment();
                                 }
-                                sa.register(exchange);
+                                // may throw IOE if channel closed: that's OK
+                                sa.register(event);
+                                if (!chan.isOpen()) {
+                                    throw new IOException("Channel closed");
+                                }
                             } catch (IOException e) {
-                                Log.logError("HttpClientImpl: " + e);
-                                c.close();
-                                // let the exchange deal with it
-                                handleEvent(exchange);
+                                Log.logTrace("HttpClientImpl: " + e);
+                                debug.log(Level.DEBUG, () ->
+                                        "Got " + e.getClass().getName()
+                                                 + " while handling"
+                                                 + " registration events");
+                                chan.close();
+                                // let the event abort deal with it
+                                errorList.add(new Pair<>(event, e));
+                                if (key != null) {
+                                    key.cancel();
+                                    selector.selectNow();
+                                }
                             }
                         }
                         registrations.clear();
+                        selector.selectedKeys().clear();
                     }
 
+                    for (AsyncEvent event : readyList) {
+                        assert event instanceof AsyncTriggerEvent;
+                        event.handle();
+                    }
+                    readyList.clear();
+
+                    for (Pair<AsyncEvent,IOException> error : errorList) {
+                        // an IOException was raised and the channel closed.
+                        handleEvent(error.first, error.second);
+                    }
+                    errorList.clear();
+
                     // Check whether client is still alive, and if not,
                     // gracefully stop this thread
-                    if ((client = ownerRef.get()) == null) {
+                    if (!owner.isReferenced()) {
                         Log.logTrace("HttpClient no longer referenced. Exiting...");
                         return;
                     }
-                    long millis = client.purgeTimeoutsAndReturnNextDeadline();
-                    client = null; // don't hold onto the client ref
+
+                    // Timeouts will have milliseconds granularity. It is important
+                    // to handle them in a timely fashion.
+                    long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
+                    debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout);
 
-                    //debugPrint(selector);
+                    // Keep-alive have seconds granularity. It's not really an
+                    // issue if we keep connections linger a bit more in the keep
+                    // alive cache.
+                    long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
+                    debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry);
+
+                    assert nextTimeout >= 0;
+                    assert nextExpiry >= 0;
+
                     // Don't wait for ever as it might prevent the thread to
                     // stop gracefully. millis will be 0 if no deadline was found.
+                    if (nextTimeout <= 0) nextTimeout = NODEADLINE;
+
+                    // Clip nextExpiry at NODEADLINE limit. The default
+                    // keep alive is 1200 seconds (half an hour) - we don't
+                    // want to wait that long.
+                    if (nextExpiry <= 0) nextExpiry = NODEADLINE;
+                    else nextExpiry = Math.min(NODEADLINE, nextExpiry);
+
+                    // takes the least of the two.
+                    long millis = Math.min(nextExpiry, nextTimeout);
+
+                    debugtimeout.log(Level.DEBUG, "Next deadline is %d",
+                                     (millis == 0 ? NODEADLINE : millis));
+                    //debugPrint(selector);
                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
                     if (n == 0) {
                         // Check whether client is still alive, and if not,
                         // gracefully stop this thread
-                        if ((client = ownerRef.get()) == null) {
+                        if (!owner.isReferenced()) {
                             Log.logTrace("HttpClient no longer referenced. Exiting...");
                             return;
                         }
-                        client.purgeTimeoutsAndReturnNextDeadline();
-                        client = null; // don't hold onto the client ref
+                        owner.purgeTimeoutsAndReturnNextDeadline();
                         continue;
                     }
                     Set<SelectionKey> keys = selector.selectedKeys();
 
+                    assert errorList.isEmpty();
                     for (SelectionKey key : keys) {
                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
-                        int eventsOccurred = key.readyOps();
+                        if (!key.isValid()) {
+                            IOException ex = sa.chan.isOpen()
+                                    ? new IOException("Invalid key")
+                                    : new ClosedChannelException();
+                            sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
+                            sa.pending.clear();
+                            continue;
+                        }
+
+                        int eventsOccurred;
+                        try {
+                            eventsOccurred = key.readyOps();
+                        } catch (CancelledKeyException ex) {
+                            IOException io = Utils.getIOException(ex);
+                            sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
+                            sa.pending.clear();
+                            continue;
+                        }
                         sa.events(eventsOccurred).forEach(readyList::add);
                         sa.resetInterestOps(eventsOccurred);
                     }
                     selector.selectNow(); // complete cancellation
                     selector.selectedKeys().clear();
 
-                    for (AsyncEvent exchange : readyList) {
-                        if (exchange.blocking()) {
-                            exchange.channel().configureBlocking(true);
-                        }
-                        handleEvent(exchange); // will be delegated to executor
+                    for (AsyncEvent event : readyList) {
+                        handleEvent(event, null); // will be delegated to executor
                     }
                     readyList.clear();
+                    errorList.forEach((p) -> handleEvent(p.first, p.second));
+                    errorList.clear();
                 }
             } catch (Throwable e) {
+                //e.printStackTrace();
                 if (!closed) {
                     // This terminates thread. So, better just print stack trace
                     String err = Utils.stackTrace(e);
                     Log.logError("HttpClientImpl: fatal error: " + err);
                 }
+                debug.log(Level.DEBUG, "shutting down", e);
+                if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) {
+                    e.printStackTrace(System.err); // always print the stack
+                }
             } finally {
                 shutdown();
             }
         }
 
-        void debugPrint(Selector selector) {
-            System.err.println("Selector: debugprint start");
-            Set<SelectionKey> keys = selector.keys();
-            for (SelectionKey key : keys) {
-                SelectableChannel c = key.channel();
-                int ops = key.interestOps();
-                System.err.printf("selector chan:%s ops:%d\n", c, ops);
-            }
-            System.err.println("Selector: debugprint end");
-        }
+//        void debugPrint(Selector selector) {
+//            System.err.println("Selector: debugprint start");
+//            Set<SelectionKey> keys = selector.keys();
+//            for (SelectionKey key : keys) {
+//                SelectableChannel c = key.channel();
+//                int ops = key.interestOps();
+//                System.err.printf("selector chan:%s ops:%d\n", c, ops);
+//            }
+//            System.err.println("Selector: debugprint end");
+//        }
 
-        void handleEvent(AsyncEvent e) {
-            if (closed) {
-                e.abort();
+        /** Handles the given event. The given ioe may be null. */
+        void handleEvent(AsyncEvent event, IOException ioe) {
+            if (closed || ioe != null) {
+                event.abort(ioe);
             } else {
-                e.handle();
+                event.handle();
             }
         }
     }
@@ -453,11 +784,13 @@
     private static class SelectorAttachment {
         private final SelectableChannel chan;
         private final Selector selector;
-        private final ArrayList<AsyncEvent> pending;
+        private final Set<AsyncEvent> pending;
+        private final static System.Logger debug =
+                Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG);
         private int interestOps;
 
         SelectorAttachment(SelectableChannel chan, Selector selector) {
-            this.pending = new ArrayList<>();
+            this.pending = new HashSet<>();
             this.chan = chan;
             this.selector = selector;
         }
@@ -506,23 +839,43 @@
 
             this.interestOps = newOps;
             SelectionKey key = chan.keyFor(selector);
-            if (newOps == 0) {
+            if (newOps == 0 && pending.isEmpty()) {
                 key.cancel();
             } else {
-                key.interestOps(newOps);
+                try {
+                    key.interestOps(newOps);
+                } catch (CancelledKeyException x) {
+                    // channel may have been closed
+                    debug.log(Level.DEBUG, "key cancelled for " + chan);
+                    abortPending(x);
+                }
+            }
+        }
+
+        void abortPending(Throwable x) {
+            if (!pending.isEmpty()) {
+                AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
+                pending.clear();
+                IOException io = Utils.getIOException(x);
+                for (AsyncEvent event : evts) {
+                    event.abort(io);
+                }
             }
         }
     }
 
-    @Override
-    public SSLContext sslContext() {
-        Utils.checkNetPermission("getSSLContext");
+    /*package-private*/ SSLContext theSSLContext() {
         return sslContext;
     }
 
     @Override
-    public Optional<SSLParameters> sslParameters() {
-        return Optional.ofNullable(sslParams);
+    public SSLContext sslContext() {
+        return sslContext;
+    }
+
+    @Override
+    public SSLParameters sslParameters() {
+        return Utils.copySSLParameters(sslParams);
     }
 
     @Override
@@ -530,9 +883,13 @@
         return Optional.ofNullable(authenticator);
     }
 
+    /*package-private*/ final Executor theExecutor() {
+        return executor;
+    }
+
     @Override
-    public Executor executor() {
-        return executor;
+    public final Optional<Executor> executor() {
+        return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
     }
 
     ConnectionPool connectionPool() {
@@ -546,19 +903,28 @@
 
 
     @Override
-    public Optional<CookieManager> cookieManager() {
-        return Optional.ofNullable(cookieManager);
+    public Optional<CookieHandler> cookieHandler() {
+        return Optional.ofNullable(cookieHandler);
     }
 
     @Override
     public Optional<ProxySelector> proxy() {
-        return Optional.ofNullable(this.proxySelector);
+        return this.userProxySelector;
+    }
+
+    // Return the effective proxy that this client uses.
+    ProxySelector proxySelector() {
+        return proxySelector;
     }
 
     @Override
-    public WebSocket.Builder newWebSocketBuilder(URI uri,
-                                                 WebSocket.Listener listener) {
-        return new BuilderImpl(this, uri, listener);
+    public WebSocket.Builder newWebSocketBuilder() {
+        // Make sure to pass the HttpClientFacade to the WebSocket builder.
+        // This will ensure that the facade is not released before the
+        // WebSocket has been created, at which point the pendingOperationCount
+        // will have been incremented by the DetachedConnectionChannel
+        // (see PlainHttpConnection.detachChannel())
+        return new BuilderImpl(this.facade(), proxySelector);
     }
 
     @Override
@@ -566,16 +932,21 @@
         return version;
     }
 
-    //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
+    String dbgString() {
+        return dbgTag;
+    }
 
-    boolean getHttp2Allowed() {
-        return version.equals(Version.HTTP_2);
+    @Override
+    public String toString() {
+        // Used by tests to get the client's id and compute the
+        // name of the SelectorManager thread.
+        return super.toString() + ("(" + id + ")");
     }
 
     private void initFilters() {
         addFilter(AuthenticationFilter.class);
         addFilter(RedirectFilter.class);
-        if (this.cookieManager != null) {
+        if (this.cookieHandler != null) {
             addFilter(CookieFilter.class);
         }
     }