src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56078 6c11b48a0695
child 56080 64846522c0d5
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
     1 /*
       
     2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http;
       
    27 
       
    28 import javax.net.ssl.SSLContext;
       
    29 import javax.net.ssl.SSLParameters;
       
    30 import java.io.IOException;
       
    31 import java.lang.System.Logger.Level;
       
    32 import java.lang.ref.WeakReference;
       
    33 import java.net.Authenticator;
       
    34 import java.net.CookieHandler;
       
    35 import java.net.ProxySelector;
       
    36 import java.nio.channels.CancelledKeyException;
       
    37 import java.nio.channels.ClosedChannelException;
       
    38 import java.nio.channels.SelectableChannel;
       
    39 import java.nio.channels.SelectionKey;
       
    40 import java.nio.channels.Selector;
       
    41 import java.nio.channels.SocketChannel;
       
    42 import java.security.AccessControlContext;
       
    43 import java.security.AccessController;
       
    44 import java.security.NoSuchAlgorithmException;
       
    45 import java.security.PrivilegedAction;
       
    46 import java.time.Instant;
       
    47 import java.time.temporal.ChronoUnit;
       
    48 import java.util.ArrayList;
       
    49 import java.util.HashSet;
       
    50 import java.util.Iterator;
       
    51 import java.util.List;
       
    52 import java.util.Optional;
       
    53 import java.util.Set;
       
    54 import java.util.TreeSet;
       
    55 import java.util.concurrent.CompletableFuture;
       
    56 import java.util.concurrent.ExecutionException;
       
    57 import java.util.concurrent.Executor;
       
    58 import java.util.concurrent.Executors;
       
    59 import java.util.concurrent.ThreadFactory;
       
    60 import java.util.concurrent.atomic.AtomicInteger;
       
    61 import java.util.concurrent.atomic.AtomicLong;
       
    62 import java.util.stream.Stream;
       
    63 import jdk.incubator.http.HttpResponse.BodyHandler;
       
    64 import jdk.incubator.http.HttpResponse.PushPromiseHandler;
       
    65 import jdk.incubator.http.internal.common.Log;
       
    66 import jdk.incubator.http.internal.common.Pair;
       
    67 import jdk.incubator.http.internal.common.Utils;
       
    68 import jdk.incubator.http.internal.websocket.BuilderImpl;
       
    69 import jdk.internal.misc.InnocuousThread;
       
    70 
       
    71 /**
       
    72  * Client implementation. Contains all configuration information and also
       
    73  * the selector manager thread which allows async events to be registered
       
    74  * and delivered when they occur. See AsyncEvent.
       
    75  */
       
    76 class HttpClientImpl extends HttpClient {
       
    77 
       
    78     static final boolean DEBUG = Utils.DEBUG;  // Revisit: temporary dev flag.
       
    79     static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG;  // Revisit: temporary dev flag.
       
    80     static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag.
       
    81     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    82     final System.Logger  debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
       
    83     final System.Logger  debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
       
    84     static final AtomicLong CLIENT_IDS = new AtomicLong();
       
    85 
       
    86     // Define the default factory as a static inner class
       
    87     // that embeds all the necessary logic to avoid
       
    88     // the risk of using a lambda that might keep a reference on the
       
    89     // HttpClient instance from which it was created (helps with
       
    90     // heapdump analysis).
       
    91     private static final class DefaultThreadFactory implements ThreadFactory {
       
    92         private final String namePrefix;
       
    93         private final AtomicInteger nextId = new AtomicInteger();
       
    94 
       
    95         DefaultThreadFactory(long clientID) {
       
    96             namePrefix = "HttpClient-" + clientID + "-Worker-";
       
    97         }
       
    98 
       
    99         @Override
       
   100         public Thread newThread(Runnable r) {
       
   101             String name = namePrefix + nextId.getAndIncrement();
       
   102             Thread t;
       
   103             if (System.getSecurityManager() == null) {
       
   104                 t = new Thread(null, r, name, 0, false);
       
   105             } else {
       
   106                 t = InnocuousThread.newThread(name, r);
       
   107             }
       
   108             t.setDaemon(true);
       
   109             return t;
       
   110         }
       
   111     }
       
   112 
       
   113     private final CookieHandler cookieHandler;
       
   114     private final Redirect followRedirects;
       
   115     private final Optional<ProxySelector> userProxySelector;
       
   116     private final ProxySelector proxySelector;
       
   117     private final Authenticator authenticator;
       
   118     private final Version version;
       
   119     private final ConnectionPool connections;
       
   120     private final Executor executor;
       
   121     private final boolean isDefaultExecutor;
       
   122     // Security parameters
       
   123     private final SSLContext sslContext;
       
   124     private final SSLParameters sslParams;
       
   125     private final SelectorManager selmgr;
       
   126     private final FilterFactory filters;
       
   127     private final Http2ClientImpl client2;
       
   128     private final long id;
       
   129     private final String dbgTag;
       
   130 
       
   131     // This reference is used to keep track of the facade HttpClient
       
   132     // that was returned to the application code.
       
   133     // It makes it possible to know when the application no longer
       
   134     // holds any reference to the HttpClient.
       
   135     // Unfortunately, this information is not enough to know when
       
   136     // to exit the SelectorManager thread. Because of the asynchronous
       
   137     // nature of the API, we also need to wait until all pending operations
       
   138     // have completed.
       
   139     private final WeakReference<HttpClientFacade> facadeRef;
       
   140 
       
   141     // This counter keeps track of the number of operations pending
       
   142     // on the HttpClient. The SelectorManager thread will wait
       
   143     // until there are no longer any pending operations and the
       
   144     // facadeRef is cleared before exiting.
       
   145     //
       
   146     // The pendingOperationCount is incremented every time a send/sendAsync
       
   147     // operation is invoked on the HttpClient, and is decremented when
       
   148     // the HttpResponse<T> object is returned to the user.
       
   149     // However, at this point, the body may not have been fully read yet.
       
   150     // This is the case when the response T is implemented as a streaming
       
   151     // subscriber (such as an InputStream).
       
   152     //
       
   153     // To take care of this issue the pendingOperationCount will additionally
       
   154     // be incremented/decremented in the following cases:
       
   155     //
       
   156     // 1. For HTTP/2  it is incremented when a stream is added to the
       
   157     //    Http2Connection streams map, and decreased when the stream is removed
       
   158     //    from the map. This should also take care of push promises.
       
   159     // 2. For WebSocket the count is increased when creating a
       
   160     //    DetachedConnectionChannel for the socket, and decreased
       
   161     //    when the the channel is closed.
       
   162     //    In addition, the HttpClient facade is passed to the WebSocket builder,
       
   163     //    (instead of the client implementation delegate).
       
   164     // 3. For HTTP/1.1 the count is incremented before starting to parse the body
       
   165     //    response, and decremented when the parser has reached the end of the
       
   166     //    response body flow.
       
   167     //
       
   168     // This should ensure that the selector manager thread remains alive until
       
   169     // the response has been fully received or the web socket is closed.
       
   170     private final AtomicLong pendingOperationCount = new AtomicLong();
       
   171     private final AtomicLong pendingWebSocketCount = new AtomicLong();
       
   172     private final AtomicLong pendingHttpRequestCount = new AtomicLong();
       
   173 
       
   174     /** A Set of, deadline first, ordered timeout events. */
       
   175     private final TreeSet<TimeoutEvent> timeouts;
       
   176 
       
   177     /**
       
   178      * This is a bit tricky:
       
   179      * 1. an HttpClientFacade has a final HttpClientImpl field.
       
   180      * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
       
   181      *    where the referent is the facade created for that instance.
       
   182      * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
       
   183      *    constructor, because it would be only weakly referenced and could
       
   184      *    be GC'ed before we can return it.
       
   185      * The solution is to use an instance of SingleFacadeFactory which will
       
   186      * allow the caller of new HttpClientImpl(...) to retrieve the facade
       
   187      * after the HttpClientImpl has been created.
       
   188      */
       
   189     private static final class SingleFacadeFactory {
       
   190         HttpClientFacade facade;
       
   191         HttpClientFacade createFacade(HttpClientImpl impl) {
       
   192             assert facade == null;
       
   193             return (facade = new HttpClientFacade(impl));
       
   194         }
       
   195     }
       
   196 
       
   197     static HttpClientFacade create(HttpClientBuilderImpl builder) {
       
   198         SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
       
   199         HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
       
   200         impl.start();
       
   201         assert facadeFactory.facade != null;
       
   202         assert impl.facadeRef.get() == facadeFactory.facade;
       
   203         return facadeFactory.facade;
       
   204     }
       
   205 
       
   206     private HttpClientImpl(HttpClientBuilderImpl builder,
       
   207                            SingleFacadeFactory facadeFactory) {
       
   208         id = CLIENT_IDS.incrementAndGet();
       
   209         dbgTag = "HttpClientImpl(" + id +")";
       
   210         if (builder.sslContext == null) {
       
   211             try {
       
   212                 sslContext = SSLContext.getDefault();
       
   213             } catch (NoSuchAlgorithmException ex) {
       
   214                 throw new InternalError(ex);
       
   215             }
       
   216         } else {
       
   217             sslContext = builder.sslContext;
       
   218         }
       
   219         Executor ex = builder.executor;
       
   220         if (ex == null) {
       
   221             ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
       
   222             isDefaultExecutor = true;
       
   223         } else {
       
   224             ex = builder.executor;
       
   225             isDefaultExecutor = false;
       
   226         }
       
   227         facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
       
   228         client2 = new Http2ClientImpl(this);
       
   229         executor = ex;
       
   230         cookieHandler = builder.cookieHandler;
       
   231         followRedirects = builder.followRedirects == null ?
       
   232                 Redirect.NEVER : builder.followRedirects;
       
   233         this.userProxySelector = Optional.ofNullable(builder.proxy);
       
   234         this.proxySelector = userProxySelector
       
   235                 .orElseGet(HttpClientImpl::getDefaultProxySelector);
       
   236         debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)",
       
   237                 this.proxySelector, userProxySelector.isPresent());
       
   238         authenticator = builder.authenticator;
       
   239         if (builder.version == null) {
       
   240             version = HttpClient.Version.HTTP_2;
       
   241         } else {
       
   242             version = builder.version;
       
   243         }
       
   244         if (builder.sslParams == null) {
       
   245             sslParams = getDefaultParams(sslContext);
       
   246         } else {
       
   247             sslParams = builder.sslParams;
       
   248         }
       
   249         connections = new ConnectionPool(id);
       
   250         connections.start();
       
   251         timeouts = new TreeSet<>();
       
   252         try {
       
   253             selmgr = new SelectorManager(this);
       
   254         } catch (IOException e) {
       
   255             // unlikely
       
   256             throw new InternalError(e);
       
   257         }
       
   258         selmgr.setDaemon(true);
       
   259         filters = new FilterFactory();
       
   260         initFilters();
       
   261         assert facadeRef.get() != null;
       
   262     }
       
   263 
       
   264     private void start() {
       
   265         selmgr.start();
       
   266     }
       
   267 
       
   268     // Called from the SelectorManager thread, just before exiting.
       
   269     // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
       
   270     // that may be still lingering there are properly closed (and their
       
   271     // possibly still opened SocketChannel released).
       
   272     private void stop() {
       
   273         // Clears HTTP/1.1 cache and close its connections
       
   274         connections.stop();
       
   275         // Clears HTTP/2 cache and close its connections.
       
   276         client2.stop();
       
   277     }
       
   278 
       
   279     private static SSLParameters getDefaultParams(SSLContext ctx) {
       
   280         SSLParameters params = ctx.getSupportedSSLParameters();
       
   281         params.setProtocols(new String[]{"TLSv1.2"});
       
   282         return params;
       
   283     }
       
   284 
       
   285     private static ProxySelector getDefaultProxySelector() {
       
   286         PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
       
   287         return AccessController.doPrivileged(action);
       
   288     }
       
   289 
       
   290     // Returns the facade that was returned to the application code.
       
   291     // May be null if that facade is no longer referenced.
       
   292     final HttpClientFacade facade() {
       
   293         return facadeRef.get();
       
   294     }
       
   295 
       
   296     // Increments the pendingOperationCount.
       
   297     final long reference() {
       
   298         pendingHttpRequestCount.incrementAndGet();
       
   299         return pendingOperationCount.incrementAndGet();
       
   300     }
       
   301 
       
   302     // Decrements the pendingOperationCount.
       
   303     final long unreference() {
       
   304         final long count = pendingOperationCount.decrementAndGet();
       
   305         final long httpCount = pendingHttpRequestCount.decrementAndGet();
       
   306         final long webSocketCount = pendingWebSocketCount.get();
       
   307         if (count == 0 && facade() == null) {
       
   308             selmgr.wakeupSelector();
       
   309         }
       
   310         assert httpCount >= 0 : "count of HTTP operations < 0";
       
   311         assert webSocketCount >= 0 : "count of WS operations < 0";
       
   312         assert count >= 0 : "count of pending operations < 0";
       
   313         return count;
       
   314     }
       
   315 
       
   316     // Increments the pendingOperationCount.
       
   317     final long webSocketOpen() {
       
   318         pendingWebSocketCount.incrementAndGet();
       
   319         return pendingOperationCount.incrementAndGet();
       
   320     }
       
   321 
       
   322     // Decrements the pendingOperationCount.
       
   323     final long webSocketClose() {
       
   324         final long count = pendingOperationCount.decrementAndGet();
       
   325         final long webSocketCount = pendingWebSocketCount.decrementAndGet();
       
   326         final long httpCount = pendingHttpRequestCount.get();
       
   327         if (count == 0 && facade() == null) {
       
   328             selmgr.wakeupSelector();
       
   329         }
       
   330         assert httpCount >= 0 : "count of HTTP operations < 0";
       
   331         assert webSocketCount >= 0 : "count of WS operations < 0";
       
   332         assert count >= 0 : "count of pending operations < 0";
       
   333         return count;
       
   334     }
       
   335 
       
   336     // Returns the pendingOperationCount.
       
   337     final long referenceCount() {
       
   338         return pendingOperationCount.get();
       
   339     }
       
   340 
       
   341     // Called by the SelectorManager thread to figure out whether it's time
       
   342     // to terminate.
       
   343     final boolean isReferenced() {
       
   344         HttpClient facade = facade();
       
   345         return facade != null || referenceCount() > 0;
       
   346     }
       
   347 
       
   348     /**
       
   349      * Wait for activity on given exchange.
       
   350      * The following occurs in the SelectorManager thread.
       
   351      *
       
   352      *  1) add to selector
       
   353      *  2) If selector fires for this exchange then
       
   354      *     call AsyncEvent.handle()
       
   355      *
       
   356      * If exchange needs to change interest ops, then call registerEvent() again.
       
   357      */
       
   358     void registerEvent(AsyncEvent exchange) throws IOException {
       
   359         selmgr.register(exchange);
       
   360     }
       
   361 
       
   362     /**
       
   363      * Only used from RawChannel to disconnect the channel from
       
   364      * the selector
       
   365      */
       
   366     void cancelRegistration(SocketChannel s) {
       
   367         selmgr.cancel(s);
       
   368     }
       
   369 
       
   370     /**
       
   371      * Allows an AsyncEvent to modify its interestOps.
       
   372      * @param event The modified event.
       
   373      */
       
   374     void eventUpdated(AsyncEvent event) throws ClosedChannelException {
       
   375         assert !(event instanceof AsyncTriggerEvent);
       
   376         selmgr.eventUpdated(event);
       
   377     }
       
   378 
       
   379     boolean isSelectorThread() {
       
   380         return Thread.currentThread() == selmgr;
       
   381     }
       
   382 
       
   383     Http2ClientImpl client2() {
       
   384         return client2;
       
   385     }
       
   386 
       
   387     private void debugCompleted(String tag, long startNanos, HttpRequest req) {
       
   388         if (debugelapsed.isLoggable(Level.DEBUG)) {
       
   389             debugelapsed.log(Level.DEBUG, () -> tag + " elapsed "
       
   390                     + (System.nanoTime() - startNanos)/1000_000L
       
   391                     + " millis for " + req.method()
       
   392                     + " to " + req.uri());
       
   393         }
       
   394     }
       
   395 
       
   396     @Override
       
   397     public <T> HttpResponse<T>
       
   398     send(HttpRequest req, BodyHandler<T> responseHandler)
       
   399         throws IOException, InterruptedException
       
   400     {
       
   401         try {
       
   402             return sendAsync(req, responseHandler).get();
       
   403         } catch (ExecutionException e) {
       
   404             Throwable t = e.getCause();
       
   405             if (t instanceof Error)
       
   406                 throw (Error)t;
       
   407             if (t instanceof RuntimeException)
       
   408                 throw (RuntimeException)t;
       
   409             else if (t instanceof IOException)
       
   410                 throw Utils.getIOException(t);
       
   411             else
       
   412                 throw new InternalError("Unexpected exception", t);
       
   413         }
       
   414     }
       
   415 
       
   416     @Override
       
   417     public <T> CompletableFuture<HttpResponse<T>>
       
   418     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
       
   419     {
       
   420         return sendAsync(userRequest, responseHandler, null);
       
   421     }
       
   422 
       
   423 
       
   424     @Override
       
   425     public <T> CompletableFuture<HttpResponse<T>>
       
   426     sendAsync(HttpRequest userRequest,
       
   427               BodyHandler<T> responseHandler,
       
   428               PushPromiseHandler<T> pushPromiseHandler)
       
   429     {
       
   430         AccessControlContext acc = null;
       
   431         if (System.getSecurityManager() != null)
       
   432             acc = AccessController.getContext();
       
   433 
       
   434         // Clone the, possibly untrusted, HttpRequest
       
   435         HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
       
   436         if (requestImpl.method().equals("CONNECT"))
       
   437             throw new IllegalArgumentException("Unsupported method CONNECT");
       
   438 
       
   439         long start = DEBUGELAPSED ? System.nanoTime() : 0;
       
   440         reference();
       
   441         try {
       
   442             debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest);
       
   443 
       
   444             MultiExchange<T> mex = new MultiExchange<>(userRequest,
       
   445                                                             requestImpl,
       
   446                                                             this,
       
   447                                                             responseHandler,
       
   448                                                             pushPromiseHandler,
       
   449                                                             acc);
       
   450             CompletableFuture<HttpResponse<T>> res =
       
   451                     mex.responseAsync().whenComplete((b,t) -> unreference());
       
   452             if (DEBUGELAPSED) {
       
   453                 res = res.whenComplete(
       
   454                         (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
       
   455             }
       
   456             // makes sure that any dependent actions happen in the executor
       
   457             if (acc != null) {
       
   458                 res.whenCompleteAsync((r, t) -> { /* do nothing */},
       
   459                                       new PrivilegedExecutor(executor, acc));
       
   460             }
       
   461 
       
   462             return res;
       
   463         } catch(Throwable t) {
       
   464             unreference();
       
   465             debugCompleted("ClientImpl (async)", start, userRequest);
       
   466             throw t;
       
   467         }
       
   468     }
       
   469 
       
   470     // Main loop for this client's selector
       
   471     private final static class SelectorManager extends Thread {
       
   472 
       
   473         // For testing purposes we have an internal System property that
       
   474         // can control the frequency at which the selector manager will wake
       
   475         // up when there are no pending operations.
       
   476         // Increasing the frequency (shorter delays) might allow the selector
       
   477         // to observe that the facade is no longer referenced and might allow
       
   478         // the selector thread to terminate more timely - for when nothing is
       
   479         // ongoing it will only check for that condition every NODEADLINE ms.
       
   480         // To avoid misuse of the property, the delay that can be specified
       
   481         // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
       
   482         // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
       
   483         // The property is -Djdk.httpclient.internal.selector.timeout=<millis>
       
   484         private static final int MIN_NODEADLINE = 1000; // ms
       
   485         private static final int MAX_NODEADLINE = 1000 * 1200; // ms
       
   486         private static final int DEF_NODEADLINE = 3000; // ms
       
   487         private static final long NODEADLINE; // default is DEF_NODEADLINE ms
       
   488         static {
       
   489             // ensure NODEADLINE is initialized with some valid value.
       
   490             long deadline =  Utils.getIntegerNetProperty(
       
   491                 "jdk.httpclient.internal.selector.timeout",
       
   492                 DEF_NODEADLINE); // millis
       
   493             if (deadline <= 0) deadline = DEF_NODEADLINE;
       
   494             deadline = Math.max(deadline, MIN_NODEADLINE);
       
   495             NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
       
   496         }
       
   497 
       
   498         private final Selector selector;
       
   499         private volatile boolean closed;
       
   500         private final List<AsyncEvent> registrations;
       
   501         private final System.Logger debug;
       
   502         private final System.Logger debugtimeout;
       
   503         HttpClientImpl owner;
       
   504         ConnectionPool pool;
       
   505 
       
   506         SelectorManager(HttpClientImpl ref) throws IOException {
       
   507             super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false);
       
   508             owner = ref;
       
   509             debug = ref.debug;
       
   510             debugtimeout = ref.debugtimeout;
       
   511             pool = ref.connectionPool();
       
   512             registrations = new ArrayList<>();
       
   513             selector = Selector.open();
       
   514         }
       
   515 
       
   516         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
       
   517             if (Thread.currentThread() == this) {
       
   518                 SelectionKey key = e.channel().keyFor(selector);
       
   519                 SelectorAttachment sa = (SelectorAttachment) key.attachment();
       
   520                 if (sa != null) sa.register(e);
       
   521             } else {
       
   522                 register(e);
       
   523             }
       
   524         }
       
   525 
       
   526         // This returns immediately. So caller not allowed to send/receive
       
   527         // on connection.
       
   528         synchronized void register(AsyncEvent e) {
       
   529             registrations.add(e);
       
   530             selector.wakeup();
       
   531         }
       
   532 
       
   533         synchronized void cancel(SocketChannel e) {
       
   534             SelectionKey key = e.keyFor(selector);
       
   535             if (key != null) {
       
   536                 key.cancel();
       
   537             }
       
   538             selector.wakeup();
       
   539         }
       
   540 
       
   541         void wakeupSelector() {
       
   542             selector.wakeup();
       
   543         }
       
   544 
       
   545         synchronized void shutdown() {
       
   546             debug.log(Level.DEBUG, "SelectorManager shutting down");
       
   547             closed = true;
       
   548             try {
       
   549                 selector.close();
       
   550             } catch (IOException ignored) {
       
   551             } finally {
       
   552                 owner.stop();
       
   553             }
       
   554         }
       
   555 
       
   556         @Override
       
   557         public void run() {
       
   558             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
       
   559             List<AsyncEvent> readyList = new ArrayList<>();
       
   560             try {
       
   561                 while (!Thread.currentThread().isInterrupted()) {
       
   562                     synchronized (this) {
       
   563                         assert errorList.isEmpty();
       
   564                         assert readyList.isEmpty();
       
   565                         for (AsyncEvent event : registrations) {
       
   566                             if (event instanceof AsyncTriggerEvent) {
       
   567                                 readyList.add(event);
       
   568                                 continue;
       
   569                             }
       
   570                             SelectableChannel chan = event.channel();
       
   571                             SelectionKey key = null;
       
   572                             try {
       
   573                                 key = chan.keyFor(selector);
       
   574                                 SelectorAttachment sa;
       
   575                                 if (key == null || !key.isValid()) {
       
   576                                     if (key != null) {
       
   577                                         // key is canceled.
       
   578                                         // invoke selectNow() to purge it
       
   579                                         // before registering the new event.
       
   580                                         selector.selectNow();
       
   581                                     }
       
   582                                     sa = new SelectorAttachment(chan, selector);
       
   583                                 } else {
       
   584                                     sa = (SelectorAttachment) key.attachment();
       
   585                                 }
       
   586                                 // may throw IOE if channel closed: that's OK
       
   587                                 sa.register(event);
       
   588                                 if (!chan.isOpen()) {
       
   589                                     throw new IOException("Channel closed");
       
   590                                 }
       
   591                             } catch (IOException e) {
       
   592                                 Log.logTrace("HttpClientImpl: " + e);
       
   593                                 debug.log(Level.DEBUG, () ->
       
   594                                         "Got " + e.getClass().getName()
       
   595                                                  + " while handling"
       
   596                                                  + " registration events");
       
   597                                 chan.close();
       
   598                                 // let the event abort deal with it
       
   599                                 errorList.add(new Pair<>(event, e));
       
   600                                 if (key != null) {
       
   601                                     key.cancel();
       
   602                                     selector.selectNow();
       
   603                                 }
       
   604                             }
       
   605                         }
       
   606                         registrations.clear();
       
   607                         selector.selectedKeys().clear();
       
   608                     }
       
   609 
       
   610                     for (AsyncEvent event : readyList) {
       
   611                         assert event instanceof AsyncTriggerEvent;
       
   612                         event.handle();
       
   613                     }
       
   614                     readyList.clear();
       
   615 
       
   616                     for (Pair<AsyncEvent,IOException> error : errorList) {
       
   617                         // an IOException was raised and the channel closed.
       
   618                         handleEvent(error.first, error.second);
       
   619                     }
       
   620                     errorList.clear();
       
   621 
       
   622                     // Check whether client is still alive, and if not,
       
   623                     // gracefully stop this thread
       
   624                     if (!owner.isReferenced()) {
       
   625                         Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   626                         return;
       
   627                     }
       
   628 
       
   629                     // Timeouts will have milliseconds granularity. It is important
       
   630                     // to handle them in a timely fashion.
       
   631                     long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
       
   632                     debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout);
       
   633 
       
   634                     // Keep-alive have seconds granularity. It's not really an
       
   635                     // issue if we keep connections linger a bit more in the keep
       
   636                     // alive cache.
       
   637                     long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
       
   638                     debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry);
       
   639 
       
   640                     assert nextTimeout >= 0;
       
   641                     assert nextExpiry >= 0;
       
   642 
       
   643                     // Don't wait for ever as it might prevent the thread to
       
   644                     // stop gracefully. millis will be 0 if no deadline was found.
       
   645                     if (nextTimeout <= 0) nextTimeout = NODEADLINE;
       
   646 
       
   647                     // Clip nextExpiry at NODEADLINE limit. The default
       
   648                     // keep alive is 1200 seconds (half an hour) - we don't
       
   649                     // want to wait that long.
       
   650                     if (nextExpiry <= 0) nextExpiry = NODEADLINE;
       
   651                     else nextExpiry = Math.min(NODEADLINE, nextExpiry);
       
   652 
       
   653                     // takes the least of the two.
       
   654                     long millis = Math.min(nextExpiry, nextTimeout);
       
   655 
       
   656                     debugtimeout.log(Level.DEBUG, "Next deadline is %d",
       
   657                                      (millis == 0 ? NODEADLINE : millis));
       
   658                     //debugPrint(selector);
       
   659                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
       
   660                     if (n == 0) {
       
   661                         // Check whether client is still alive, and if not,
       
   662                         // gracefully stop this thread
       
   663                         if (!owner.isReferenced()) {
       
   664                             Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   665                             return;
       
   666                         }
       
   667                         owner.purgeTimeoutsAndReturnNextDeadline();
       
   668                         continue;
       
   669                     }
       
   670                     Set<SelectionKey> keys = selector.selectedKeys();
       
   671 
       
   672                     assert errorList.isEmpty();
       
   673                     for (SelectionKey key : keys) {
       
   674                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
       
   675                         if (!key.isValid()) {
       
   676                             IOException ex = sa.chan.isOpen()
       
   677                                     ? new IOException("Invalid key")
       
   678                                     : new ClosedChannelException();
       
   679                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
       
   680                             sa.pending.clear();
       
   681                             continue;
       
   682                         }
       
   683 
       
   684                         int eventsOccurred;
       
   685                         try {
       
   686                             eventsOccurred = key.readyOps();
       
   687                         } catch (CancelledKeyException ex) {
       
   688                             IOException io = Utils.getIOException(ex);
       
   689                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
       
   690                             sa.pending.clear();
       
   691                             continue;
       
   692                         }
       
   693                         sa.events(eventsOccurred).forEach(readyList::add);
       
   694                         sa.resetInterestOps(eventsOccurred);
       
   695                     }
       
   696                     selector.selectNow(); // complete cancellation
       
   697                     selector.selectedKeys().clear();
       
   698 
       
   699                     for (AsyncEvent event : readyList) {
       
   700                         handleEvent(event, null); // will be delegated to executor
       
   701                     }
       
   702                     readyList.clear();
       
   703                     errorList.forEach((p) -> handleEvent(p.first, p.second));
       
   704                     errorList.clear();
       
   705                 }
       
   706             } catch (Throwable e) {
       
   707                 //e.printStackTrace();
       
   708                 if (!closed) {
       
   709                     // This terminates thread. So, better just print stack trace
       
   710                     String err = Utils.stackTrace(e);
       
   711                     Log.logError("HttpClientImpl: fatal error: " + err);
       
   712                 }
       
   713                 debug.log(Level.DEBUG, "shutting down", e);
       
   714                 if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) {
       
   715                     e.printStackTrace(System.err); // always print the stack
       
   716                 }
       
   717             } finally {
       
   718                 shutdown();
       
   719             }
       
   720         }
       
   721 
       
   722 //        void debugPrint(Selector selector) {
       
   723 //            System.err.println("Selector: debugprint start");
       
   724 //            Set<SelectionKey> keys = selector.keys();
       
   725 //            for (SelectionKey key : keys) {
       
   726 //                SelectableChannel c = key.channel();
       
   727 //                int ops = key.interestOps();
       
   728 //                System.err.printf("selector chan:%s ops:%d\n", c, ops);
       
   729 //            }
       
   730 //            System.err.println("Selector: debugprint end");
       
   731 //        }
       
   732 
       
   733         /** Handles the given event. The given ioe may be null. */
       
   734         void handleEvent(AsyncEvent event, IOException ioe) {
       
   735             if (closed || ioe != null) {
       
   736                 event.abort(ioe);
       
   737             } else {
       
   738                 event.handle();
       
   739             }
       
   740         }
       
   741     }
       
   742 
       
   743     /**
       
   744      * Tracks multiple user level registrations associated with one NIO
       
   745      * registration (SelectionKey). In this implementation, registrations
       
   746      * are one-off and when an event is posted the registration is cancelled
       
   747      * until explicitly registered again.
       
   748      *
       
   749      * <p> No external synchronization required as this class is only used
       
   750      * by the SelectorManager thread. One of these objects required per
       
   751      * connection.
       
   752      */
       
   753     private static class SelectorAttachment {
       
   754         private final SelectableChannel chan;
       
   755         private final Selector selector;
       
   756         private final Set<AsyncEvent> pending;
       
   757         private final static System.Logger debug =
       
   758                 Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG);
       
   759         private int interestOps;
       
   760 
       
   761         SelectorAttachment(SelectableChannel chan, Selector selector) {
       
   762             this.pending = new HashSet<>();
       
   763             this.chan = chan;
       
   764             this.selector = selector;
       
   765         }
       
   766 
       
   767         void register(AsyncEvent e) throws ClosedChannelException {
       
   768             int newOps = e.interestOps();
       
   769             boolean reRegister = (interestOps & newOps) != newOps;
       
   770             interestOps |= newOps;
       
   771             pending.add(e);
       
   772             if (reRegister) {
       
   773                 // first time registration happens here also
       
   774                 chan.register(selector, interestOps, this);
       
   775             }
       
   776         }
       
   777 
       
   778         /**
       
   779          * Returns a Stream<AsyncEvents> containing only events that are
       
   780          * registered with the given {@code interestOps}.
       
   781          */
       
   782         Stream<AsyncEvent> events(int interestOps) {
       
   783             return pending.stream()
       
   784                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
       
   785         }
       
   786 
       
   787         /**
       
   788          * Removes any events with the given {@code interestOps}, and if no
       
   789          * events remaining, cancels the associated SelectionKey.
       
   790          */
       
   791         void resetInterestOps(int interestOps) {
       
   792             int newOps = 0;
       
   793 
       
   794             Iterator<AsyncEvent> itr = pending.iterator();
       
   795             while (itr.hasNext()) {
       
   796                 AsyncEvent event = itr.next();
       
   797                 int evops = event.interestOps();
       
   798                 if (event.repeating()) {
       
   799                     newOps |= evops;
       
   800                     continue;
       
   801                 }
       
   802                 if ((evops & interestOps) != 0) {
       
   803                     itr.remove();
       
   804                 } else {
       
   805                     newOps |= evops;
       
   806                 }
       
   807             }
       
   808 
       
   809             this.interestOps = newOps;
       
   810             SelectionKey key = chan.keyFor(selector);
       
   811             if (newOps == 0 && pending.isEmpty()) {
       
   812                 key.cancel();
       
   813             } else {
       
   814                 try {
       
   815                     key.interestOps(newOps);
       
   816                 } catch (CancelledKeyException x) {
       
   817                     // channel may have been closed
       
   818                     debug.log(Level.DEBUG, "key cancelled for " + chan);
       
   819                     abortPending(x);
       
   820                 }
       
   821             }
       
   822         }
       
   823 
       
   824         void abortPending(Throwable x) {
       
   825             if (!pending.isEmpty()) {
       
   826                 AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
       
   827                 pending.clear();
       
   828                 IOException io = Utils.getIOException(x);
       
   829                 for (AsyncEvent event : evts) {
       
   830                     event.abort(io);
       
   831                 }
       
   832             }
       
   833         }
       
   834     }
       
   835 
       
   836     /*package-private*/ SSLContext theSSLContext() {
       
   837         return sslContext;
       
   838     }
       
   839 
       
   840     @Override
       
   841     public SSLContext sslContext() {
       
   842         return sslContext;
       
   843     }
       
   844 
       
   845     @Override
       
   846     public SSLParameters sslParameters() {
       
   847         return Utils.copySSLParameters(sslParams);
       
   848     }
       
   849 
       
   850     @Override
       
   851     public Optional<Authenticator> authenticator() {
       
   852         return Optional.ofNullable(authenticator);
       
   853     }
       
   854 
       
   855     /*package-private*/ final Executor theExecutor() {
       
   856         return executor;
       
   857     }
       
   858 
       
   859     @Override
       
   860     public final Optional<Executor> executor() {
       
   861         return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
       
   862     }
       
   863 
       
   864     ConnectionPool connectionPool() {
       
   865         return connections;
       
   866     }
       
   867 
       
   868     @Override
       
   869     public Redirect followRedirects() {
       
   870         return followRedirects;
       
   871     }
       
   872 
       
   873 
       
   874     @Override
       
   875     public Optional<CookieHandler> cookieHandler() {
       
   876         return Optional.ofNullable(cookieHandler);
       
   877     }
       
   878 
       
   879     @Override
       
   880     public Optional<ProxySelector> proxy() {
       
   881         return this.userProxySelector;
       
   882     }
       
   883 
       
   884     // Return the effective proxy that this client uses.
       
   885     ProxySelector proxySelector() {
       
   886         return proxySelector;
       
   887     }
       
   888 
       
   889     @Override
       
   890     public WebSocket.Builder newWebSocketBuilder() {
       
   891         // Make sure to pass the HttpClientFacade to the WebSocket builder.
       
   892         // This will ensure that the facade is not released before the
       
   893         // WebSocket has been created, at which point the pendingOperationCount
       
   894         // will have been incremented by the DetachedConnectionChannel
       
   895         // (see PlainHttpConnection.detachChannel())
       
   896         return new BuilderImpl(this.facade(), proxySelector);
       
   897     }
       
   898 
       
   899     @Override
       
   900     public Version version() {
       
   901         return version;
       
   902     }
       
   903 
       
   904     String dbgString() {
       
   905         return dbgTag;
       
   906     }
       
   907 
       
   908     @Override
       
   909     public String toString() {
       
   910         // Used by tests to get the client's id and compute the
       
   911         // name of the SelectorManager thread.
       
   912         return super.toString() + ("(" + id + ")");
       
   913     }
       
   914 
       
   915     private void initFilters() {
       
   916         addFilter(AuthenticationFilter.class);
       
   917         addFilter(RedirectFilter.class);
       
   918         if (this.cookieHandler != null) {
       
   919             addFilter(CookieFilter.class);
       
   920         }
       
   921     }
       
   922 
       
   923     private void addFilter(Class<? extends HeaderFilter> f) {
       
   924         filters.addFilter(f);
       
   925     }
       
   926 
       
   927     final List<HeaderFilter> filterChain() {
       
   928         return filters.getFilterChain();
       
   929     }
       
   930 
       
   931     // Timer controls.
       
   932     // Timers are implemented through timed Selector.select() calls.
       
   933 
       
   934     synchronized void registerTimer(TimeoutEvent event) {
       
   935         Log.logTrace("Registering timer {0}", event);
       
   936         timeouts.add(event);
       
   937         selmgr.wakeupSelector();
       
   938     }
       
   939 
       
   940     synchronized void cancelTimer(TimeoutEvent event) {
       
   941         Log.logTrace("Canceling timer {0}", event);
       
   942         timeouts.remove(event);
       
   943     }
       
   944 
       
   945     /**
       
   946      * Purges ( handles ) timer events that have passed their deadline, and
       
   947      * returns the amount of time, in milliseconds, until the next earliest
       
   948      * event. A return value of 0 means that there are no events.
       
   949      */
       
   950     private long purgeTimeoutsAndReturnNextDeadline() {
       
   951         long diff = 0L;
       
   952         List<TimeoutEvent> toHandle = null;
       
   953         int remaining = 0;
       
   954         // enter critical section to retrieve the timeout event to handle
       
   955         synchronized(this) {
       
   956             if (timeouts.isEmpty()) return 0L;
       
   957 
       
   958             Instant now = Instant.now();
       
   959             Iterator<TimeoutEvent> itr = timeouts.iterator();
       
   960             while (itr.hasNext()) {
       
   961                 TimeoutEvent event = itr.next();
       
   962                 diff = now.until(event.deadline(), ChronoUnit.MILLIS);
       
   963                 if (diff <= 0) {
       
   964                     itr.remove();
       
   965                     toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
       
   966                     toHandle.add(event);
       
   967                 } else {
       
   968                     break;
       
   969                 }
       
   970             }
       
   971             remaining = timeouts.size();
       
   972         }
       
   973 
       
   974         // can be useful for debugging
       
   975         if (toHandle != null && Log.trace()) {
       
   976             Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
       
   977                     +  toHandle.size() + " events, "
       
   978                     + "remaining " + remaining
       
   979                     + ", next deadline: " + (diff < 0 ? 0L : diff));
       
   980         }
       
   981 
       
   982         // handle timeout events out of critical section
       
   983         if (toHandle != null) {
       
   984             Throwable failed = null;
       
   985             for (TimeoutEvent event : toHandle) {
       
   986                 try {
       
   987                    Log.logTrace("Firing timer {0}", event);
       
   988                    event.handle();
       
   989                 } catch (Error | RuntimeException e) {
       
   990                     // Not expected. Handle remaining events then throw...
       
   991                     // If e is an OOME or SOE it might simply trigger a new
       
   992                     // error from here - but in this case there's not much we
       
   993                     // could do anyway. Just let it flow...
       
   994                     if (failed == null) failed = e;
       
   995                     else failed.addSuppressed(e);
       
   996                     Log.logTrace("Failed to handle event {0}: {1}", event, e);
       
   997                 }
       
   998             }
       
   999             if (failed instanceof Error) throw (Error) failed;
       
  1000             if (failed instanceof RuntimeException) throw (RuntimeException) failed;
       
  1001         }
       
  1002 
       
  1003         // return time to wait until next event. 0L if there's no more events.
       
  1004         return diff < 0 ? 0L : diff;
       
  1005     }
       
  1006 
       
  1007     // used for the connection window
       
  1008     int getReceiveBufferSize() {
       
  1009         return Utils.getIntegerNetProperty(
       
  1010                 "jdk.httpclient.receiveBufferSize", 2 * 1024 * 1024
       
  1011         );
       
  1012     }
       
  1013 }