src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
changeset 47216 71c04702a3d5
parent 45531 fb3dbffad37b
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
equal deleted inserted replaced
47215:4ebc2e2fb97c 47216:71c04702a3d5
       
     1 /*
       
     2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http;
       
    27 
       
    28 import javax.net.ssl.SSLContext;
       
    29 import javax.net.ssl.SSLParameters;
       
    30 import java.io.IOException;
       
    31 import java.lang.ref.WeakReference;
       
    32 import java.net.Authenticator;
       
    33 import java.net.CookieManager;
       
    34 import java.net.ProxySelector;
       
    35 import java.net.URI;
       
    36 import java.nio.channels.ClosedChannelException;
       
    37 import java.nio.channels.SelectableChannel;
       
    38 import java.nio.channels.SelectionKey;
       
    39 import java.nio.channels.Selector;
       
    40 import java.nio.channels.SocketChannel;
       
    41 import java.security.NoSuchAlgorithmException;
       
    42 import java.time.Instant;
       
    43 import java.time.temporal.ChronoUnit;
       
    44 import java.util.ArrayList;
       
    45 import java.util.Iterator;
       
    46 import java.util.List;
       
    47 import java.util.Optional;
       
    48 import java.util.Set;
       
    49 import java.util.TreeSet;
       
    50 import java.util.concurrent.CompletableFuture;
       
    51 import java.util.concurrent.Executor;
       
    52 import java.util.concurrent.Executors;
       
    53 import java.util.concurrent.ThreadFactory;
       
    54 import java.util.stream.Stream;
       
    55 import jdk.incubator.http.internal.common.Log;
       
    56 import jdk.incubator.http.internal.common.Utils;
       
    57 import jdk.incubator.http.internal.websocket.BuilderImpl;
       
    58 
       
    59 /**
       
    60  * Client implementation. Contains all configuration information and also
       
    61  * the selector manager thread which allows async events to be registered
       
    62  * and delivered when they occur. See AsyncEvent.
       
    63  */
       
    64 class HttpClientImpl extends HttpClient {
       
    65 
       
    66     // Define the default factory as a static inner class
       
    67     // that embeds all the necessary logic to avoid
       
    68     // the risk of using a lambda that might keep a reference on the
       
    69     // HttpClient instance from which it was created (helps with
       
    70     // heapdump analysis).
       
    71     private static final class DefaultThreadFactory implements ThreadFactory {
       
    72         private DefaultThreadFactory() {}
       
    73         @Override
       
    74         public Thread newThread(Runnable r) {
       
    75             Thread t = new Thread(null, r, "HttpClient_worker", 0, true);
       
    76             t.setDaemon(true);
       
    77             return t;
       
    78         }
       
    79         static final ThreadFactory INSTANCE = new DefaultThreadFactory();
       
    80     }
       
    81 
       
    82     private final CookieManager cookieManager;
       
    83     private final Redirect followRedirects;
       
    84     private final ProxySelector proxySelector;
       
    85     private final Authenticator authenticator;
       
    86     private final Version version;
       
    87     private final ConnectionPool connections;
       
    88     private final Executor executor;
       
    89     // Security parameters
       
    90     private final SSLContext sslContext;
       
    91     private final SSLParameters sslParams;
       
    92     private final SelectorManager selmgr;
       
    93     private final FilterFactory filters;
       
    94     private final Http2ClientImpl client2;
       
    95 
       
    96     /** A Set of, deadline first, ordered timeout events. */
       
    97     private final TreeSet<TimeoutEvent> timeouts;
       
    98 
       
    99     public static HttpClientImpl create(HttpClientBuilderImpl builder) {
       
   100         HttpClientImpl impl = new HttpClientImpl(builder);
       
   101         impl.start();
       
   102         return impl;
       
   103     }
       
   104 
       
   105     private HttpClientImpl(HttpClientBuilderImpl builder) {
       
   106         if (builder.sslContext == null) {
       
   107             try {
       
   108                 sslContext = SSLContext.getDefault();
       
   109             } catch (NoSuchAlgorithmException ex) {
       
   110                 throw new InternalError(ex);
       
   111             }
       
   112         } else {
       
   113             sslContext = builder.sslContext;
       
   114         }
       
   115         Executor ex = builder.executor;
       
   116         if (ex == null) {
       
   117             ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE);
       
   118         } else {
       
   119             ex = builder.executor;
       
   120         }
       
   121         client2 = new Http2ClientImpl(this);
       
   122         executor = ex;
       
   123         cookieManager = builder.cookieManager;
       
   124         followRedirects = builder.followRedirects == null ?
       
   125                 Redirect.NEVER : builder.followRedirects;
       
   126         this.proxySelector = builder.proxy;
       
   127         authenticator = builder.authenticator;
       
   128         if (builder.version == null) {
       
   129             version = HttpClient.Version.HTTP_2;
       
   130         } else {
       
   131             version = builder.version;
       
   132         }
       
   133         if (builder.sslParams == null) {
       
   134             sslParams = getDefaultParams(sslContext);
       
   135         } else {
       
   136             sslParams = builder.sslParams;
       
   137         }
       
   138         connections = new ConnectionPool();
       
   139         connections.start();
       
   140         timeouts = new TreeSet<>();
       
   141         try {
       
   142             selmgr = new SelectorManager(this);
       
   143         } catch (IOException e) {
       
   144             // unlikely
       
   145             throw new InternalError(e);
       
   146         }
       
   147         selmgr.setDaemon(true);
       
   148         filters = new FilterFactory();
       
   149         initFilters();
       
   150     }
       
   151 
       
   152     private void start() {
       
   153         selmgr.start();
       
   154     }
       
   155 
       
   156     private static SSLParameters getDefaultParams(SSLContext ctx) {
       
   157         SSLParameters params = ctx.getSupportedSSLParameters();
       
   158         params.setProtocols(new String[]{"TLSv1.2"});
       
   159         return params;
       
   160     }
       
   161 
       
   162     /**
       
   163      * Wait for activity on given exchange (assuming blocking = false).
       
   164      * It's a no-op if blocking = true. In particular, the following occurs
       
   165      * in the SelectorManager thread.
       
   166      *
       
   167      *  1) mark the connection non-blocking
       
   168      *  2) add to selector
       
   169      *  3) If selector fires for this exchange then
       
   170      *  4)   - mark connection as blocking
       
   171      *  5)   - call AsyncEvent.handle()
       
   172      *
       
   173      * If exchange needs to block again, then call registerEvent() again
       
   174      */
       
   175     void registerEvent(AsyncEvent exchange) throws IOException {
       
   176         selmgr.register(exchange);
       
   177     }
       
   178 
       
   179     /**
       
   180      * Only used from RawChannel to disconnect the channel from
       
   181      * the selector
       
   182      */
       
   183     void cancelRegistration(SocketChannel s) {
       
   184         selmgr.cancel(s);
       
   185     }
       
   186 
       
   187 
       
   188     Http2ClientImpl client2() {
       
   189         return client2;
       
   190     }
       
   191 
       
   192     /*
       
   193     @Override
       
   194     public ByteBuffer getBuffer() {
       
   195         return pool.getBuffer();
       
   196     }
       
   197 
       
   198     // SSL buffers are larger. Manage separately
       
   199 
       
   200     int size = 16 * 1024;
       
   201 
       
   202     ByteBuffer getSSLBuffer() {
       
   203         return ByteBuffer.allocate(size);
       
   204     }
       
   205 
       
   206     /**
       
   207      * Return a new buffer that's a bit bigger than the given one
       
   208      *
       
   209      * @param buf
       
   210      * @return
       
   211      *
       
   212     ByteBuffer reallocSSLBuffer(ByteBuffer buf) {
       
   213         size = buf.capacity() * 12 / 10; // 20% bigger
       
   214         return ByteBuffer.allocate(size);
       
   215     }
       
   216 
       
   217     synchronized void returnSSLBuffer(ByteBuffer buf) {
       
   218         if (buf.capacity() >= size)
       
   219            sslBuffers.add(0, buf);
       
   220     }
       
   221 
       
   222     @Override
       
   223     public void returnBuffer(ByteBuffer buffer) {
       
   224         pool.returnBuffer(buffer);
       
   225     }
       
   226     */
       
   227 
       
   228     @Override
       
   229     public <T> HttpResponse<T>
       
   230     send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
       
   231         throws IOException, InterruptedException
       
   232     {
       
   233         MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
       
   234         return mex.response();
       
   235     }
       
   236 
       
   237     @Override
       
   238     public <T> CompletableFuture<HttpResponse<T>>
       
   239     sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
       
   240     {
       
   241         MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
       
   242         return mex.responseAsync()
       
   243                   .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
       
   244     }
       
   245 
       
   246     @Override
       
   247     public <U, T> CompletableFuture<U>
       
   248     sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) {
       
   249         MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler);
       
   250         return mex.multiResponseAsync();
       
   251     }
       
   252 
       
   253     // new impl. Should get rid of above
       
   254     /*
       
   255     static class BufferPool implements BufferHandler {
       
   256 
       
   257         final LinkedList<ByteBuffer> freelist = new LinkedList<>();
       
   258 
       
   259         @Override
       
   260         public synchronized ByteBuffer getBuffer() {
       
   261             ByteBuffer buf;
       
   262 
       
   263             while (!freelist.isEmpty()) {
       
   264                 buf = freelist.removeFirst();
       
   265                 buf.clear();
       
   266                 return buf;
       
   267             }
       
   268             return ByteBuffer.allocate(BUFSIZE);
       
   269         }
       
   270 
       
   271         @Override
       
   272         public synchronized void returnBuffer(ByteBuffer buffer) {
       
   273             assert buffer.capacity() > 0;
       
   274             freelist.add(buffer);
       
   275         }
       
   276     }
       
   277 
       
   278     static BufferPool pool = new BufferPool();
       
   279 
       
   280     static BufferHandler pool() {
       
   281         return pool;
       
   282     }
       
   283 */
       
   284     // Main loop for this client's selector
       
   285     private final static class SelectorManager extends Thread {
       
   286 
       
   287         private static final long NODEADLINE = 3000L;
       
   288         private final Selector selector;
       
   289         private volatile boolean closed;
       
   290         private final List<AsyncEvent> readyList;
       
   291         private final List<AsyncEvent> registrations;
       
   292 
       
   293         // Uses a weak reference to the HttpClient owning this
       
   294         // selector: a strong reference prevents its garbage
       
   295         // collection while the thread is running.
       
   296         // We want the thread to exit gracefully when the
       
   297         // HttpClient that owns it gets GC'ed.
       
   298         WeakReference<HttpClientImpl> ownerRef;
       
   299 
       
   300         SelectorManager(HttpClientImpl ref) throws IOException {
       
   301             super(null, null, "SelectorManager", 0, false);
       
   302             ownerRef = new WeakReference<>(ref);
       
   303             readyList = new ArrayList<>();
       
   304             registrations = new ArrayList<>();
       
   305             selector = Selector.open();
       
   306         }
       
   307 
       
   308         // This returns immediately. So caller not allowed to send/receive
       
   309         // on connection.
       
   310 
       
   311         synchronized void register(AsyncEvent e) throws IOException {
       
   312             registrations.add(e);
       
   313             selector.wakeup();
       
   314         }
       
   315 
       
   316         synchronized void cancel(SocketChannel e) {
       
   317             SelectionKey key = e.keyFor(selector);
       
   318             if (key != null) {
       
   319                 key.cancel();
       
   320             }
       
   321             selector.wakeup();
       
   322         }
       
   323 
       
   324         void wakeupSelector() {
       
   325             selector.wakeup();
       
   326         }
       
   327 
       
   328         synchronized void shutdown() {
       
   329             closed = true;
       
   330             try {
       
   331                 selector.close();
       
   332             } catch (IOException ignored) { }
       
   333         }
       
   334 
       
   335         @Override
       
   336         public void run() {
       
   337             try {
       
   338                 while (!Thread.currentThread().isInterrupted()) {
       
   339                     HttpClientImpl client;
       
   340                     synchronized (this) {
       
   341                         for (AsyncEvent exchange : registrations) {
       
   342                             SelectableChannel c = exchange.channel();
       
   343                             try {
       
   344                                 c.configureBlocking(false);
       
   345                                 SelectionKey key = c.keyFor(selector);
       
   346                                 SelectorAttachment sa;
       
   347                                 if (key == null || !key.isValid()) {
       
   348                                     if (key != null) {
       
   349                                         // key is canceled.
       
   350                                         // invoke selectNow() to purge it
       
   351                                         // before registering the new event.
       
   352                                         selector.selectNow();
       
   353                                     }
       
   354                                     sa = new SelectorAttachment(c, selector);
       
   355                                 } else {
       
   356                                     sa = (SelectorAttachment) key.attachment();
       
   357                                 }
       
   358                                 sa.register(exchange);
       
   359                             } catch (IOException e) {
       
   360                                 Log.logError("HttpClientImpl: " + e);
       
   361                                 c.close();
       
   362                                 // let the exchange deal with it
       
   363                                 handleEvent(exchange);
       
   364                             }
       
   365                         }
       
   366                         registrations.clear();
       
   367                     }
       
   368 
       
   369                     // Check whether client is still alive, and if not,
       
   370                     // gracefully stop this thread
       
   371                     if ((client = ownerRef.get()) == null) {
       
   372                         Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   373                         return;
       
   374                     }
       
   375                     long millis = client.purgeTimeoutsAndReturnNextDeadline();
       
   376                     client = null; // don't hold onto the client ref
       
   377 
       
   378                     //debugPrint(selector);
       
   379                     // Don't wait for ever as it might prevent the thread to
       
   380                     // stop gracefully. millis will be 0 if no deadline was found.
       
   381                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
       
   382                     if (n == 0) {
       
   383                         // Check whether client is still alive, and if not,
       
   384                         // gracefully stop this thread
       
   385                         if ((client = ownerRef.get()) == null) {
       
   386                             Log.logTrace("HttpClient no longer referenced. Exiting...");
       
   387                             return;
       
   388                         }
       
   389                         client.purgeTimeoutsAndReturnNextDeadline();
       
   390                         client = null; // don't hold onto the client ref
       
   391                         continue;
       
   392                     }
       
   393                     Set<SelectionKey> keys = selector.selectedKeys();
       
   394 
       
   395                     for (SelectionKey key : keys) {
       
   396                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
       
   397                         int eventsOccurred = key.readyOps();
       
   398                         sa.events(eventsOccurred).forEach(readyList::add);
       
   399                         sa.resetInterestOps(eventsOccurred);
       
   400                     }
       
   401                     selector.selectNow(); // complete cancellation
       
   402                     selector.selectedKeys().clear();
       
   403 
       
   404                     for (AsyncEvent exchange : readyList) {
       
   405                         if (exchange.blocking()) {
       
   406                             exchange.channel().configureBlocking(true);
       
   407                         }
       
   408                         handleEvent(exchange); // will be delegated to executor
       
   409                     }
       
   410                     readyList.clear();
       
   411                 }
       
   412             } catch (Throwable e) {
       
   413                 if (!closed) {
       
   414                     // This terminates thread. So, better just print stack trace
       
   415                     String err = Utils.stackTrace(e);
       
   416                     Log.logError("HttpClientImpl: fatal error: " + err);
       
   417                 }
       
   418             } finally {
       
   419                 shutdown();
       
   420             }
       
   421         }
       
   422 
       
   423         void debugPrint(Selector selector) {
       
   424             System.err.println("Selector: debugprint start");
       
   425             Set<SelectionKey> keys = selector.keys();
       
   426             for (SelectionKey key : keys) {
       
   427                 SelectableChannel c = key.channel();
       
   428                 int ops = key.interestOps();
       
   429                 System.err.printf("selector chan:%s ops:%d\n", c, ops);
       
   430             }
       
   431             System.err.println("Selector: debugprint end");
       
   432         }
       
   433 
       
   434         void handleEvent(AsyncEvent e) {
       
   435             if (closed) {
       
   436                 e.abort();
       
   437             } else {
       
   438                 e.handle();
       
   439             }
       
   440         }
       
   441     }
       
   442 
       
   443     /**
       
   444      * Tracks multiple user level registrations associated with one NIO
       
   445      * registration (SelectionKey). In this implementation, registrations
       
   446      * are one-off and when an event is posted the registration is cancelled
       
   447      * until explicitly registered again.
       
   448      *
       
   449      * <p> No external synchronization required as this class is only used
       
   450      * by the SelectorManager thread. One of these objects required per
       
   451      * connection.
       
   452      */
       
   453     private static class SelectorAttachment {
       
   454         private final SelectableChannel chan;
       
   455         private final Selector selector;
       
   456         private final ArrayList<AsyncEvent> pending;
       
   457         private int interestOps;
       
   458 
       
   459         SelectorAttachment(SelectableChannel chan, Selector selector) {
       
   460             this.pending = new ArrayList<>();
       
   461             this.chan = chan;
       
   462             this.selector = selector;
       
   463         }
       
   464 
       
   465         void register(AsyncEvent e) throws ClosedChannelException {
       
   466             int newOps = e.interestOps();
       
   467             boolean reRegister = (interestOps & newOps) != newOps;
       
   468             interestOps |= newOps;
       
   469             pending.add(e);
       
   470             if (reRegister) {
       
   471                 // first time registration happens here also
       
   472                 chan.register(selector, interestOps, this);
       
   473             }
       
   474         }
       
   475 
       
   476         /**
       
   477          * Returns a Stream<AsyncEvents> containing only events that are
       
   478          * registered with the given {@code interestOps}.
       
   479          */
       
   480         Stream<AsyncEvent> events(int interestOps) {
       
   481             return pending.stream()
       
   482                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
       
   483         }
       
   484 
       
   485         /**
       
   486          * Removes any events with the given {@code interestOps}, and if no
       
   487          * events remaining, cancels the associated SelectionKey.
       
   488          */
       
   489         void resetInterestOps(int interestOps) {
       
   490             int newOps = 0;
       
   491 
       
   492             Iterator<AsyncEvent> itr = pending.iterator();
       
   493             while (itr.hasNext()) {
       
   494                 AsyncEvent event = itr.next();
       
   495                 int evops = event.interestOps();
       
   496                 if (event.repeating()) {
       
   497                     newOps |= evops;
       
   498                     continue;
       
   499                 }
       
   500                 if ((evops & interestOps) != 0) {
       
   501                     itr.remove();
       
   502                 } else {
       
   503                     newOps |= evops;
       
   504                 }
       
   505             }
       
   506 
       
   507             this.interestOps = newOps;
       
   508             SelectionKey key = chan.keyFor(selector);
       
   509             if (newOps == 0) {
       
   510                 key.cancel();
       
   511             } else {
       
   512                 key.interestOps(newOps);
       
   513             }
       
   514         }
       
   515     }
       
   516 
       
   517     @Override
       
   518     public SSLContext sslContext() {
       
   519         Utils.checkNetPermission("getSSLContext");
       
   520         return sslContext;
       
   521     }
       
   522 
       
   523     @Override
       
   524     public Optional<SSLParameters> sslParameters() {
       
   525         return Optional.ofNullable(sslParams);
       
   526     }
       
   527 
       
   528     @Override
       
   529     public Optional<Authenticator> authenticator() {
       
   530         return Optional.ofNullable(authenticator);
       
   531     }
       
   532 
       
   533     @Override
       
   534     public Executor executor() {
       
   535         return executor;
       
   536     }
       
   537 
       
   538     ConnectionPool connectionPool() {
       
   539         return connections;
       
   540     }
       
   541 
       
   542     @Override
       
   543     public Redirect followRedirects() {
       
   544         return followRedirects;
       
   545     }
       
   546 
       
   547 
       
   548     @Override
       
   549     public Optional<CookieManager> cookieManager() {
       
   550         return Optional.ofNullable(cookieManager);
       
   551     }
       
   552 
       
   553     @Override
       
   554     public Optional<ProxySelector> proxy() {
       
   555         return Optional.ofNullable(this.proxySelector);
       
   556     }
       
   557 
       
   558     @Override
       
   559     public WebSocket.Builder newWebSocketBuilder(URI uri,
       
   560                                                  WebSocket.Listener listener) {
       
   561         return new BuilderImpl(this, uri, listener);
       
   562     }
       
   563 
       
   564     @Override
       
   565     public Version version() {
       
   566         return version;
       
   567     }
       
   568 
       
   569     //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
       
   570 
       
   571     boolean getHttp2Allowed() {
       
   572         return version.equals(Version.HTTP_2);
       
   573     }
       
   574 
       
   575     private void initFilters() {
       
   576         addFilter(AuthenticationFilter.class);
       
   577         addFilter(RedirectFilter.class);
       
   578         if (this.cookieManager != null) {
       
   579             addFilter(CookieFilter.class);
       
   580         }
       
   581     }
       
   582 
       
   583     private void addFilter(Class<? extends HeaderFilter> f) {
       
   584         filters.addFilter(f);
       
   585     }
       
   586 
       
   587     final List<HeaderFilter> filterChain() {
       
   588         return filters.getFilterChain();
       
   589     }
       
   590 
       
   591     // Timer controls.
       
   592     // Timers are implemented through timed Selector.select() calls.
       
   593 
       
   594     synchronized void registerTimer(TimeoutEvent event) {
       
   595         Log.logTrace("Registering timer {0}", event);
       
   596         timeouts.add(event);
       
   597         selmgr.wakeupSelector();
       
   598     }
       
   599 
       
   600     synchronized void cancelTimer(TimeoutEvent event) {
       
   601         Log.logTrace("Canceling timer {0}", event);
       
   602         timeouts.remove(event);
       
   603     }
       
   604 
       
   605     /**
       
   606      * Purges ( handles ) timer events that have passed their deadline, and
       
   607      * returns the amount of time, in milliseconds, until the next earliest
       
   608      * event. A return value of 0 means that there are no events.
       
   609      */
       
   610     private long purgeTimeoutsAndReturnNextDeadline() {
       
   611         long diff = 0L;
       
   612         List<TimeoutEvent> toHandle = null;
       
   613         int remaining = 0;
       
   614         // enter critical section to retrieve the timeout event to handle
       
   615         synchronized(this) {
       
   616             if (timeouts.isEmpty()) return 0L;
       
   617 
       
   618             Instant now = Instant.now();
       
   619             Iterator<TimeoutEvent> itr = timeouts.iterator();
       
   620             while (itr.hasNext()) {
       
   621                 TimeoutEvent event = itr.next();
       
   622                 diff = now.until(event.deadline(), ChronoUnit.MILLIS);
       
   623                 if (diff <= 0) {
       
   624                     itr.remove();
       
   625                     toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
       
   626                     toHandle.add(event);
       
   627                 } else {
       
   628                     break;
       
   629                 }
       
   630             }
       
   631             remaining = timeouts.size();
       
   632         }
       
   633 
       
   634         // can be useful for debugging
       
   635         if (toHandle != null && Log.trace()) {
       
   636             Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
       
   637                     + (toHandle == null ? 0 : toHandle.size()) + " events, "
       
   638                     + "remaining " + remaining
       
   639                     + ", next deadline: " + (diff < 0 ? 0L : diff));
       
   640         }
       
   641 
       
   642         // handle timeout events out of critical section
       
   643         if (toHandle != null) {
       
   644             Throwable failed = null;
       
   645             for (TimeoutEvent event : toHandle) {
       
   646                 try {
       
   647                    Log.logTrace("Firing timer {0}", event);
       
   648                    event.handle();
       
   649                 } catch (Error | RuntimeException e) {
       
   650                     // Not expected. Handle remaining events then throw...
       
   651                     // If e is an OOME or SOE it might simply trigger a new
       
   652                     // error from here - but in this case there's not much we
       
   653                     // could do anyway. Just let it flow...
       
   654                     if (failed == null) failed = e;
       
   655                     else failed.addSuppressed(e);
       
   656                     Log.logTrace("Failed to handle event {0}: {1}", event, e);
       
   657                 }
       
   658             }
       
   659             if (failed instanceof Error) throw (Error) failed;
       
   660             if (failed instanceof RuntimeException) throw (RuntimeException) failed;
       
   661         }
       
   662 
       
   663         // return time to wait until next event. 0L if there's no more events.
       
   664         return diff < 0 ? 0L : diff;
       
   665     }
       
   666 
       
   667     // used for the connection window
       
   668     int getReceiveBufferSize() {
       
   669         return Utils.getIntegerNetProperty(
       
   670                 "jdk.httpclient.connectionWindowSize", 256 * 1024
       
   671         );
       
   672     }
       
   673 }