src/java.net.http/share/classes/java/net/http/internal/HttpClientImpl.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56081 20c6742e5545
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal;
       
    27 
       
    28 import javax.net.ssl.SSLContext;
       
    29 import javax.net.ssl.SSLParameters;
       
    30 import java.io.IOException;
       
    31 import java.lang.System.Logger.Level;
       
    32 import java.lang.ref.WeakReference;
       
    33 import java.net.Authenticator;
       
    34 import java.net.CookieHandler;
       
    35 import java.net.ProxySelector;
       
    36 import java.nio.channels.CancelledKeyException;
       
    37 import java.nio.channels.ClosedChannelException;
       
    38 import java.nio.channels.SelectableChannel;
       
    39 import java.nio.channels.SelectionKey;
       
    40 import java.nio.channels.Selector;
       
    41 import java.nio.channels.SocketChannel;
       
    42 import java.security.AccessControlContext;
       
    43 import java.security.AccessController;
       
    44 import java.security.NoSuchAlgorithmException;
       
    45 import java.security.PrivilegedAction;
       
    46 import java.time.Instant;
       
    47 import java.time.temporal.ChronoUnit;
       
    48 import java.util.ArrayList;
       
    49 import java.util.HashSet;
       
    50 import java.util.Iterator;
       
    51 import java.util.List;
       
    52 import java.util.Optional;
       
    53 import java.util.Set;
       
    54 import java.util.TreeSet;
       
    55 import java.util.concurrent.CompletableFuture;
       
    56 import java.util.concurrent.ExecutionException;
       
    57 import java.util.concurrent.Executor;
       
    58 import java.util.concurrent.Executors;
       
    59 import java.util.concurrent.ThreadFactory;
       
    60 import java.util.concurrent.atomic.AtomicInteger;
       
    61 import java.util.concurrent.atomic.AtomicLong;
       
    62 import java.util.stream.Stream;
       
    63 import java.net.http.HttpClient;
       
    64 import java.net.http.HttpRequest;
       
    65 import java.net.http.HttpResponse;
       
    66 import java.net.http.HttpResponse.BodyHandler;
       
    67 import java.net.http.HttpResponse.PushPromiseHandler;
       
    68 import java.net.http.WebSocket;
       
    69 import java.net.http.internal.common.Log;
       
    70 import java.net.http.internal.common.Pair;
       
    71 import java.net.http.internal.common.Utils;
       
    72 import java.net.http.internal.websocket.BuilderImpl;
       
    73 import jdk.internal.misc.InnocuousThread;
       
    74 
       
    75 /**
       
    76  * Client implementation. Contains all configuration information and also
       
    77  * the selector manager thread which allows async events to be registered
       
    78  * and delivered when they occur. See AsyncEvent.
       
    79  */
       
    80 class HttpClientImpl extends HttpClient {
       
    81 
       
    82     static final boolean DEBUG = Utils.DEBUG;  // Revisit: temporary dev flag.
       
    83     static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG;  // Revisit: temporary dev flag.
       
    84     static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag.
       
    85     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    86     final System.Logger  debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
       
    87     final System.Logger  debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
       
    88     static final AtomicLong CLIENT_IDS = new AtomicLong();
       
    89 
       
    90     // Define the default factory as a static inner class
       
    91     // that embeds all the necessary logic to avoid
       
    92     // the risk of using a lambda that might keep a reference on the
       
    93     // HttpClient instance from which it was created (helps with
       
    94     // heapdump analysis).
       
    95     private static final class DefaultThreadFactory implements ThreadFactory {
       
    96         private final String namePrefix;
       
    97         private final AtomicInteger nextId = new AtomicInteger();
       
    98 
       
    99         DefaultThreadFactory(long clientID) {
       
   100             namePrefix = "HttpClient-" + clientID + "-Worker-";
       
   101         }
       
   102 
       
   103         @Override
       
   104         public Thread newThread(Runnable r) {
       
   105             String name = namePrefix + nextId.getAndIncrement();
       
   106             Thread t;
       
   107             if (System.getSecurityManager() == null) {
       
   108                 t = new Thread(null, r, name, 0, false);
       
   109             } else {
       
   110                 t = InnocuousThread.newThread(name, r);
       
   111             }
       
   112             t.setDaemon(true);
       
   113             return t;
       
   114         }
       
   115     }
       
   116 
       
   117     private final CookieHandler cookieHandler;
       
   118     private final Redirect followRedirects;
       
   119     private final Optional<ProxySelector> userProxySelector;
       
   120     private final ProxySelector proxySelector;
       
   121     private final Authenticator authenticator;
       
   122     private final Version version;
       
   123     private final ConnectionPool connections;
       
   124     private final Executor executor;
       
   125     private final boolean isDefaultExecutor;
       
   126     // Security parameters
       
   127     private final SSLContext sslContext;
       
   128     private final SSLParameters sslParams;
       
   129     private final SelectorManager selmgr;
       
   130     private final FilterFactory filters;
       
   131     private final Http2ClientImpl client2;
       
   132     private final long id;
       
   133     private final String dbgTag;
       
   134 
       
   135     // This reference is used to keep track of the facade HttpClient
       
   136     // that was returned to the application code.
       
   137     // It makes it possible to know when the application no longer
       
   138     // holds any reference to the HttpClient.
       
   139     // Unfortunately, this information is not enough to know when
       
   140     // to exit the SelectorManager thread. Because of the asynchronous
       
   141     // nature of the API, we also need to wait until all pending operations
       
   142     // have completed.
       
   143     private final WeakReference<HttpClientFacade> facadeRef;
       
   144 
       
   145     // This counter keeps track of the number of operations pending
       
   146     // on the HttpClient. The SelectorManager thread will wait
       
   147     // until there are no longer any pending operations and the
       
   148     // facadeRef is cleared before exiting.
       
   149     //
       
   150     // The pendingOperationCount is incremented every time a send/sendAsync
       
   151     // operation is invoked on the HttpClient, and is decremented when
       
   152     // the HttpResponse<T> object is returned to the user.
       
   153     // However, at this point, the body may not have been fully read yet.
       
   154     // This is the case when the response T is implemented as a streaming
       
   155     // subscriber (such as an InputStream).
       
   156     //
       
   157     // To take care of this issue the pendingOperationCount will additionally
       
   158     // be incremented/decremented in the following cases:
       
   159     //
       
   160     // 1. For HTTP/2  it is incremented when a stream is added to the
       
   161     //    Http2Connection streams map, and decreased when the stream is removed
       
   162     //    from the map. This should also take care of push promises.
       
   163     // 2. For WebSocket the count is increased when creating a
       
   164     //    DetachedConnectionChannel for the socket, and decreased
       
   165     //    when the the channel is closed.
       
   166     //    In addition, the HttpClient facade is passed to the WebSocket builder,
       
   167     //    (instead of the client implementation delegate).
       
   168     // 3. For HTTP/1.1 the count is incremented before starting to parse the body
       
   169     //    response, and decremented when the parser has reached the end of the
       
   170     //    response body flow.
       
   171     //
       
   172     // This should ensure that the selector manager thread remains alive until
       
   173     // the response has been fully received or the web socket is closed.
       
   174     private final AtomicLong pendingOperationCount = new AtomicLong();
       
   175     private final AtomicLong pendingWebSocketCount = new AtomicLong();
       
   176     private final AtomicLong pendingHttpRequestCount = new AtomicLong();
       
   177 
       
   178     /** A Set of, deadline first, ordered timeout events. */
       
   179     private final TreeSet<TimeoutEvent> timeouts;
       
   180 
       
   181     /**
       
   182      * This is a bit tricky:
       
   183      * 1. an HttpClientFacade has a final HttpClientImpl field.
       
   184      * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
       
   185      *    where the referent is the facade created for that instance.
       
   186      * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
       
   187      *    constructor, because it would be only weakly referenced and could
       
   188      *    be GC'ed before we can return it.
       
   189      * The solution is to use an instance of SingleFacadeFactory which will
       
   190      * allow the caller of new HttpClientImpl(...) to retrieve the facade
       
   191      * after the HttpClientImpl has been created.
       
   192      */
       
   193     private static final class SingleFacadeFactory {
       
   194         HttpClientFacade facade;
       
   195         HttpClientFacade createFacade(HttpClientImpl impl) {
       
   196             assert facade == null;
       
   197             return (facade = new HttpClientFacade(impl));
       
   198         }
       
   199     }
       
   200 
       
   201     static HttpClientFacade create(HttpClientBuilderImpl builder) {
       
   202         SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
       
   203         HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
       
   204         impl.start();
       
   205         assert facadeFactory.facade != null;
       
   206         assert impl.facadeRef.get() == facadeFactory.facade;
       
   207         return facadeFactory.facade;
       
   208     }
       
   209 
       
   210     private HttpClientImpl(HttpClientBuilderImpl builder,
       
   211                            SingleFacadeFactory facadeFactory) {
       
   212         id = CLIENT_IDS.incrementAndGet();
       
   213         dbgTag = "HttpClientImpl(" + id +")";
       
   214         if (builder.sslContext == null) {
       
   215             try {
       
   216                 sslContext = SSLContext.getDefault();
       
   217             } catch (NoSuchAlgorithmException ex) {
       
   218                 throw new InternalError(ex);
       
   219             }
       
   220         } else {
       
   221             sslContext = builder.sslContext;
       
   222         }
       
   223         Executor ex = builder.executor;
       
   224         if (ex == null) {
       
   225             ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
       
   226             isDefaultExecutor = true;
       
   227         } else {
       
   228             ex = builder.executor;
       
   229             isDefaultExecutor = false;
       
   230         }
       
   231         facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
       
   232         client2 = new Http2ClientImpl(this);
       
   233         executor = ex;
       
   234         cookieHandler = builder.cookieHandler;
       
   235         followRedirects = builder.followRedirects == null ?
       
   236                 Redirect.NEVER : builder.followRedirects;
       
   237         this.userProxySelector = Optional.ofNullable(builder.proxy);
       
   238         this.proxySelector = userProxySelector
       
   239                 .orElseGet(HttpClientImpl::getDefaultProxySelector);
       
   240         debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)",
       
   241                 this.proxySelector, userProxySelector.isPresent());
       
   242         authenticator = builder.authenticator;
       
   243         if (builder.version == null) {
       
   244             version = HttpClient.Version.HTTP_2;
       
   245         } else {
       
   246             version = builder.version;
       
   247         }
       
   248         if (builder.sslParams == null) {
       
   249             sslParams = getDefaultParams(sslContext);
       
   250         } else {
       
   251             sslParams = builder.sslParams;
       
   252         }
       
   253         connections = new ConnectionPool(id);
       
   254         connections.start();
       
   255         timeouts = new TreeSet<>();
       
   256         try {
       
   257             selmgr = new SelectorManager(this);
       
   258         } catch (IOException e) {
       
   259             // unlikely
       
   260             throw new InternalError(e);
       
   261         }
       
   262         selmgr.setDaemon(true);
       
   263         filters = new FilterFactory();
       
   264         initFilters();
       
   265         assert facadeRef.get() != null;
       
   266     }
       
   267 
       
   268     private void start() {
       
   269         selmgr.start();
       
   270     }
       
   271 
       
   272     // Called from the SelectorManager thread, just before exiting.
       
   273     // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
       
   274     // that may be still lingering there are properly closed (and their
       
   275     // possibly still opened SocketChannel released).
       
   276     private void stop() {
       
   277         // Clears HTTP/1.1 cache and close its connections
       
   278         connections.stop();
       
   279         // Clears HTTP/2 cache and close its connections.
       
   280         client2.stop();
       
   281     }
       
   282 
       
   283     private static SSLParameters getDefaultParams(SSLContext ctx) {
       
   284         SSLParameters params = ctx.getSupportedSSLParameters();
       
   285         params.setProtocols(new String[]{"TLSv1.2"});
       
   286         return params;
       
   287     }
       
   288 
       
   289     private static ProxySelector getDefaultProxySelector() {
       
   290         PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
       
   291         return AccessController.doPrivileged(action);
       
   292     }
       
   293 
       
   294     // Returns the facade that was returned to the application code.
       
   295     // May be null if that facade is no longer referenced.
       
   296     final HttpClientFacade facade() {
       
   297         return facadeRef.get();
       
   298     }
       
   299 
       
   300     // Increments the pendingOperationCount.
       
   301     final long reference() {
       
   302         pendingHttpRequestCount.incrementAndGet();
       
   303         return pendingOperationCount.incrementAndGet();
       
   304     }
       
   305 
       
   306     // Decrements the pendingOperationCount.
       
   307     final long unreference() {
       
   308         final long count = pendingOperationCount.decrementAndGet();
       
   309         final long httpCount = pendingHttpRequestCount.decrementAndGet();
       
   310         final long webSocketCount = pendingWebSocketCount.get();
       
   311         if (count == 0 && facade() == null) {
       
   312             selmgr.wakeupSelector();
       
   313         }
       
   314         assert httpCount >= 0 : "count of HTTP operations < 0";
       
   315         assert webSocketCount >= 0 : "count of WS operations < 0";
       
   316         assert count >= 0 : "count of pending operations < 0";
       
   317         return count;
       
   318     }
       
   319 
       
   320     // Increments the pendingOperationCount.
       
   321     final long webSocketOpen() {
       
   322         pendingWebSocketCount.incrementAndGet();
       
   323         return pendingOperationCount.incrementAndGet();
       
   324     }
       
   325 
       
   326     // Decrements the pendingOperationCount.
       
   327     final long webSocketClose() {
       
   328         final long count = pendingOperationCount.decrementAndGet();
       
   329         final long webSocketCount = pendingWebSocketCount.decrementAndGet();
       
   330         final long httpCount = pendingHttpRequestCount.get();
       
   331         if (count == 0 && facade() == null) {
       
   332             selmgr.wakeupSelector();
       
   333         }
       
   334         assert httpCount >= 0 : "count of HTTP operations < 0";
       
   335         assert webSocketCount >= 0 : "count of WS operations < 0";
       
   336         assert count >= 0 : "count of pending operations < 0";
       
   337         return count;
       
   338     }
       
   339 
       
   340     // Returns the pendingOperationCount.
       
   341     final long referenceCount() {
       
   342         return pendingOperationCount.get();
       
   343     }
       
   344 
       
   345     // Called by the SelectorManager thread to figure out whether it's time
       
   346     // to terminate.
       
   347     final boolean isReferenced() {
       
   348         HttpClient facade = facade();
       
   349         return facade != null || referenceCount() > 0;
       
   350     }
       
   351 
       
   352     /**
       
   353      * Wait for activity on given exchange.
       
   354      * The following occurs in the SelectorManager thread.
       
   355      *
       
   356      *  1) add to selector
       
   357      *  2) If selector fires for this exchange then
       
   358      *     call AsyncEvent.handle()
       
   359      *
       
   360      * If exchange needs to change interest ops, then call registerEvent() again.
       
   361      */
       
   362     void registerEvent(AsyncEvent exchange) throws IOException {
       
   363         selmgr.register(exchange);
       
   364     }
       
   365 
       
   366     /**
       
   367      * Only used from RawChannel to disconnect the channel from
       
   368      * the selector
       
   369      */
       
   370     void cancelRegistration(SocketChannel s) {
       
   371         selmgr.cancel(s);
       
   372     }
       
   373 
       
   374     /**
       
   375      * Allows an AsyncEvent to modify its interestOps.
       
   376      * @param event The modified event.
       
   377      */
       
   378     void eventUpdated(AsyncEvent event) throws ClosedChannelException {
       
   379         assert !(event instanceof AsyncTriggerEvent);
       
   380         selmgr.eventUpdated(event);
       
   381     }
       
   382 
       
   383     boolean isSelectorThread() {
       
   384         return Thread.currentThread() == selmgr;
       
   385     }
       
   386 
       
   387     Http2ClientImpl client2() {
       
   388         return client2;
       
   389     }
       
   390 
       
   391     private void debugCompleted(String tag, long startNanos, HttpRequest req) {
       
   392         if (debugelapsed.isLoggable(Level.DEBUG)) {
       
   393             debugelapsed.log(Level.DEBUG, () -> tag + " elapsed "
       
   394                     + (System.nanoTime() - startNanos)/1000_000L
       
   395                     + " millis for " + req.method()
       
   396                     + " to " + req.uri());
       
   397         }
       
   398     }
       
   399 
       
   400     @Override
       
   401     public <T> HttpResponse<T>
       
   402     send(HttpRequest req, BodyHandler<T> responseHandler)
       
   403         throws IOException, InterruptedException
       
   404     {
       
   405         try {
       
   406             return sendAsync(req, responseHandler).get();
       
   407         } catch (ExecutionException e) {
       
   408             Throwable t = e.getCause();
       
   409             if (t instanceof Error)
       
   410                 throw (Error)t;
       
   411             if (t instanceof RuntimeException)
       
   412                 throw (RuntimeException)t;
       
   413             else if (t instanceof IOException)
       
   414                 throw Utils.getIOException(t);
       
   415             else
       
   416                 throw new InternalError("Unexpected exception", t);
       
   417         }
       
   418     }
       
   419 
       
   420     @Override
       
   421     public <T> CompletableFuture<HttpResponse<T>>
       
   422     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
       
   423     {
       
   424         return sendAsync(userRequest, responseHandler, null);
       
   425     }
       
   426 
       
   427 
       
   428     @Override
       
   429     public <T> CompletableFuture<HttpResponse<T>>
       
   430     sendAsync(HttpRequest userRequest,
       
   431               BodyHandler<T> responseHandler,
       
   432               PushPromiseHandler<T> pushPromiseHandler)
       
   433     {
       
   434         AccessControlContext acc = null;
       
   435         if (System.getSecurityManager() != null)
       
   436             acc = AccessController.getContext();
       
   437 
       
   438         // Clone the, possibly untrusted, HttpRequest
       
   439         HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
       
   440         if (requestImpl.method().equals("CONNECT"))
       
   441             throw new IllegalArgumentException("Unsupported method CONNECT");
       
   442 
       
   443         long start = DEBUGELAPSED ? System.nanoTime() : 0;
       
   444         reference();
       
   445         try {
       
   446             debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest);
       
   447 
       
   448             MultiExchange<T> mex = new MultiExchange<>(userRequest,
       
   449                                                             requestImpl,
       
   450                                                             this,
       
   451                                                             responseHandler,
       
   452                                                             pushPromiseHandler,
       
   453                                                             acc);
       
   454             CompletableFuture<HttpResponse<T>> res =
       
   455                     mex.responseAsync().whenComplete((b,t) -> unreference());
       
   456             if (DEBUGELAPSED) {
       
   457                 res = res.whenComplete(
       
   458                         (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
       
   459             }
       
   460             // makes sure that any dependent actions happen in the executor
       
   461             if (acc != null) {
       
   462                 res.whenCompleteAsync((r, t) -> { /* do nothing */},
       
   463                                       new PrivilegedExecutor(executor, acc));
       
   464             }
       
   465 
       
   466             return res;
       
   467         } catch(Throwable t) {
       
   468             unreference();
       
   469             debugCompleted("ClientImpl (async)", start, userRequest);
       
   470             throw t;
       
   471         }
       
   472     }
       
   473 
       
   474     // Main loop for this client's selector
       
   475     private final static class SelectorManager extends Thread {
       
   476 
       
   477         // For testing purposes we have an internal System property that
       
   478         // can control the frequency at which the selector manager will wake
       
   479         // up when there are no pending operations.
       
   480         // Increasing the frequency (shorter delays) might allow the selector
       
   481         // to observe that the facade is no longer referenced and might allow
       
   482         // the selector thread to terminate more timely - for when nothing is
       
   483         // ongoing it will only check for that condition every NODEADLINE ms.
       
   484         // To avoid misuse of the property, the delay that can be specified
       
   485         // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
       
   486         // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
       
   487         // The property is -Djdk.httpclient.internal.selector.timeout=<millis>
       
   488         private static final int MIN_NODEADLINE = 1000; // ms
       
   489         private static final int MAX_NODEADLINE = 1000 * 1200; // ms
       
   490         private static final int DEF_NODEADLINE = 3000; // ms
       
   491         private static final long NODEADLINE; // default is DEF_NODEADLINE ms
       
   492         static {
       
   493             // ensure NODEADLINE is initialized with some valid value.
       
   494             long deadline =  Utils.getIntegerNetProperty(
       
   495                 "jdk.httpclient.internal.selector.timeout",
       
   496                 DEF_NODEADLINE); // millis
       
   497             if (deadline <= 0) deadline = DEF_NODEADLINE;
       
   498             deadline = Math.max(deadline, MIN_NODEADLINE);
       
   499             NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
       
   500         }
       
   501 
       
   502         private final Selector selector;
       
   503         private volatile boolean closed;
       
   504         private final List<AsyncEvent> registrations;
       
   505         private final System.Logger debug;
       
   506         private final System.Logger debugtimeout;
       
   507         HttpClientImpl owner;
       
   508         ConnectionPool pool;
       
   509 
       
   510         SelectorManager(HttpClientImpl ref) throws IOException {
       
   511             super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false);
       
   512             owner = ref;
       
   513             debug = ref.debug;
       
   514             debugtimeout = ref.debugtimeout;
       
   515             pool = ref.connectionPool();
       
   516             registrations = new ArrayList<>();
       
   517             selector = Selector.open();
       
   518         }
       
   519 
       
   520         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
       
   521             if (Thread.currentThread() == this) {
       
   522                 SelectionKey key = e.channel().keyFor(selector);
       
   523                 if (key != null) {
       
   524                     SelectorAttachment sa = (SelectorAttachment) key.attachment();
       
   525                     if (sa != null) sa.register(e);
       
   526                 }
       
   527             } else {
       
   528                 register(e);
       
   529             }
       
   530         }
       
   531 
       
   532         // This returns immediately. So caller not allowed to send/receive
       
   533         // on connection.
       
   534         synchronized void register(AsyncEvent e) {
       
   535             registrations.add(e);
       
   536             selector.wakeup();
       
   537         }
       
   538 
       
   539         synchronized void cancel(SocketChannel e) {
       
   540             SelectionKey key = e.keyFor(selector);
       
   541             if (key != null) {
       
   542                 key.cancel();
       
   543             }
       
   544             selector.wakeup();
       
   545         }
       
   546 
       
   547         void wakeupSelector() {
       
   548             selector.wakeup();
       
   549         }
       
   550 
       
   551         synchronized void shutdown() {
       
   552             debug.log(Level.DEBUG, "SelectorManager shutting down");
       
   553             closed = true;
       
   554             try {
       
   555                 selector.close();
       
   556             } catch (IOException ignored) {
       
   557             } finally {
       
   558                 owner.stop();
       
   559             }
       
   560         }
       
   561 
       
   562         @Override
       
   563         public void run() {
       
   564             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
       
   565             List<AsyncEvent> readyList = new ArrayList<>();
       
   566             try {
       
   567                 while (!Thread.currentThread().isInterrupted()) {
       
   568                     synchronized (this) {
       
   569                         assert errorList.isEmpty();
       
   570                         assert readyList.isEmpty();
       
   571                         for (AsyncEvent event : registrations) {
       
   572                             if (event instanceof AsyncTriggerEvent) {
       
   573                                 readyList.add(event);
       
   574                                 continue;
       
   575                             }
       
   576                             SelectableChannel chan = event.channel();
       
   577                             SelectionKey key = null;
       
   578                             try {
       
   579                                 key = chan.keyFor(selector);
       
   580                                 SelectorAttachment sa;
       
   581                                 if (key == null || !key.isValid()) {
       
   582                                     if (key != null) {
       
   583                                         // key is canceled.
       
   584                                         // invoke selectNow() to purge it
       
   585                                         // before registering the new event.
       
   586                                         selector.selectNow();
       
   587                                     }
       
   588                                     sa = new SelectorAttachment(chan, selector);
       
   589                                 } else {
       
   590                                     sa = (SelectorAttachment) key.attachment();
       
   591                                 }
       
   592                                 // may throw IOE if channel closed: that's OK
       
   593                                 sa.register(event);
       
   594                                 if (!chan.isOpen()) {
       
   595                                     throw new IOException("Channel closed");
       
   596                                 }
       
   597                             } catch (IOException e) {
       
   598                                 Log.logTrace("HttpClientImpl: " + e);
       
   599                                 debug.log(Level.DEBUG, () ->
       
   600                                         "Got " + e.getClass().getName()
       
   601                                                  + " while handling"
       
   602                                                  + " registration events");
       
   603                                 chan.close();
       
   604                                 // let the event abort deal with it
       
   605                                 errorList.add(new Pair<>(event, e));
       
   606                                 if (key != null) {
       
   607                                     key.cancel();
       
   608                                     selector.selectNow();
       
   609                                 }
       
   610                             }
       
   611                         }
       
   612                         registrations.clear();
       
   613                         selector.selectedKeys().clear();
       
   614                     }
       
   615 
       
   616                     for (AsyncEvent event : readyList) {
       
   617                         assert event instanceof AsyncTriggerEvent;
       
   618                         event.handle();
       
   619                     }
       
   620                     readyList.clear();
       
   621 
       
   622                     for (Pair<AsyncEvent,IOException> error : errorList) {
       
   623                         // an IOException was raised and the channel closed.
       
   624                         handleEvent(error.first, error.second);
       
   625                     }
       
   626                     errorList.clear();
       
   627 
       
   628                     // Check whether client is still alive, and if not,
       
   629                     // gracefully stop this thread
       
   630                     if (!owner.isReferenced()) {
       
   631                         Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   632                         return;
       
   633                     }
       
   634 
       
   635                     // Timeouts will have milliseconds granularity. It is important
       
   636                     // to handle them in a timely fashion.
       
   637                     long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
       
   638                     debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout);
       
   639 
       
   640                     // Keep-alive have seconds granularity. It's not really an
       
   641                     // issue if we keep connections linger a bit more in the keep
       
   642                     // alive cache.
       
   643                     long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
       
   644                     debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry);
       
   645 
       
   646                     assert nextTimeout >= 0;
       
   647                     assert nextExpiry >= 0;
       
   648 
       
   649                     // Don't wait for ever as it might prevent the thread to
       
   650                     // stop gracefully. millis will be 0 if no deadline was found.
       
   651                     if (nextTimeout <= 0) nextTimeout = NODEADLINE;
       
   652 
       
   653                     // Clip nextExpiry at NODEADLINE limit. The default
       
   654                     // keep alive is 1200 seconds (half an hour) - we don't
       
   655                     // want to wait that long.
       
   656                     if (nextExpiry <= 0) nextExpiry = NODEADLINE;
       
   657                     else nextExpiry = Math.min(NODEADLINE, nextExpiry);
       
   658 
       
   659                     // takes the least of the two.
       
   660                     long millis = Math.min(nextExpiry, nextTimeout);
       
   661 
       
   662                     debugtimeout.log(Level.DEBUG, "Next deadline is %d",
       
   663                                      (millis == 0 ? NODEADLINE : millis));
       
   664                     //debugPrint(selector);
       
   665                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
       
   666                     if (n == 0) {
       
   667                         // Check whether client is still alive, and if not,
       
   668                         // gracefully stop this thread
       
   669                         if (!owner.isReferenced()) {
       
   670                             Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   671                             return;
       
   672                         }
       
   673                         owner.purgeTimeoutsAndReturnNextDeadline();
       
   674                         continue;
       
   675                     }
       
   676                     Set<SelectionKey> keys = selector.selectedKeys();
       
   677 
       
   678                     assert errorList.isEmpty();
       
   679                     for (SelectionKey key : keys) {
       
   680                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
       
   681                         if (!key.isValid()) {
       
   682                             IOException ex = sa.chan.isOpen()
       
   683                                     ? new IOException("Invalid key")
       
   684                                     : new ClosedChannelException();
       
   685                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
       
   686                             sa.pending.clear();
       
   687                             continue;
       
   688                         }
       
   689 
       
   690                         int eventsOccurred;
       
   691                         try {
       
   692                             eventsOccurred = key.readyOps();
       
   693                         } catch (CancelledKeyException ex) {
       
   694                             IOException io = Utils.getIOException(ex);
       
   695                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
       
   696                             sa.pending.clear();
       
   697                             continue;
       
   698                         }
       
   699                         sa.events(eventsOccurred).forEach(readyList::add);
       
   700                         sa.resetInterestOps(eventsOccurred);
       
   701                     }
       
   702                     selector.selectNow(); // complete cancellation
       
   703                     selector.selectedKeys().clear();
       
   704 
       
   705                     for (AsyncEvent event : readyList) {
       
   706                         handleEvent(event, null); // will be delegated to executor
       
   707                     }
       
   708                     readyList.clear();
       
   709                     errorList.forEach((p) -> handleEvent(p.first, p.second));
       
   710                     errorList.clear();
       
   711                 }
       
   712             } catch (Throwable e) {
       
   713                 //e.printStackTrace();
       
   714                 if (!closed) {
       
   715                     // This terminates thread. So, better just print stack trace
       
   716                     String err = Utils.stackTrace(e);
       
   717                     Log.logError("HttpClientImpl: fatal error: " + err);
       
   718                 }
       
   719                 debug.log(Level.DEBUG, "shutting down", e);
       
   720                 if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) {
       
   721                     e.printStackTrace(System.err); // always print the stack
       
   722                 }
       
   723             } finally {
       
   724                 shutdown();
       
   725             }
       
   726         }
       
   727 
       
   728 //        void debugPrint(Selector selector) {
       
   729 //            System.err.println("Selector: debugprint start");
       
   730 //            Set<SelectionKey> keys = selector.keys();
       
   731 //            for (SelectionKey key : keys) {
       
   732 //                SelectableChannel c = key.channel();
       
   733 //                int ops = key.interestOps();
       
   734 //                System.err.printf("selector chan:%s ops:%d\n", c, ops);
       
   735 //            }
       
   736 //            System.err.println("Selector: debugprint end");
       
   737 //        }
       
   738 
       
   739         /** Handles the given event. The given ioe may be null. */
       
   740         void handleEvent(AsyncEvent event, IOException ioe) {
       
   741             if (closed || ioe != null) {
       
   742                 event.abort(ioe);
       
   743             } else {
       
   744                 event.handle();
       
   745             }
       
   746         }
       
   747     }
       
   748 
       
   749     /**
       
   750      * Tracks multiple user level registrations associated with one NIO
       
   751      * registration (SelectionKey). In this implementation, registrations
       
   752      * are one-off and when an event is posted the registration is cancelled
       
   753      * until explicitly registered again.
       
   754      *
       
   755      * <p> No external synchronization required as this class is only used
       
   756      * by the SelectorManager thread. One of these objects required per
       
   757      * connection.
       
   758      */
       
   759     private static class SelectorAttachment {
       
   760         private final SelectableChannel chan;
       
   761         private final Selector selector;
       
   762         private final Set<AsyncEvent> pending;
       
   763         private final static System.Logger debug =
       
   764                 Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG);
       
   765         private int interestOps;
       
   766 
       
   767         SelectorAttachment(SelectableChannel chan, Selector selector) {
       
   768             this.pending = new HashSet<>();
       
   769             this.chan = chan;
       
   770             this.selector = selector;
       
   771         }
       
   772 
       
   773         void register(AsyncEvent e) throws ClosedChannelException {
       
   774             int newOps = e.interestOps();
       
   775             boolean reRegister = (interestOps & newOps) != newOps;
       
   776             interestOps |= newOps;
       
   777             pending.add(e);
       
   778             if (reRegister) {
       
   779                 // first time registration happens here also
       
   780                 try {
       
   781                     chan.register(selector, interestOps, this);
       
   782                 } catch (CancelledKeyException x) {
       
   783                     abortPending(x);
       
   784                 }
       
   785             }
       
   786         }
       
   787 
       
   788         /**
       
   789          * Returns a Stream<AsyncEvents> containing only events that are
       
   790          * registered with the given {@code interestOps}.
       
   791          */
       
   792         Stream<AsyncEvent> events(int interestOps) {
       
   793             return pending.stream()
       
   794                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
       
   795         }
       
   796 
       
   797         /**
       
   798          * Removes any events with the given {@code interestOps}, and if no
       
   799          * events remaining, cancels the associated SelectionKey.
       
   800          */
       
   801         void resetInterestOps(int interestOps) {
       
   802             int newOps = 0;
       
   803 
       
   804             Iterator<AsyncEvent> itr = pending.iterator();
       
   805             while (itr.hasNext()) {
       
   806                 AsyncEvent event = itr.next();
       
   807                 int evops = event.interestOps();
       
   808                 if (event.repeating()) {
       
   809                     newOps |= evops;
       
   810                     continue;
       
   811                 }
       
   812                 if ((evops & interestOps) != 0) {
       
   813                     itr.remove();
       
   814                 } else {
       
   815                     newOps |= evops;
       
   816                 }
       
   817             }
       
   818 
       
   819             this.interestOps = newOps;
       
   820             SelectionKey key = chan.keyFor(selector);
       
   821             if (newOps == 0 && pending.isEmpty()) {
       
   822                 key.cancel();
       
   823             } else {
       
   824                 try {
       
   825                     key.interestOps(newOps);
       
   826                 } catch (CancelledKeyException x) {
       
   827                     // channel may have been closed
       
   828                     debug.log(Level.DEBUG, "key cancelled for " + chan);
       
   829                     abortPending(x);
       
   830                 }
       
   831             }
       
   832         }
       
   833 
       
   834         void abortPending(Throwable x) {
       
   835             if (!pending.isEmpty()) {
       
   836                 AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
       
   837                 pending.clear();
       
   838                 IOException io = Utils.getIOException(x);
       
   839                 for (AsyncEvent event : evts) {
       
   840                     event.abort(io);
       
   841                 }
       
   842             }
       
   843         }
       
   844     }
       
   845 
       
   846     /*package-private*/ SSLContext theSSLContext() {
       
   847         return sslContext;
       
   848     }
       
   849 
       
   850     @Override
       
   851     public SSLContext sslContext() {
       
   852         return sslContext;
       
   853     }
       
   854 
       
   855     @Override
       
   856     public SSLParameters sslParameters() {
       
   857         return Utils.copySSLParameters(sslParams);
       
   858     }
       
   859 
       
   860     @Override
       
   861     public Optional<Authenticator> authenticator() {
       
   862         return Optional.ofNullable(authenticator);
       
   863     }
       
   864 
       
   865     /*package-private*/ final Executor theExecutor() {
       
   866         return executor;
       
   867     }
       
   868 
       
   869     @Override
       
   870     public final Optional<Executor> executor() {
       
   871         return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
       
   872     }
       
   873 
       
   874     ConnectionPool connectionPool() {
       
   875         return connections;
       
   876     }
       
   877 
       
   878     @Override
       
   879     public Redirect followRedirects() {
       
   880         return followRedirects;
       
   881     }
       
   882 
       
   883 
       
   884     @Override
       
   885     public Optional<CookieHandler> cookieHandler() {
       
   886         return Optional.ofNullable(cookieHandler);
       
   887     }
       
   888 
       
   889     @Override
       
   890     public Optional<ProxySelector> proxy() {
       
   891         return this.userProxySelector;
       
   892     }
       
   893 
       
   894     // Return the effective proxy that this client uses.
       
   895     ProxySelector proxySelector() {
       
   896         return proxySelector;
       
   897     }
       
   898 
       
   899     @Override
       
   900     public WebSocket.Builder newWebSocketBuilder() {
       
   901         // Make sure to pass the HttpClientFacade to the WebSocket builder.
       
   902         // This will ensure that the facade is not released before the
       
   903         // WebSocket has been created, at which point the pendingOperationCount
       
   904         // will have been incremented by the DetachedConnectionChannel
       
   905         // (see PlainHttpConnection.detachChannel())
       
   906         return new BuilderImpl(this.facade(), proxySelector);
       
   907     }
       
   908 
       
   909     @Override
       
   910     public Version version() {
       
   911         return version;
       
   912     }
       
   913 
       
   914     String dbgString() {
       
   915         return dbgTag;
       
   916     }
       
   917 
       
   918     @Override
       
   919     public String toString() {
       
   920         // Used by tests to get the client's id and compute the
       
   921         // name of the SelectorManager thread.
       
   922         return super.toString() + ("(" + id + ")");
       
   923     }
       
   924 
       
   925     private void initFilters() {
       
   926         addFilter(AuthenticationFilter.class);
       
   927         addFilter(RedirectFilter.class);
       
   928         if (this.cookieHandler != null) {
       
   929             addFilter(CookieFilter.class);
       
   930         }
       
   931     }
       
   932 
       
   933     private void addFilter(Class<? extends HeaderFilter> f) {
       
   934         filters.addFilter(f);
       
   935     }
       
   936 
       
   937     final List<HeaderFilter> filterChain() {
       
   938         return filters.getFilterChain();
       
   939     }
       
   940 
       
   941     // Timer controls.
       
   942     // Timers are implemented through timed Selector.select() calls.
       
   943 
       
   944     synchronized void registerTimer(TimeoutEvent event) {
       
   945         Log.logTrace("Registering timer {0}", event);
       
   946         timeouts.add(event);
       
   947         selmgr.wakeupSelector();
       
   948     }
       
   949 
       
   950     synchronized void cancelTimer(TimeoutEvent event) {
       
   951         Log.logTrace("Canceling timer {0}", event);
       
   952         timeouts.remove(event);
       
   953     }
       
   954 
       
   955     /**
       
   956      * Purges ( handles ) timer events that have passed their deadline, and
       
   957      * returns the amount of time, in milliseconds, until the next earliest
       
   958      * event. A return value of 0 means that there are no events.
       
   959      */
       
   960     private long purgeTimeoutsAndReturnNextDeadline() {
       
   961         long diff = 0L;
       
   962         List<TimeoutEvent> toHandle = null;
       
   963         int remaining = 0;
       
   964         // enter critical section to retrieve the timeout event to handle
       
   965         synchronized(this) {
       
   966             if (timeouts.isEmpty()) return 0L;
       
   967 
       
   968             Instant now = Instant.now();
       
   969             Iterator<TimeoutEvent> itr = timeouts.iterator();
       
   970             while (itr.hasNext()) {
       
   971                 TimeoutEvent event = itr.next();
       
   972                 diff = now.until(event.deadline(), ChronoUnit.MILLIS);
       
   973                 if (diff <= 0) {
       
   974                     itr.remove();
       
   975                     toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
       
   976                     toHandle.add(event);
       
   977                 } else {
       
   978                     break;
       
   979                 }
       
   980             }
       
   981             remaining = timeouts.size();
       
   982         }
       
   983 
       
   984         // can be useful for debugging
       
   985         if (toHandle != null && Log.trace()) {
       
   986             Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
       
   987                     +  toHandle.size() + " events, "
       
   988                     + "remaining " + remaining
       
   989                     + ", next deadline: " + (diff < 0 ? 0L : diff));
       
   990         }
       
   991 
       
   992         // handle timeout events out of critical section
       
   993         if (toHandle != null) {
       
   994             Throwable failed = null;
       
   995             for (TimeoutEvent event : toHandle) {
       
   996                 try {
       
   997                    Log.logTrace("Firing timer {0}", event);
       
   998                    event.handle();
       
   999                 } catch (Error | RuntimeException e) {
       
  1000                     // Not expected. Handle remaining events then throw...
       
  1001                     // If e is an OOME or SOE it might simply trigger a new
       
  1002                     // error from here - but in this case there's not much we
       
  1003                     // could do anyway. Just let it flow...
       
  1004                     if (failed == null) failed = e;
       
  1005                     else failed.addSuppressed(e);
       
  1006                     Log.logTrace("Failed to handle event {0}: {1}", event, e);
       
  1007                 }
       
  1008             }
       
  1009             if (failed instanceof Error) throw (Error) failed;
       
  1010             if (failed instanceof RuntimeException) throw (RuntimeException) failed;
       
  1011         }
       
  1012 
       
  1013         // return time to wait until next event. 0L if there's no more events.
       
  1014         return diff < 0 ? 0L : diff;
       
  1015     }
       
  1016 
       
  1017     // used for the connection window
       
  1018     int getReceiveBufferSize() {
       
  1019         return Utils.getIntegerNetProperty(
       
  1020                 "jdk.httpclient.receiveBufferSize", 2 * 1024 * 1024
       
  1021         );
       
  1022     }
       
  1023 }