src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/HttpClientImpl.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56074 06459c34105f
child 56081 20c6742e5545
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import javax.net.ssl.SSLContext;
       
    29 import javax.net.ssl.SSLParameters;
       
    30 import java.io.IOException;
       
    31 import java.lang.System.Logger.Level;
       
    32 import java.lang.ref.WeakReference;
       
    33 import java.net.Authenticator;
       
    34 import java.net.CookieHandler;
       
    35 import java.net.ProxySelector;
       
    36 import java.nio.channels.CancelledKeyException;
       
    37 import java.nio.channels.ClosedChannelException;
       
    38 import java.nio.channels.SelectableChannel;
       
    39 import java.nio.channels.SelectionKey;
       
    40 import java.nio.channels.Selector;
       
    41 import java.nio.channels.SocketChannel;
       
    42 import java.security.AccessControlContext;
       
    43 import java.security.AccessController;
       
    44 import java.security.NoSuchAlgorithmException;
       
    45 import java.security.PrivilegedAction;
       
    46 import java.time.Instant;
       
    47 import java.time.temporal.ChronoUnit;
       
    48 import java.util.ArrayList;
       
    49 import java.util.HashSet;
       
    50 import java.util.Iterator;
       
    51 import java.util.List;
       
    52 import java.util.Optional;
       
    53 import java.util.Set;
       
    54 import java.util.TreeSet;
       
    55 import java.util.concurrent.CompletableFuture;
       
    56 import java.util.concurrent.ExecutionException;
       
    57 import java.util.concurrent.Executor;
       
    58 import java.util.concurrent.Executors;
       
    59 import java.util.concurrent.ThreadFactory;
       
    60 import java.util.concurrent.atomic.AtomicInteger;
       
    61 import java.util.concurrent.atomic.AtomicLong;
       
    62 import java.util.stream.Stream;
       
    63 import jdk.incubator.http.HttpClient;
       
    64 import jdk.incubator.http.HttpRequest;
       
    65 import jdk.incubator.http.HttpResponse;
       
    66 import jdk.incubator.http.HttpResponse.BodyHandler;
       
    67 import jdk.incubator.http.HttpResponse.PushPromiseHandler;
       
    68 import jdk.incubator.http.WebSocket;
       
    69 import jdk.incubator.http.internal.common.Log;
       
    70 import jdk.incubator.http.internal.common.Pair;
       
    71 import jdk.incubator.http.internal.common.Utils;
       
    72 import jdk.incubator.http.internal.websocket.BuilderImpl;
       
    73 import jdk.internal.misc.InnocuousThread;
       
    74 
       
    75 /**
       
    76  * Client implementation. Contains all configuration information and also
       
    77  * the selector manager thread which allows async events to be registered
       
    78  * and delivered when they occur. See AsyncEvent.
       
    79  */
       
    80 class HttpClientImpl extends HttpClient {
       
    81 
       
    82     static final boolean DEBUG = Utils.DEBUG;  // Revisit: temporary dev flag.
       
    83     static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG;  // Revisit: temporary dev flag.
       
    84     static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag.
       
    85     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    86     final System.Logger  debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
       
    87     final System.Logger  debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
       
    88     static final AtomicLong CLIENT_IDS = new AtomicLong();
       
    89 
       
    90     // Define the default factory as a static inner class
       
    91     // that embeds all the necessary logic to avoid
       
    92     // the risk of using a lambda that might keep a reference on the
       
    93     // HttpClient instance from which it was created (helps with
       
    94     // heapdump analysis).
       
    95     private static final class DefaultThreadFactory implements ThreadFactory {
       
    96         private final String namePrefix;
       
    97         private final AtomicInteger nextId = new AtomicInteger();
       
    98 
       
    99         DefaultThreadFactory(long clientID) {
       
   100             namePrefix = "HttpClient-" + clientID + "-Worker-";
       
   101         }
       
   102 
       
   103         @Override
       
   104         public Thread newThread(Runnable r) {
       
   105             String name = namePrefix + nextId.getAndIncrement();
       
   106             Thread t;
       
   107             if (System.getSecurityManager() == null) {
       
   108                 t = new Thread(null, r, name, 0, false);
       
   109             } else {
       
   110                 t = InnocuousThread.newThread(name, r);
       
   111             }
       
   112             t.setDaemon(true);
       
   113             return t;
       
   114         }
       
   115     }
       
   116 
       
   117     private final CookieHandler cookieHandler;
       
   118     private final Redirect followRedirects;
       
   119     private final Optional<ProxySelector> userProxySelector;
       
   120     private final ProxySelector proxySelector;
       
   121     private final Authenticator authenticator;
       
   122     private final Version version;
       
   123     private final ConnectionPool connections;
       
   124     private final Executor executor;
       
   125     private final boolean isDefaultExecutor;
       
   126     // Security parameters
       
   127     private final SSLContext sslContext;
       
   128     private final SSLParameters sslParams;
       
   129     private final SelectorManager selmgr;
       
   130     private final FilterFactory filters;
       
   131     private final Http2ClientImpl client2;
       
   132     private final long id;
       
   133     private final String dbgTag;
       
   134 
       
   135     // This reference is used to keep track of the facade HttpClient
       
   136     // that was returned to the application code.
       
   137     // It makes it possible to know when the application no longer
       
   138     // holds any reference to the HttpClient.
       
   139     // Unfortunately, this information is not enough to know when
       
   140     // to exit the SelectorManager thread. Because of the asynchronous
       
   141     // nature of the API, we also need to wait until all pending operations
       
   142     // have completed.
       
   143     private final WeakReference<HttpClientFacade> facadeRef;
       
   144 
       
   145     // This counter keeps track of the number of operations pending
       
   146     // on the HttpClient. The SelectorManager thread will wait
       
   147     // until there are no longer any pending operations and the
       
   148     // facadeRef is cleared before exiting.
       
   149     //
       
   150     // The pendingOperationCount is incremented every time a send/sendAsync
       
   151     // operation is invoked on the HttpClient, and is decremented when
       
   152     // the HttpResponse<T> object is returned to the user.
       
   153     // However, at this point, the body may not have been fully read yet.
       
   154     // This is the case when the response T is implemented as a streaming
       
   155     // subscriber (such as an InputStream).
       
   156     //
       
   157     // To take care of this issue the pendingOperationCount will additionally
       
   158     // be incremented/decremented in the following cases:
       
   159     //
       
   160     // 1. For HTTP/2  it is incremented when a stream is added to the
       
   161     //    Http2Connection streams map, and decreased when the stream is removed
       
   162     //    from the map. This should also take care of push promises.
       
   163     // 2. For WebSocket the count is increased when creating a
       
   164     //    DetachedConnectionChannel for the socket, and decreased
       
   165     //    when the the channel is closed.
       
   166     //    In addition, the HttpClient facade is passed to the WebSocket builder,
       
   167     //    (instead of the client implementation delegate).
       
   168     // 3. For HTTP/1.1 the count is incremented before starting to parse the body
       
   169     //    response, and decremented when the parser has reached the end of the
       
   170     //    response body flow.
       
   171     //
       
   172     // This should ensure that the selector manager thread remains alive until
       
   173     // the response has been fully received or the web socket is closed.
       
   174     private final AtomicLong pendingOperationCount = new AtomicLong();
       
   175     private final AtomicLong pendingWebSocketCount = new AtomicLong();
       
   176     private final AtomicLong pendingHttpRequestCount = new AtomicLong();
       
   177 
       
   178     /** A Set of, deadline first, ordered timeout events. */
       
   179     private final TreeSet<TimeoutEvent> timeouts;
       
   180 
       
   181     /**
       
   182      * This is a bit tricky:
       
   183      * 1. an HttpClientFacade has a final HttpClientImpl field.
       
   184      * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
       
   185      *    where the referent is the facade created for that instance.
       
   186      * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
       
   187      *    constructor, because it would be only weakly referenced and could
       
   188      *    be GC'ed before we can return it.
       
   189      * The solution is to use an instance of SingleFacadeFactory which will
       
   190      * allow the caller of new HttpClientImpl(...) to retrieve the facade
       
   191      * after the HttpClientImpl has been created.
       
   192      */
       
   193     private static final class SingleFacadeFactory {
       
   194         HttpClientFacade facade;
       
   195         HttpClientFacade createFacade(HttpClientImpl impl) {
       
   196             assert facade == null;
       
   197             return (facade = new HttpClientFacade(impl));
       
   198         }
       
   199     }
       
   200 
       
   201     static HttpClientFacade create(HttpClientBuilderImpl builder) {
       
   202         SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
       
   203         HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
       
   204         impl.start();
       
   205         assert facadeFactory.facade != null;
       
   206         assert impl.facadeRef.get() == facadeFactory.facade;
       
   207         return facadeFactory.facade;
       
   208     }
       
   209 
       
   210     private HttpClientImpl(HttpClientBuilderImpl builder,
       
   211                            SingleFacadeFactory facadeFactory) {
       
   212         id = CLIENT_IDS.incrementAndGet();
       
   213         dbgTag = "HttpClientImpl(" + id +")";
       
   214         if (builder.sslContext == null) {
       
   215             try {
       
   216                 sslContext = SSLContext.getDefault();
       
   217             } catch (NoSuchAlgorithmException ex) {
       
   218                 throw new InternalError(ex);
       
   219             }
       
   220         } else {
       
   221             sslContext = builder.sslContext;
       
   222         }
       
   223         Executor ex = builder.executor;
       
   224         if (ex == null) {
       
   225             ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
       
   226             isDefaultExecutor = true;
       
   227         } else {
       
   228             ex = builder.executor;
       
   229             isDefaultExecutor = false;
       
   230         }
       
   231         facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
       
   232         client2 = new Http2ClientImpl(this);
       
   233         executor = ex;
       
   234         cookieHandler = builder.cookieHandler;
       
   235         followRedirects = builder.followRedirects == null ?
       
   236                 Redirect.NEVER : builder.followRedirects;
       
   237         this.userProxySelector = Optional.ofNullable(builder.proxy);
       
   238         this.proxySelector = userProxySelector
       
   239                 .orElseGet(HttpClientImpl::getDefaultProxySelector);
       
   240         debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)",
       
   241                 this.proxySelector, userProxySelector.isPresent());
       
   242         authenticator = builder.authenticator;
       
   243         if (builder.version == null) {
       
   244             version = HttpClient.Version.HTTP_2;
       
   245         } else {
       
   246             version = builder.version;
       
   247         }
       
   248         if (builder.sslParams == null) {
       
   249             sslParams = getDefaultParams(sslContext);
       
   250         } else {
       
   251             sslParams = builder.sslParams;
       
   252         }
       
   253         connections = new ConnectionPool(id);
       
   254         connections.start();
       
   255         timeouts = new TreeSet<>();
       
   256         try {
       
   257             selmgr = new SelectorManager(this);
       
   258         } catch (IOException e) {
       
   259             // unlikely
       
   260             throw new InternalError(e);
       
   261         }
       
   262         selmgr.setDaemon(true);
       
   263         filters = new FilterFactory();
       
   264         initFilters();
       
   265         assert facadeRef.get() != null;
       
   266     }
       
   267 
       
   268     private void start() {
       
   269         selmgr.start();
       
   270     }
       
   271 
       
   272     // Called from the SelectorManager thread, just before exiting.
       
   273     // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
       
   274     // that may be still lingering there are properly closed (and their
       
   275     // possibly still opened SocketChannel released).
       
   276     private void stop() {
       
   277         // Clears HTTP/1.1 cache and close its connections
       
   278         connections.stop();
       
   279         // Clears HTTP/2 cache and close its connections.
       
   280         client2.stop();
       
   281     }
       
   282 
       
   283     private static SSLParameters getDefaultParams(SSLContext ctx) {
       
   284         SSLParameters params = ctx.getSupportedSSLParameters();
       
   285         params.setProtocols(new String[]{"TLSv1.2"});
       
   286         return params;
       
   287     }
       
   288 
       
   289     private static ProxySelector getDefaultProxySelector() {
       
   290         PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
       
   291         return AccessController.doPrivileged(action);
       
   292     }
       
   293 
       
   294     // Returns the facade that was returned to the application code.
       
   295     // May be null if that facade is no longer referenced.
       
   296     final HttpClientFacade facade() {
       
   297         return facadeRef.get();
       
   298     }
       
   299 
       
   300     // Increments the pendingOperationCount.
       
   301     final long reference() {
       
   302         pendingHttpRequestCount.incrementAndGet();
       
   303         return pendingOperationCount.incrementAndGet();
       
   304     }
       
   305 
       
   306     // Decrements the pendingOperationCount.
       
   307     final long unreference() {
       
   308         final long count = pendingOperationCount.decrementAndGet();
       
   309         final long httpCount = pendingHttpRequestCount.decrementAndGet();
       
   310         final long webSocketCount = pendingWebSocketCount.get();
       
   311         if (count == 0 && facade() == null) {
       
   312             selmgr.wakeupSelector();
       
   313         }
       
   314         assert httpCount >= 0 : "count of HTTP operations < 0";
       
   315         assert webSocketCount >= 0 : "count of WS operations < 0";
       
   316         assert count >= 0 : "count of pending operations < 0";
       
   317         return count;
       
   318     }
       
   319 
       
   320     // Increments the pendingOperationCount.
       
   321     final long webSocketOpen() {
       
   322         pendingWebSocketCount.incrementAndGet();
       
   323         return pendingOperationCount.incrementAndGet();
       
   324     }
       
   325 
       
   326     // Decrements the pendingOperationCount.
       
   327     final long webSocketClose() {
       
   328         final long count = pendingOperationCount.decrementAndGet();
       
   329         final long webSocketCount = pendingWebSocketCount.decrementAndGet();
       
   330         final long httpCount = pendingHttpRequestCount.get();
       
   331         if (count == 0 && facade() == null) {
       
   332             selmgr.wakeupSelector();
       
   333         }
       
   334         assert httpCount >= 0 : "count of HTTP operations < 0";
       
   335         assert webSocketCount >= 0 : "count of WS operations < 0";
       
   336         assert count >= 0 : "count of pending operations < 0";
       
   337         return count;
       
   338     }
       
   339 
       
   340     // Returns the pendingOperationCount.
       
   341     final long referenceCount() {
       
   342         return pendingOperationCount.get();
       
   343     }
       
   344 
       
   345     // Called by the SelectorManager thread to figure out whether it's time
       
   346     // to terminate.
       
   347     final boolean isReferenced() {
       
   348         HttpClient facade = facade();
       
   349         return facade != null || referenceCount() > 0;
       
   350     }
       
   351 
       
   352     /**
       
   353      * Wait for activity on given exchange.
       
   354      * The following occurs in the SelectorManager thread.
       
   355      *
       
   356      *  1) add to selector
       
   357      *  2) If selector fires for this exchange then
       
   358      *     call AsyncEvent.handle()
       
   359      *
       
   360      * If exchange needs to change interest ops, then call registerEvent() again.
       
   361      */
       
   362     void registerEvent(AsyncEvent exchange) throws IOException {
       
   363         selmgr.register(exchange);
       
   364     }
       
   365 
       
   366     /**
       
   367      * Only used from RawChannel to disconnect the channel from
       
   368      * the selector
       
   369      */
       
   370     void cancelRegistration(SocketChannel s) {
       
   371         selmgr.cancel(s);
       
   372     }
       
   373 
       
   374     /**
       
   375      * Allows an AsyncEvent to modify its interestOps.
       
   376      * @param event The modified event.
       
   377      */
       
   378     void eventUpdated(AsyncEvent event) throws ClosedChannelException {
       
   379         assert !(event instanceof AsyncTriggerEvent);
       
   380         selmgr.eventUpdated(event);
       
   381     }
       
   382 
       
   383     boolean isSelectorThread() {
       
   384         return Thread.currentThread() == selmgr;
       
   385     }
       
   386 
       
   387     Http2ClientImpl client2() {
       
   388         return client2;
       
   389     }
       
   390 
       
   391     private void debugCompleted(String tag, long startNanos, HttpRequest req) {
       
   392         if (debugelapsed.isLoggable(Level.DEBUG)) {
       
   393             debugelapsed.log(Level.DEBUG, () -> tag + " elapsed "
       
   394                     + (System.nanoTime() - startNanos)/1000_000L
       
   395                     + " millis for " + req.method()
       
   396                     + " to " + req.uri());
       
   397         }
       
   398     }
       
   399 
       
   400     @Override
       
   401     public <T> HttpResponse<T>
       
   402     send(HttpRequest req, BodyHandler<T> responseHandler)
       
   403         throws IOException, InterruptedException
       
   404     {
       
   405         try {
       
   406             return sendAsync(req, responseHandler).get();
       
   407         } catch (ExecutionException e) {
       
   408             Throwable t = e.getCause();
       
   409             if (t instanceof Error)
       
   410                 throw (Error)t;
       
   411             if (t instanceof RuntimeException)
       
   412                 throw (RuntimeException)t;
       
   413             else if (t instanceof IOException)
       
   414                 throw Utils.getIOException(t);
       
   415             else
       
   416                 throw new InternalError("Unexpected exception", t);
       
   417         }
       
   418     }
       
   419 
       
   420     @Override
       
   421     public <T> CompletableFuture<HttpResponse<T>>
       
   422     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
       
   423     {
       
   424         return sendAsync(userRequest, responseHandler, null);
       
   425     }
       
   426 
       
   427 
       
   428     @Override
       
   429     public <T> CompletableFuture<HttpResponse<T>>
       
   430     sendAsync(HttpRequest userRequest,
       
   431               BodyHandler<T> responseHandler,
       
   432               PushPromiseHandler<T> pushPromiseHandler)
       
   433     {
       
   434         AccessControlContext acc = null;
       
   435         if (System.getSecurityManager() != null)
       
   436             acc = AccessController.getContext();
       
   437 
       
   438         // Clone the, possibly untrusted, HttpRequest
       
   439         HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
       
   440         if (requestImpl.method().equals("CONNECT"))
       
   441             throw new IllegalArgumentException("Unsupported method CONNECT");
       
   442 
       
   443         long start = DEBUGELAPSED ? System.nanoTime() : 0;
       
   444         reference();
       
   445         try {
       
   446             debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest);
       
   447 
       
   448             MultiExchange<T> mex = new MultiExchange<>(userRequest,
       
   449                                                             requestImpl,
       
   450                                                             this,
       
   451                                                             responseHandler,
       
   452                                                             pushPromiseHandler,
       
   453                                                             acc);
       
   454             CompletableFuture<HttpResponse<T>> res =
       
   455                     mex.responseAsync().whenComplete((b,t) -> unreference());
       
   456             if (DEBUGELAPSED) {
       
   457                 res = res.whenComplete(
       
   458                         (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
       
   459             }
       
   460             // makes sure that any dependent actions happen in the executor
       
   461             if (acc != null) {
       
   462                 res.whenCompleteAsync((r, t) -> { /* do nothing */},
       
   463                                       new PrivilegedExecutor(executor, acc));
       
   464             }
       
   465 
       
   466             return res;
       
   467         } catch(Throwable t) {
       
   468             unreference();
       
   469             debugCompleted("ClientImpl (async)", start, userRequest);
       
   470             throw t;
       
   471         }
       
   472     }
       
   473 
       
   474     // Main loop for this client's selector
       
   475     private final static class SelectorManager extends Thread {
       
   476 
       
   477         // For testing purposes we have an internal System property that
       
   478         // can control the frequency at which the selector manager will wake
       
   479         // up when there are no pending operations.
       
   480         // Increasing the frequency (shorter delays) might allow the selector
       
   481         // to observe that the facade is no longer referenced and might allow
       
   482         // the selector thread to terminate more timely - for when nothing is
       
   483         // ongoing it will only check for that condition every NODEADLINE ms.
       
   484         // To avoid misuse of the property, the delay that can be specified
       
   485         // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
       
   486         // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
       
   487         // The property is -Djdk.httpclient.internal.selector.timeout=<millis>
       
   488         private static final int MIN_NODEADLINE = 1000; // ms
       
   489         private static final int MAX_NODEADLINE = 1000 * 1200; // ms
       
   490         private static final int DEF_NODEADLINE = 3000; // ms
       
   491         private static final long NODEADLINE; // default is DEF_NODEADLINE ms
       
   492         static {
       
   493             // ensure NODEADLINE is initialized with some valid value.
       
   494             long deadline =  Utils.getIntegerNetProperty(
       
   495                 "jdk.httpclient.internal.selector.timeout",
       
   496                 DEF_NODEADLINE); // millis
       
   497             if (deadline <= 0) deadline = DEF_NODEADLINE;
       
   498             deadline = Math.max(deadline, MIN_NODEADLINE);
       
   499             NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
       
   500         }
       
   501 
       
   502         private final Selector selector;
       
   503         private volatile boolean closed;
       
   504         private final List<AsyncEvent> registrations;
       
   505         private final System.Logger debug;
       
   506         private final System.Logger debugtimeout;
       
   507         HttpClientImpl owner;
       
   508         ConnectionPool pool;
       
   509 
       
   510         SelectorManager(HttpClientImpl ref) throws IOException {
       
   511             super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false);
       
   512             owner = ref;
       
   513             debug = ref.debug;
       
   514             debugtimeout = ref.debugtimeout;
       
   515             pool = ref.connectionPool();
       
   516             registrations = new ArrayList<>();
       
   517             selector = Selector.open();
       
   518         }
       
   519 
       
   520         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
       
   521             if (Thread.currentThread() == this) {
       
   522                 SelectionKey key = e.channel().keyFor(selector);
       
   523                 SelectorAttachment sa = (SelectorAttachment) key.attachment();
       
   524                 if (sa != null) sa.register(e);
       
   525             } else {
       
   526                 register(e);
       
   527             }
       
   528         }
       
   529 
       
   530         // This returns immediately. So caller not allowed to send/receive
       
   531         // on connection.
       
   532         synchronized void register(AsyncEvent e) {
       
   533             registrations.add(e);
       
   534             selector.wakeup();
       
   535         }
       
   536 
       
   537         synchronized void cancel(SocketChannel e) {
       
   538             SelectionKey key = e.keyFor(selector);
       
   539             if (key != null) {
       
   540                 key.cancel();
       
   541             }
       
   542             selector.wakeup();
       
   543         }
       
   544 
       
   545         void wakeupSelector() {
       
   546             selector.wakeup();
       
   547         }
       
   548 
       
   549         synchronized void shutdown() {
       
   550             debug.log(Level.DEBUG, "SelectorManager shutting down");
       
   551             closed = true;
       
   552             try {
       
   553                 selector.close();
       
   554             } catch (IOException ignored) {
       
   555             } finally {
       
   556                 owner.stop();
       
   557             }
       
   558         }
       
   559 
       
   560         @Override
       
   561         public void run() {
       
   562             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
       
   563             List<AsyncEvent> readyList = new ArrayList<>();
       
   564             try {
       
   565                 while (!Thread.currentThread().isInterrupted()) {
       
   566                     synchronized (this) {
       
   567                         assert errorList.isEmpty();
       
   568                         assert readyList.isEmpty();
       
   569                         for (AsyncEvent event : registrations) {
       
   570                             if (event instanceof AsyncTriggerEvent) {
       
   571                                 readyList.add(event);
       
   572                                 continue;
       
   573                             }
       
   574                             SelectableChannel chan = event.channel();
       
   575                             SelectionKey key = null;
       
   576                             try {
       
   577                                 key = chan.keyFor(selector);
       
   578                                 SelectorAttachment sa;
       
   579                                 if (key == null || !key.isValid()) {
       
   580                                     if (key != null) {
       
   581                                         // key is canceled.
       
   582                                         // invoke selectNow() to purge it
       
   583                                         // before registering the new event.
       
   584                                         selector.selectNow();
       
   585                                     }
       
   586                                     sa = new SelectorAttachment(chan, selector);
       
   587                                 } else {
       
   588                                     sa = (SelectorAttachment) key.attachment();
       
   589                                 }
       
   590                                 // may throw IOE if channel closed: that's OK
       
   591                                 sa.register(event);
       
   592                                 if (!chan.isOpen()) {
       
   593                                     throw new IOException("Channel closed");
       
   594                                 }
       
   595                             } catch (IOException e) {
       
   596                                 Log.logTrace("HttpClientImpl: " + e);
       
   597                                 debug.log(Level.DEBUG, () ->
       
   598                                         "Got " + e.getClass().getName()
       
   599                                                  + " while handling"
       
   600                                                  + " registration events");
       
   601                                 chan.close();
       
   602                                 // let the event abort deal with it
       
   603                                 errorList.add(new Pair<>(event, e));
       
   604                                 if (key != null) {
       
   605                                     key.cancel();
       
   606                                     selector.selectNow();
       
   607                                 }
       
   608                             }
       
   609                         }
       
   610                         registrations.clear();
       
   611                         selector.selectedKeys().clear();
       
   612                     }
       
   613 
       
   614                     for (AsyncEvent event : readyList) {
       
   615                         assert event instanceof AsyncTriggerEvent;
       
   616                         event.handle();
       
   617                     }
       
   618                     readyList.clear();
       
   619 
       
   620                     for (Pair<AsyncEvent,IOException> error : errorList) {
       
   621                         // an IOException was raised and the channel closed.
       
   622                         handleEvent(error.first, error.second);
       
   623                     }
       
   624                     errorList.clear();
       
   625 
       
   626                     // Check whether client is still alive, and if not,
       
   627                     // gracefully stop this thread
       
   628                     if (!owner.isReferenced()) {
       
   629                         Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   630                         return;
       
   631                     }
       
   632 
       
   633                     // Timeouts will have milliseconds granularity. It is important
       
   634                     // to handle them in a timely fashion.
       
   635                     long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
       
   636                     debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout);
       
   637 
       
   638                     // Keep-alive have seconds granularity. It's not really an
       
   639                     // issue if we keep connections linger a bit more in the keep
       
   640                     // alive cache.
       
   641                     long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
       
   642                     debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry);
       
   643 
       
   644                     assert nextTimeout >= 0;
       
   645                     assert nextExpiry >= 0;
       
   646 
       
   647                     // Don't wait for ever as it might prevent the thread to
       
   648                     // stop gracefully. millis will be 0 if no deadline was found.
       
   649                     if (nextTimeout <= 0) nextTimeout = NODEADLINE;
       
   650 
       
   651                     // Clip nextExpiry at NODEADLINE limit. The default
       
   652                     // keep alive is 1200 seconds (half an hour) - we don't
       
   653                     // want to wait that long.
       
   654                     if (nextExpiry <= 0) nextExpiry = NODEADLINE;
       
   655                     else nextExpiry = Math.min(NODEADLINE, nextExpiry);
       
   656 
       
   657                     // takes the least of the two.
       
   658                     long millis = Math.min(nextExpiry, nextTimeout);
       
   659 
       
   660                     debugtimeout.log(Level.DEBUG, "Next deadline is %d",
       
   661                                      (millis == 0 ? NODEADLINE : millis));
       
   662                     //debugPrint(selector);
       
   663                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
       
   664                     if (n == 0) {
       
   665                         // Check whether client is still alive, and if not,
       
   666                         // gracefully stop this thread
       
   667                         if (!owner.isReferenced()) {
       
   668                             Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   669                             return;
       
   670                         }
       
   671                         owner.purgeTimeoutsAndReturnNextDeadline();
       
   672                         continue;
       
   673                     }
       
   674                     Set<SelectionKey> keys = selector.selectedKeys();
       
   675 
       
   676                     assert errorList.isEmpty();
       
   677                     for (SelectionKey key : keys) {
       
   678                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
       
   679                         if (!key.isValid()) {
       
   680                             IOException ex = sa.chan.isOpen()
       
   681                                     ? new IOException("Invalid key")
       
   682                                     : new ClosedChannelException();
       
   683                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
       
   684                             sa.pending.clear();
       
   685                             continue;
       
   686                         }
       
   687 
       
   688                         int eventsOccurred;
       
   689                         try {
       
   690                             eventsOccurred = key.readyOps();
       
   691                         } catch (CancelledKeyException ex) {
       
   692                             IOException io = Utils.getIOException(ex);
       
   693                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
       
   694                             sa.pending.clear();
       
   695                             continue;
       
   696                         }
       
   697                         sa.events(eventsOccurred).forEach(readyList::add);
       
   698                         sa.resetInterestOps(eventsOccurred);
       
   699                     }
       
   700                     selector.selectNow(); // complete cancellation
       
   701                     selector.selectedKeys().clear();
       
   702 
       
   703                     for (AsyncEvent event : readyList) {
       
   704                         handleEvent(event, null); // will be delegated to executor
       
   705                     }
       
   706                     readyList.clear();
       
   707                     errorList.forEach((p) -> handleEvent(p.first, p.second));
       
   708                     errorList.clear();
       
   709                 }
       
   710             } catch (Throwable e) {
       
   711                 //e.printStackTrace();
       
   712                 if (!closed) {
       
   713                     // This terminates thread. So, better just print stack trace
       
   714                     String err = Utils.stackTrace(e);
       
   715                     Log.logError("HttpClientImpl: fatal error: " + err);
       
   716                 }
       
   717                 debug.log(Level.DEBUG, "shutting down", e);
       
   718                 if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) {
       
   719                     e.printStackTrace(System.err); // always print the stack
       
   720                 }
       
   721             } finally {
       
   722                 shutdown();
       
   723             }
       
   724         }
       
   725 
       
   726 //        void debugPrint(Selector selector) {
       
   727 //            System.err.println("Selector: debugprint start");
       
   728 //            Set<SelectionKey> keys = selector.keys();
       
   729 //            for (SelectionKey key : keys) {
       
   730 //                SelectableChannel c = key.channel();
       
   731 //                int ops = key.interestOps();
       
   732 //                System.err.printf("selector chan:%s ops:%d\n", c, ops);
       
   733 //            }
       
   734 //            System.err.println("Selector: debugprint end");
       
   735 //        }
       
   736 
       
   737         /** Handles the given event. The given ioe may be null. */
       
   738         void handleEvent(AsyncEvent event, IOException ioe) {
       
   739             if (closed || ioe != null) {
       
   740                 event.abort(ioe);
       
   741             } else {
       
   742                 event.handle();
       
   743             }
       
   744         }
       
   745     }
       
   746 
       
   747     /**
       
   748      * Tracks multiple user level registrations associated with one NIO
       
   749      * registration (SelectionKey). In this implementation, registrations
       
   750      * are one-off and when an event is posted the registration is cancelled
       
   751      * until explicitly registered again.
       
   752      *
       
   753      * <p> No external synchronization required as this class is only used
       
   754      * by the SelectorManager thread. One of these objects required per
       
   755      * connection.
       
   756      */
       
   757     private static class SelectorAttachment {
       
   758         private final SelectableChannel chan;
       
   759         private final Selector selector;
       
   760         private final Set<AsyncEvent> pending;
       
   761         private final static System.Logger debug =
       
   762                 Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG);
       
   763         private int interestOps;
       
   764 
       
   765         SelectorAttachment(SelectableChannel chan, Selector selector) {
       
   766             this.pending = new HashSet<>();
       
   767             this.chan = chan;
       
   768             this.selector = selector;
       
   769         }
       
   770 
       
   771         void register(AsyncEvent e) throws ClosedChannelException {
       
   772             int newOps = e.interestOps();
       
   773             boolean reRegister = (interestOps & newOps) != newOps;
       
   774             interestOps |= newOps;
       
   775             pending.add(e);
       
   776             if (reRegister) {
       
   777                 // first time registration happens here also
       
   778                 chan.register(selector, interestOps, this);
       
   779             }
       
   780         }
       
   781 
       
   782         /**
       
   783          * Returns a Stream<AsyncEvents> containing only events that are
       
   784          * registered with the given {@code interestOps}.
       
   785          */
       
   786         Stream<AsyncEvent> events(int interestOps) {
       
   787             return pending.stream()
       
   788                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
       
   789         }
       
   790 
       
   791         /**
       
   792          * Removes any events with the given {@code interestOps}, and if no
       
   793          * events remaining, cancels the associated SelectionKey.
       
   794          */
       
   795         void resetInterestOps(int interestOps) {
       
   796             int newOps = 0;
       
   797 
       
   798             Iterator<AsyncEvent> itr = pending.iterator();
       
   799             while (itr.hasNext()) {
       
   800                 AsyncEvent event = itr.next();
       
   801                 int evops = event.interestOps();
       
   802                 if (event.repeating()) {
       
   803                     newOps |= evops;
       
   804                     continue;
       
   805                 }
       
   806                 if ((evops & interestOps) != 0) {
       
   807                     itr.remove();
       
   808                 } else {
       
   809                     newOps |= evops;
       
   810                 }
       
   811             }
       
   812 
       
   813             this.interestOps = newOps;
       
   814             SelectionKey key = chan.keyFor(selector);
       
   815             if (newOps == 0 && pending.isEmpty()) {
       
   816                 key.cancel();
       
   817             } else {
       
   818                 try {
       
   819                     key.interestOps(newOps);
       
   820                 } catch (CancelledKeyException x) {
       
   821                     // channel may have been closed
       
   822                     debug.log(Level.DEBUG, "key cancelled for " + chan);
       
   823                     abortPending(x);
       
   824                 }
       
   825             }
       
   826         }
       
   827 
       
   828         void abortPending(Throwable x) {
       
   829             if (!pending.isEmpty()) {
       
   830                 AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
       
   831                 pending.clear();
       
   832                 IOException io = Utils.getIOException(x);
       
   833                 for (AsyncEvent event : evts) {
       
   834                     event.abort(io);
       
   835                 }
       
   836             }
       
   837         }
       
   838     }
       
   839 
       
   840     /*package-private*/ SSLContext theSSLContext() {
       
   841         return sslContext;
       
   842     }
       
   843 
       
   844     @Override
       
   845     public SSLContext sslContext() {
       
   846         return sslContext;
       
   847     }
       
   848 
       
   849     @Override
       
   850     public SSLParameters sslParameters() {
       
   851         return Utils.copySSLParameters(sslParams);
       
   852     }
       
   853 
       
   854     @Override
       
   855     public Optional<Authenticator> authenticator() {
       
   856         return Optional.ofNullable(authenticator);
       
   857     }
       
   858 
       
   859     /*package-private*/ final Executor theExecutor() {
       
   860         return executor;
       
   861     }
       
   862 
       
   863     @Override
       
   864     public final Optional<Executor> executor() {
       
   865         return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
       
   866     }
       
   867 
       
   868     ConnectionPool connectionPool() {
       
   869         return connections;
       
   870     }
       
   871 
       
   872     @Override
       
   873     public Redirect followRedirects() {
       
   874         return followRedirects;
       
   875     }
       
   876 
       
   877 
       
   878     @Override
       
   879     public Optional<CookieHandler> cookieHandler() {
       
   880         return Optional.ofNullable(cookieHandler);
       
   881     }
       
   882 
       
   883     @Override
       
   884     public Optional<ProxySelector> proxy() {
       
   885         return this.userProxySelector;
       
   886     }
       
   887 
       
   888     // Return the effective proxy that this client uses.
       
   889     ProxySelector proxySelector() {
       
   890         return proxySelector;
       
   891     }
       
   892 
       
   893     @Override
       
   894     public WebSocket.Builder newWebSocketBuilder() {
       
   895         // Make sure to pass the HttpClientFacade to the WebSocket builder.
       
   896         // This will ensure that the facade is not released before the
       
   897         // WebSocket has been created, at which point the pendingOperationCount
       
   898         // will have been incremented by the DetachedConnectionChannel
       
   899         // (see PlainHttpConnection.detachChannel())
       
   900         return new BuilderImpl(this.facade(), proxySelector);
       
   901     }
       
   902 
       
   903     @Override
       
   904     public Version version() {
       
   905         return version;
       
   906     }
       
   907 
       
   908     String dbgString() {
       
   909         return dbgTag;
       
   910     }
       
   911 
       
   912     @Override
       
   913     public String toString() {
       
   914         // Used by tests to get the client's id and compute the
       
   915         // name of the SelectorManager thread.
       
   916         return super.toString() + ("(" + id + ")");
       
   917     }
       
   918 
       
   919     private void initFilters() {
       
   920         addFilter(AuthenticationFilter.class);
       
   921         addFilter(RedirectFilter.class);
       
   922         if (this.cookieHandler != null) {
       
   923             addFilter(CookieFilter.class);
       
   924         }
       
   925     }
       
   926 
       
   927     private void addFilter(Class<? extends HeaderFilter> f) {
       
   928         filters.addFilter(f);
       
   929     }
       
   930 
       
   931     final List<HeaderFilter> filterChain() {
       
   932         return filters.getFilterChain();
       
   933     }
       
   934 
       
   935     // Timer controls.
       
   936     // Timers are implemented through timed Selector.select() calls.
       
   937 
       
   938     synchronized void registerTimer(TimeoutEvent event) {
       
   939         Log.logTrace("Registering timer {0}", event);
       
   940         timeouts.add(event);
       
   941         selmgr.wakeupSelector();
       
   942     }
       
   943 
       
   944     synchronized void cancelTimer(TimeoutEvent event) {
       
   945         Log.logTrace("Canceling timer {0}", event);
       
   946         timeouts.remove(event);
       
   947     }
       
   948 
       
   949     /**
       
   950      * Purges ( handles ) timer events that have passed their deadline, and
       
   951      * returns the amount of time, in milliseconds, until the next earliest
       
   952      * event. A return value of 0 means that there are no events.
       
   953      */
       
   954     private long purgeTimeoutsAndReturnNextDeadline() {
       
   955         long diff = 0L;
       
   956         List<TimeoutEvent> toHandle = null;
       
   957         int remaining = 0;
       
   958         // enter critical section to retrieve the timeout event to handle
       
   959         synchronized(this) {
       
   960             if (timeouts.isEmpty()) return 0L;
       
   961 
       
   962             Instant now = Instant.now();
       
   963             Iterator<TimeoutEvent> itr = timeouts.iterator();
       
   964             while (itr.hasNext()) {
       
   965                 TimeoutEvent event = itr.next();
       
   966                 diff = now.until(event.deadline(), ChronoUnit.MILLIS);
       
   967                 if (diff <= 0) {
       
   968                     itr.remove();
       
   969                     toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
       
   970                     toHandle.add(event);
       
   971                 } else {
       
   972                     break;
       
   973                 }
       
   974             }
       
   975             remaining = timeouts.size();
       
   976         }
       
   977 
       
   978         // can be useful for debugging
       
   979         if (toHandle != null && Log.trace()) {
       
   980             Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
       
   981                     +  toHandle.size() + " events, "
       
   982                     + "remaining " + remaining
       
   983                     + ", next deadline: " + (diff < 0 ? 0L : diff));
       
   984         }
       
   985 
       
   986         // handle timeout events out of critical section
       
   987         if (toHandle != null) {
       
   988             Throwable failed = null;
       
   989             for (TimeoutEvent event : toHandle) {
       
   990                 try {
       
   991                    Log.logTrace("Firing timer {0}", event);
       
   992                    event.handle();
       
   993                 } catch (Error | RuntimeException e) {
       
   994                     // Not expected. Handle remaining events then throw...
       
   995                     // If e is an OOME or SOE it might simply trigger a new
       
   996                     // error from here - but in this case there's not much we
       
   997                     // could do anyway. Just let it flow...
       
   998                     if (failed == null) failed = e;
       
   999                     else failed.addSuppressed(e);
       
  1000                     Log.logTrace("Failed to handle event {0}: {1}", event, e);
       
  1001                 }
       
  1002             }
       
  1003             if (failed instanceof Error) throw (Error) failed;
       
  1004             if (failed instanceof RuntimeException) throw (RuntimeException) failed;
       
  1005         }
       
  1006 
       
  1007         // return time to wait until next event. 0L if there's no more events.
       
  1008         return diff < 0 ? 0L : diff;
       
  1009     }
       
  1010 
       
  1011     // used for the connection window
       
  1012     int getReceiveBufferSize() {
       
  1013         return Utils.getIntegerNetProperty(
       
  1014                 "jdk.httpclient.receiveBufferSize", 2 * 1024 * 1024
       
  1015         );
       
  1016     }
       
  1017 }