|
1 /* |
|
2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package jdk.internal.net.http; |
|
27 |
|
28 import javax.net.ssl.SSLContext; |
|
29 import javax.net.ssl.SSLParameters; |
|
30 import java.io.IOException; |
|
31 import java.lang.System.Logger.Level; |
|
32 import java.lang.ref.Reference; |
|
33 import java.lang.ref.WeakReference; |
|
34 import java.net.Authenticator; |
|
35 import java.net.CookieHandler; |
|
36 import java.net.ProxySelector; |
|
37 import java.nio.channels.CancelledKeyException; |
|
38 import java.nio.channels.ClosedChannelException; |
|
39 import java.nio.channels.SelectableChannel; |
|
40 import java.nio.channels.SelectionKey; |
|
41 import java.nio.channels.Selector; |
|
42 import java.nio.channels.SocketChannel; |
|
43 import java.security.AccessControlContext; |
|
44 import java.security.AccessController; |
|
45 import java.security.NoSuchAlgorithmException; |
|
46 import java.security.PrivilegedAction; |
|
47 import java.time.Instant; |
|
48 import java.time.temporal.ChronoUnit; |
|
49 import java.util.ArrayList; |
|
50 import java.util.Arrays; |
|
51 import java.util.HashSet; |
|
52 import java.util.Iterator; |
|
53 import java.util.LinkedList; |
|
54 import java.util.List; |
|
55 import java.util.Objects; |
|
56 import java.util.Optional; |
|
57 import java.util.Set; |
|
58 import java.util.TreeSet; |
|
59 import java.util.concurrent.CompletableFuture; |
|
60 import java.util.concurrent.ExecutionException; |
|
61 import java.util.concurrent.Executor; |
|
62 import java.util.concurrent.Executors; |
|
63 import java.util.concurrent.ThreadFactory; |
|
64 import java.util.concurrent.atomic.AtomicInteger; |
|
65 import java.util.concurrent.atomic.AtomicLong; |
|
66 import java.util.stream.Stream; |
|
67 import java.net.http.HttpClient; |
|
68 import java.net.http.HttpRequest; |
|
69 import java.net.http.HttpResponse; |
|
70 import java.net.http.HttpResponse.BodyHandler; |
|
71 import java.net.http.HttpResponse.PushPromiseHandler; |
|
72 import java.net.http.WebSocket; |
|
73 import jdk.internal.net.http.common.Log; |
|
74 import jdk.internal.net.http.common.Logger; |
|
75 import jdk.internal.net.http.common.Pair; |
|
76 import jdk.internal.net.http.common.Utils; |
|
77 import jdk.internal.net.http.common.OperationTrackers.Trackable; |
|
78 import jdk.internal.net.http.common.OperationTrackers.Tracker; |
|
79 import jdk.internal.net.http.websocket.BuilderImpl; |
|
80 import jdk.internal.misc.InnocuousThread; |
|
81 |
|
82 /** |
|
83 * Client implementation. Contains all configuration information and also |
|
84 * the selector manager thread which allows async events to be registered |
|
85 * and delivered when they occur. See AsyncEvent. |
|
86 */ |
|
87 final class HttpClientImpl extends HttpClient implements Trackable { |
|
88 |
|
89 static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG; // dev flag |
|
90 static final boolean DEBUGTIMEOUT = false; // dev flag |
|
91 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
|
92 final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED); |
|
93 final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT); |
|
94 static final AtomicLong CLIENT_IDS = new AtomicLong(); |
|
95 |
|
96 // Define the default factory as a static inner class |
|
97 // that embeds all the necessary logic to avoid |
|
98 // the risk of using a lambda that might keep a reference on the |
|
99 // HttpClient instance from which it was created (helps with |
|
100 // heapdump analysis). |
|
101 private static final class DefaultThreadFactory implements ThreadFactory { |
|
102 private final String namePrefix; |
|
103 private final AtomicInteger nextId = new AtomicInteger(); |
|
104 |
|
105 DefaultThreadFactory(long clientID) { |
|
106 namePrefix = "HttpClient-" + clientID + "-Worker-"; |
|
107 } |
|
108 |
|
109 @Override |
|
110 public Thread newThread(Runnable r) { |
|
111 String name = namePrefix + nextId.getAndIncrement(); |
|
112 Thread t; |
|
113 if (System.getSecurityManager() == null) { |
|
114 t = new Thread(null, r, name, 0, false); |
|
115 } else { |
|
116 t = InnocuousThread.newThread(name, r); |
|
117 } |
|
118 t.setDaemon(true); |
|
119 return t; |
|
120 } |
|
121 } |
|
122 |
|
123 private final CookieHandler cookieHandler; |
|
124 private final Redirect followRedirects; |
|
125 private final Optional<ProxySelector> userProxySelector; |
|
126 private final ProxySelector proxySelector; |
|
127 private final Authenticator authenticator; |
|
128 private final Version version; |
|
129 private final ConnectionPool connections; |
|
130 private final Executor executor; |
|
131 private final boolean isDefaultExecutor; |
|
132 // Security parameters |
|
133 private final SSLContext sslContext; |
|
134 private final SSLParameters sslParams; |
|
135 private final SelectorManager selmgr; |
|
136 private final FilterFactory filters; |
|
137 private final Http2ClientImpl client2; |
|
138 private final long id; |
|
139 private final String dbgTag; |
|
140 |
|
141 // This reference is used to keep track of the facade HttpClient |
|
142 // that was returned to the application code. |
|
143 // It makes it possible to know when the application no longer |
|
144 // holds any reference to the HttpClient. |
|
145 // Unfortunately, this information is not enough to know when |
|
146 // to exit the SelectorManager thread. Because of the asynchronous |
|
147 // nature of the API, we also need to wait until all pending operations |
|
148 // have completed. |
|
149 private final WeakReference<HttpClientFacade> facadeRef; |
|
150 |
|
151 // This counter keeps track of the number of operations pending |
|
152 // on the HttpClient. The SelectorManager thread will wait |
|
153 // until there are no longer any pending operations and the |
|
154 // facadeRef is cleared before exiting. |
|
155 // |
|
156 // The pendingOperationCount is incremented every time a send/sendAsync |
|
157 // operation is invoked on the HttpClient, and is decremented when |
|
158 // the HttpResponse<T> object is returned to the user. |
|
159 // However, at this point, the body may not have been fully read yet. |
|
160 // This is the case when the response T is implemented as a streaming |
|
161 // subscriber (such as an InputStream). |
|
162 // |
|
163 // To take care of this issue the pendingOperationCount will additionally |
|
164 // be incremented/decremented in the following cases: |
|
165 // |
|
166 // 1. For HTTP/2 it is incremented when a stream is added to the |
|
167 // Http2Connection streams map, and decreased when the stream is removed |
|
168 // from the map. This should also take care of push promises. |
|
169 // 2. For WebSocket the count is increased when creating a |
|
170 // DetachedConnectionChannel for the socket, and decreased |
|
171 // when the the channel is closed. |
|
172 // In addition, the HttpClient facade is passed to the WebSocket builder, |
|
173 // (instead of the client implementation delegate). |
|
174 // 3. For HTTP/1.1 the count is incremented before starting to parse the body |
|
175 // response, and decremented when the parser has reached the end of the |
|
176 // response body flow. |
|
177 // |
|
178 // This should ensure that the selector manager thread remains alive until |
|
179 // the response has been fully received or the web socket is closed. |
|
180 private final AtomicLong pendingOperationCount = new AtomicLong(); |
|
181 private final AtomicLong pendingWebSocketCount = new AtomicLong(); |
|
182 private final AtomicLong pendingHttpRequestCount = new AtomicLong(); |
|
183 private final AtomicLong pendingHttp2StreamCount = new AtomicLong(); |
|
184 |
|
185 /** A Set of, deadline first, ordered timeout events. */ |
|
186 private final TreeSet<TimeoutEvent> timeouts; |
|
187 |
|
188 /** |
|
189 * This is a bit tricky: |
|
190 * 1. an HttpClientFacade has a final HttpClientImpl field. |
|
191 * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field, |
|
192 * where the referent is the facade created for that instance. |
|
193 * 3. We cannot just create the HttpClientFacade in the HttpClientImpl |
|
194 * constructor, because it would be only weakly referenced and could |
|
195 * be GC'ed before we can return it. |
|
196 * The solution is to use an instance of SingleFacadeFactory which will |
|
197 * allow the caller of new HttpClientImpl(...) to retrieve the facade |
|
198 * after the HttpClientImpl has been created. |
|
199 */ |
|
200 private static final class SingleFacadeFactory { |
|
201 HttpClientFacade facade; |
|
202 HttpClientFacade createFacade(HttpClientImpl impl) { |
|
203 assert facade == null; |
|
204 return (facade = new HttpClientFacade(impl)); |
|
205 } |
|
206 } |
|
207 |
|
208 static HttpClientFacade create(HttpClientBuilderImpl builder) { |
|
209 SingleFacadeFactory facadeFactory = new SingleFacadeFactory(); |
|
210 HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory); |
|
211 impl.start(); |
|
212 assert facadeFactory.facade != null; |
|
213 assert impl.facadeRef.get() == facadeFactory.facade; |
|
214 return facadeFactory.facade; |
|
215 } |
|
216 |
|
217 private HttpClientImpl(HttpClientBuilderImpl builder, |
|
218 SingleFacadeFactory facadeFactory) { |
|
219 id = CLIENT_IDS.incrementAndGet(); |
|
220 dbgTag = "HttpClientImpl(" + id +")"; |
|
221 if (builder.sslContext == null) { |
|
222 try { |
|
223 sslContext = SSLContext.getDefault(); |
|
224 } catch (NoSuchAlgorithmException ex) { |
|
225 throw new InternalError(ex); |
|
226 } |
|
227 } else { |
|
228 sslContext = builder.sslContext; |
|
229 } |
|
230 Executor ex = builder.executor; |
|
231 if (ex == null) { |
|
232 ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); |
|
233 isDefaultExecutor = true; |
|
234 } else { |
|
235 ex = builder.executor; |
|
236 isDefaultExecutor = false; |
|
237 } |
|
238 facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); |
|
239 client2 = new Http2ClientImpl(this); |
|
240 executor = ex; |
|
241 cookieHandler = builder.cookieHandler; |
|
242 followRedirects = builder.followRedirects == null ? |
|
243 Redirect.NEVER : builder.followRedirects; |
|
244 this.userProxySelector = Optional.ofNullable(builder.proxy); |
|
245 this.proxySelector = userProxySelector |
|
246 .orElseGet(HttpClientImpl::getDefaultProxySelector); |
|
247 if (debug.on()) |
|
248 debug.log("proxySelector is %s (user-supplied=%s)", |
|
249 this.proxySelector, userProxySelector.isPresent()); |
|
250 authenticator = builder.authenticator; |
|
251 if (builder.version == null) { |
|
252 version = HttpClient.Version.HTTP_2; |
|
253 } else { |
|
254 version = builder.version; |
|
255 } |
|
256 if (builder.sslParams == null) { |
|
257 sslParams = getDefaultParams(sslContext); |
|
258 } else { |
|
259 sslParams = builder.sslParams; |
|
260 } |
|
261 connections = new ConnectionPool(id); |
|
262 connections.start(); |
|
263 timeouts = new TreeSet<>(); |
|
264 try { |
|
265 selmgr = new SelectorManager(this); |
|
266 } catch (IOException e) { |
|
267 // unlikely |
|
268 throw new InternalError(e); |
|
269 } |
|
270 selmgr.setDaemon(true); |
|
271 filters = new FilterFactory(); |
|
272 initFilters(); |
|
273 assert facadeRef.get() != null; |
|
274 } |
|
275 |
|
276 private void start() { |
|
277 selmgr.start(); |
|
278 } |
|
279 |
|
280 // Called from the SelectorManager thread, just before exiting. |
|
281 // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections |
|
282 // that may be still lingering there are properly closed (and their |
|
283 // possibly still opened SocketChannel released). |
|
284 private void stop() { |
|
285 // Clears HTTP/1.1 cache and close its connections |
|
286 connections.stop(); |
|
287 // Clears HTTP/2 cache and close its connections. |
|
288 client2.stop(); |
|
289 } |
|
290 |
|
291 private static SSLParameters getDefaultParams(SSLContext ctx) { |
|
292 SSLParameters params = ctx.getSupportedSSLParameters(); |
|
293 params.setProtocols(new String[]{"TLSv1.2"}); |
|
294 return params; |
|
295 } |
|
296 |
|
297 private static ProxySelector getDefaultProxySelector() { |
|
298 PrivilegedAction<ProxySelector> action = ProxySelector::getDefault; |
|
299 return AccessController.doPrivileged(action); |
|
300 } |
|
301 |
|
302 // Returns the facade that was returned to the application code. |
|
303 // May be null if that facade is no longer referenced. |
|
304 final HttpClientFacade facade() { |
|
305 return facadeRef.get(); |
|
306 } |
|
307 |
|
308 // Increments the pendingOperationCount. |
|
309 final long reference() { |
|
310 pendingHttpRequestCount.incrementAndGet(); |
|
311 return pendingOperationCount.incrementAndGet(); |
|
312 } |
|
313 |
|
314 // Decrements the pendingOperationCount. |
|
315 final long unreference() { |
|
316 final long count = pendingOperationCount.decrementAndGet(); |
|
317 final long httpCount = pendingHttpRequestCount.decrementAndGet(); |
|
318 final long http2Count = pendingHttp2StreamCount.get(); |
|
319 final long webSocketCount = pendingWebSocketCount.get(); |
|
320 if (count == 0 && facade() == null) { |
|
321 selmgr.wakeupSelector(); |
|
322 } |
|
323 assert httpCount >= 0 : "count of HTTP/1.1 operations < 0"; |
|
324 assert http2Count >= 0 : "count of HTTP/2 operations < 0"; |
|
325 assert webSocketCount >= 0 : "count of WS operations < 0"; |
|
326 assert count >= 0 : "count of pending operations < 0"; |
|
327 return count; |
|
328 } |
|
329 |
|
330 // Increments the pendingOperationCount. |
|
331 final long streamReference() { |
|
332 pendingHttp2StreamCount.incrementAndGet(); |
|
333 return pendingOperationCount.incrementAndGet(); |
|
334 } |
|
335 |
|
336 // Decrements the pendingOperationCount. |
|
337 final long streamUnreference() { |
|
338 final long count = pendingOperationCount.decrementAndGet(); |
|
339 final long http2Count = pendingHttp2StreamCount.decrementAndGet(); |
|
340 final long httpCount = pendingHttpRequestCount.get(); |
|
341 final long webSocketCount = pendingWebSocketCount.get(); |
|
342 if (count == 0 && facade() == null) { |
|
343 selmgr.wakeupSelector(); |
|
344 } |
|
345 assert httpCount >= 0 : "count of HTTP/1.1 operations < 0"; |
|
346 assert http2Count >= 0 : "count of HTTP/2 operations < 0"; |
|
347 assert webSocketCount >= 0 : "count of WS operations < 0"; |
|
348 assert count >= 0 : "count of pending operations < 0"; |
|
349 return count; |
|
350 } |
|
351 |
|
352 // Increments the pendingOperationCount. |
|
353 final long webSocketOpen() { |
|
354 pendingWebSocketCount.incrementAndGet(); |
|
355 return pendingOperationCount.incrementAndGet(); |
|
356 } |
|
357 |
|
358 // Decrements the pendingOperationCount. |
|
359 final long webSocketClose() { |
|
360 final long count = pendingOperationCount.decrementAndGet(); |
|
361 final long webSocketCount = pendingWebSocketCount.decrementAndGet(); |
|
362 final long httpCount = pendingHttpRequestCount.get(); |
|
363 final long http2Count = pendingHttp2StreamCount.get(); |
|
364 if (count == 0 && facade() == null) { |
|
365 selmgr.wakeupSelector(); |
|
366 } |
|
367 assert httpCount >= 0 : "count of HTTP/1.1 operations < 0"; |
|
368 assert http2Count >= 0 : "count of HTTP/2 operations < 0"; |
|
369 assert webSocketCount >= 0 : "count of WS operations < 0"; |
|
370 assert count >= 0 : "count of pending operations < 0"; |
|
371 return count; |
|
372 } |
|
373 |
|
374 // Returns the pendingOperationCount. |
|
375 final long referenceCount() { |
|
376 return pendingOperationCount.get(); |
|
377 } |
|
378 |
|
379 final static class HttpClientTracker implements Tracker { |
|
380 final AtomicLong httpCount; |
|
381 final AtomicLong http2Count; |
|
382 final AtomicLong websocketCount; |
|
383 final AtomicLong operationsCount; |
|
384 final Reference<?> reference; |
|
385 final String name; |
|
386 HttpClientTracker(AtomicLong http, |
|
387 AtomicLong http2, |
|
388 AtomicLong ws, |
|
389 AtomicLong ops, |
|
390 Reference<?> ref, |
|
391 String name) { |
|
392 this.httpCount = http; |
|
393 this.http2Count = http2; |
|
394 this.websocketCount = ws; |
|
395 this.operationsCount = ops; |
|
396 this.reference = ref; |
|
397 this.name = name; |
|
398 } |
|
399 @Override |
|
400 public long getOutstandingOperations() { |
|
401 return operationsCount.get(); |
|
402 } |
|
403 @Override |
|
404 public long getOutstandingHttpOperations() { |
|
405 return httpCount.get(); |
|
406 } |
|
407 @Override |
|
408 public long getOutstandingHttp2Streams() { return http2Count.get(); } |
|
409 @Override |
|
410 public long getOutstandingWebSocketOperations() { |
|
411 return websocketCount.get(); |
|
412 } |
|
413 @Override |
|
414 public boolean isFacadeReferenced() { |
|
415 return reference.get() != null; |
|
416 } |
|
417 @Override |
|
418 public String getName() { |
|
419 return name; |
|
420 } |
|
421 } |
|
422 |
|
423 public Tracker getOperationsTracker() { |
|
424 return new HttpClientTracker(pendingHttpRequestCount, |
|
425 pendingHttp2StreamCount, |
|
426 pendingWebSocketCount, |
|
427 pendingOperationCount, |
|
428 facadeRef, |
|
429 dbgTag); |
|
430 } |
|
431 |
|
432 // Called by the SelectorManager thread to figure out whether it's time |
|
433 // to terminate. |
|
434 final boolean isReferenced() { |
|
435 HttpClient facade = facade(); |
|
436 return facade != null || referenceCount() > 0; |
|
437 } |
|
438 |
|
439 /** |
|
440 * Wait for activity on given exchange. |
|
441 * The following occurs in the SelectorManager thread. |
|
442 * |
|
443 * 1) add to selector |
|
444 * 2) If selector fires for this exchange then |
|
445 * call AsyncEvent.handle() |
|
446 * |
|
447 * If exchange needs to change interest ops, then call registerEvent() again. |
|
448 */ |
|
449 void registerEvent(AsyncEvent exchange) throws IOException { |
|
450 selmgr.register(exchange); |
|
451 } |
|
452 |
|
453 /** |
|
454 * Allows an AsyncEvent to modify its interestOps. |
|
455 * @param event The modified event. |
|
456 */ |
|
457 void eventUpdated(AsyncEvent event) throws ClosedChannelException { |
|
458 assert !(event instanceof AsyncTriggerEvent); |
|
459 selmgr.eventUpdated(event); |
|
460 } |
|
461 |
|
462 boolean isSelectorThread() { |
|
463 return Thread.currentThread() == selmgr; |
|
464 } |
|
465 |
|
466 Http2ClientImpl client2() { |
|
467 return client2; |
|
468 } |
|
469 |
|
470 private void debugCompleted(String tag, long startNanos, HttpRequest req) { |
|
471 if (debugelapsed.on()) { |
|
472 debugelapsed.log(tag + " elapsed " |
|
473 + (System.nanoTime() - startNanos)/1000_000L |
|
474 + " millis for " + req.method() |
|
475 + " to " + req.uri()); |
|
476 } |
|
477 } |
|
478 |
|
479 @Override |
|
480 public <T> HttpResponse<T> |
|
481 send(HttpRequest req, BodyHandler<T> responseHandler) |
|
482 throws IOException, InterruptedException |
|
483 { |
|
484 try { |
|
485 return sendAsync(req, responseHandler, null).get(); |
|
486 } catch (ExecutionException e) { |
|
487 Throwable t = e.getCause(); |
|
488 if (t instanceof Error) |
|
489 throw (Error)t; |
|
490 if (t instanceof RuntimeException) |
|
491 throw (RuntimeException)t; |
|
492 else if (t instanceof IOException) |
|
493 throw Utils.getIOException(t); |
|
494 else |
|
495 throw new InternalError("Unexpected exception", t); |
|
496 } |
|
497 } |
|
498 |
|
499 @Override |
|
500 public <T> CompletableFuture<HttpResponse<T>> |
|
501 sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler) |
|
502 { |
|
503 return sendAsync(userRequest, responseHandler, null); |
|
504 } |
|
505 |
|
506 |
|
507 @Override |
|
508 public <T> CompletableFuture<HttpResponse<T>> |
|
509 sendAsync(HttpRequest userRequest, |
|
510 BodyHandler<T> responseHandler, |
|
511 PushPromiseHandler<T> pushPromiseHandler) |
|
512 { |
|
513 Objects.requireNonNull(userRequest); |
|
514 Objects.requireNonNull(responseHandler); |
|
515 |
|
516 AccessControlContext acc = null; |
|
517 if (System.getSecurityManager() != null) |
|
518 acc = AccessController.getContext(); |
|
519 |
|
520 // Clone the, possibly untrusted, HttpRequest |
|
521 HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector); |
|
522 if (requestImpl.method().equals("CONNECT")) |
|
523 throw new IllegalArgumentException("Unsupported method CONNECT"); |
|
524 |
|
525 long start = DEBUGELAPSED ? System.nanoTime() : 0; |
|
526 reference(); |
|
527 try { |
|
528 if (debugelapsed.on()) |
|
529 debugelapsed.log("ClientImpl (async) send %s", userRequest); |
|
530 |
|
531 Executor executor = acc == null |
|
532 ? this.executor |
|
533 : new PrivilegedExecutor(this.executor, acc); |
|
534 |
|
535 MultiExchange<T> mex = new MultiExchange<>(userRequest, |
|
536 requestImpl, |
|
537 this, |
|
538 responseHandler, |
|
539 pushPromiseHandler, |
|
540 acc); |
|
541 CompletableFuture<HttpResponse<T>> res = |
|
542 mex.responseAsync().whenComplete((b,t) -> unreference()); |
|
543 if (DEBUGELAPSED) { |
|
544 res = res.whenComplete( |
|
545 (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); |
|
546 } |
|
547 |
|
548 // makes sure that any dependent actions happen in the executor |
|
549 res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor); |
|
550 |
|
551 return res; |
|
552 } catch(Throwable t) { |
|
553 unreference(); |
|
554 debugCompleted("ClientImpl (async)", start, userRequest); |
|
555 throw t; |
|
556 } |
|
557 } |
|
558 |
|
559 // Main loop for this client's selector |
|
560 private final static class SelectorManager extends Thread { |
|
561 |
|
562 // For testing purposes we have an internal System property that |
|
563 // can control the frequency at which the selector manager will wake |
|
564 // up when there are no pending operations. |
|
565 // Increasing the frequency (shorter delays) might allow the selector |
|
566 // to observe that the facade is no longer referenced and might allow |
|
567 // the selector thread to terminate more timely - for when nothing is |
|
568 // ongoing it will only check for that condition every NODEADLINE ms. |
|
569 // To avoid misuse of the property, the delay that can be specified |
|
570 // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default |
|
571 // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms |
|
572 // The property is -Djdk.internal.httpclient.selectorTimeout=<millis> |
|
573 private static final int MIN_NODEADLINE = 1000; // ms |
|
574 private static final int MAX_NODEADLINE = 1000 * 1200; // ms |
|
575 private static final int DEF_NODEADLINE = 3000; // ms |
|
576 private static final long NODEADLINE; // default is DEF_NODEADLINE ms |
|
577 static { |
|
578 // ensure NODEADLINE is initialized with some valid value. |
|
579 long deadline = Utils.getIntegerProperty( |
|
580 "jdk.internal.httpclient.selectorTimeout", |
|
581 DEF_NODEADLINE); // millis |
|
582 if (deadline <= 0) deadline = DEF_NODEADLINE; |
|
583 deadline = Math.max(deadline, MIN_NODEADLINE); |
|
584 NODEADLINE = Math.min(deadline, MAX_NODEADLINE); |
|
585 } |
|
586 |
|
587 private final Selector selector; |
|
588 private volatile boolean closed; |
|
589 private final List<AsyncEvent> registrations; |
|
590 private final List<AsyncTriggerEvent> deregistrations; |
|
591 private final Logger debug; |
|
592 private final Logger debugtimeout; |
|
593 HttpClientImpl owner; |
|
594 ConnectionPool pool; |
|
595 |
|
596 SelectorManager(HttpClientImpl ref) throws IOException { |
|
597 super(null, null, |
|
598 "HttpClient-" + ref.id + "-SelectorManager", |
|
599 0, false); |
|
600 owner = ref; |
|
601 debug = ref.debug; |
|
602 debugtimeout = ref.debugtimeout; |
|
603 pool = ref.connectionPool(); |
|
604 registrations = new ArrayList<>(); |
|
605 deregistrations = new ArrayList<>(); |
|
606 selector = Selector.open(); |
|
607 } |
|
608 |
|
609 void eventUpdated(AsyncEvent e) throws ClosedChannelException { |
|
610 if (Thread.currentThread() == this) { |
|
611 SelectionKey key = e.channel().keyFor(selector); |
|
612 if (key != null && key.isValid()) { |
|
613 SelectorAttachment sa = (SelectorAttachment) key.attachment(); |
|
614 sa.register(e); |
|
615 } else if (e.interestOps() != 0){ |
|
616 // We don't care about paused events. |
|
617 // These are actually handled by |
|
618 // SelectorAttachment::resetInterestOps later on. |
|
619 // But if we reach here when trying to resume an |
|
620 // event then it's better to fail fast. |
|
621 if (debug.on()) debug.log("No key for channel"); |
|
622 e.abort(new IOException("No key for channel")); |
|
623 } |
|
624 } else { |
|
625 register(e); |
|
626 } |
|
627 } |
|
628 |
|
629 // This returns immediately. So caller not allowed to send/receive |
|
630 // on connection. |
|
631 synchronized void register(AsyncEvent e) { |
|
632 registrations.add(e); |
|
633 selector.wakeup(); |
|
634 } |
|
635 |
|
636 synchronized void cancel(SocketChannel e) { |
|
637 SelectionKey key = e.keyFor(selector); |
|
638 if (key != null) { |
|
639 key.cancel(); |
|
640 } |
|
641 selector.wakeup(); |
|
642 } |
|
643 |
|
644 void wakeupSelector() { |
|
645 selector.wakeup(); |
|
646 } |
|
647 |
|
648 synchronized void shutdown() { |
|
649 if (debug.on()) debug.log("SelectorManager shutting down"); |
|
650 closed = true; |
|
651 try { |
|
652 selector.close(); |
|
653 } catch (IOException ignored) { |
|
654 } finally { |
|
655 owner.stop(); |
|
656 } |
|
657 } |
|
658 |
|
659 @Override |
|
660 public void run() { |
|
661 List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>(); |
|
662 List<AsyncEvent> readyList = new ArrayList<>(); |
|
663 List<Runnable> resetList = new ArrayList<>(); |
|
664 try { |
|
665 while (!Thread.currentThread().isInterrupted()) { |
|
666 synchronized (this) { |
|
667 assert errorList.isEmpty(); |
|
668 assert readyList.isEmpty(); |
|
669 assert resetList.isEmpty(); |
|
670 for (AsyncTriggerEvent event : deregistrations) { |
|
671 event.handle(); |
|
672 } |
|
673 deregistrations.clear(); |
|
674 for (AsyncEvent event : registrations) { |
|
675 if (event instanceof AsyncTriggerEvent) { |
|
676 readyList.add(event); |
|
677 continue; |
|
678 } |
|
679 SelectableChannel chan = event.channel(); |
|
680 SelectionKey key = null; |
|
681 try { |
|
682 key = chan.keyFor(selector); |
|
683 SelectorAttachment sa; |
|
684 if (key == null || !key.isValid()) { |
|
685 if (key != null) { |
|
686 // key is canceled. |
|
687 // invoke selectNow() to purge it |
|
688 // before registering the new event. |
|
689 selector.selectNow(); |
|
690 } |
|
691 sa = new SelectorAttachment(chan, selector); |
|
692 } else { |
|
693 sa = (SelectorAttachment) key.attachment(); |
|
694 } |
|
695 // may throw IOE if channel closed: that's OK |
|
696 sa.register(event); |
|
697 if (!chan.isOpen()) { |
|
698 throw new IOException("Channel closed"); |
|
699 } |
|
700 } catch (IOException e) { |
|
701 Log.logTrace("HttpClientImpl: " + e); |
|
702 if (debug.on()) |
|
703 debug.log("Got " + e.getClass().getName() |
|
704 + " while handling registration events"); |
|
705 chan.close(); |
|
706 // let the event abort deal with it |
|
707 errorList.add(new Pair<>(event, e)); |
|
708 if (key != null) { |
|
709 key.cancel(); |
|
710 selector.selectNow(); |
|
711 } |
|
712 } |
|
713 } |
|
714 registrations.clear(); |
|
715 selector.selectedKeys().clear(); |
|
716 } |
|
717 |
|
718 for (AsyncEvent event : readyList) { |
|
719 assert event instanceof AsyncTriggerEvent; |
|
720 event.handle(); |
|
721 } |
|
722 readyList.clear(); |
|
723 |
|
724 for (Pair<AsyncEvent,IOException> error : errorList) { |
|
725 // an IOException was raised and the channel closed. |
|
726 handleEvent(error.first, error.second); |
|
727 } |
|
728 errorList.clear(); |
|
729 |
|
730 // Check whether client is still alive, and if not, |
|
731 // gracefully stop this thread |
|
732 if (!owner.isReferenced()) { |
|
733 Log.logTrace("HttpClient no longer referenced. Exiting..."); |
|
734 return; |
|
735 } |
|
736 |
|
737 // Timeouts will have milliseconds granularity. It is important |
|
738 // to handle them in a timely fashion. |
|
739 long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline(); |
|
740 if (debugtimeout.on()) |
|
741 debugtimeout.log("next timeout: %d", nextTimeout); |
|
742 |
|
743 // Keep-alive have seconds granularity. It's not really an |
|
744 // issue if we keep connections linger a bit more in the keep |
|
745 // alive cache. |
|
746 long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline(); |
|
747 if (debugtimeout.on()) |
|
748 debugtimeout.log("next expired: %d", nextExpiry); |
|
749 |
|
750 assert nextTimeout >= 0; |
|
751 assert nextExpiry >= 0; |
|
752 |
|
753 // Don't wait for ever as it might prevent the thread to |
|
754 // stop gracefully. millis will be 0 if no deadline was found. |
|
755 if (nextTimeout <= 0) nextTimeout = NODEADLINE; |
|
756 |
|
757 // Clip nextExpiry at NODEADLINE limit. The default |
|
758 // keep alive is 1200 seconds (half an hour) - we don't |
|
759 // want to wait that long. |
|
760 if (nextExpiry <= 0) nextExpiry = NODEADLINE; |
|
761 else nextExpiry = Math.min(NODEADLINE, nextExpiry); |
|
762 |
|
763 // takes the least of the two. |
|
764 long millis = Math.min(nextExpiry, nextTimeout); |
|
765 |
|
766 if (debugtimeout.on()) |
|
767 debugtimeout.log("Next deadline is %d", |
|
768 (millis == 0 ? NODEADLINE : millis)); |
|
769 //debugPrint(selector); |
|
770 int n = selector.select(millis == 0 ? NODEADLINE : millis); |
|
771 if (n == 0) { |
|
772 // Check whether client is still alive, and if not, |
|
773 // gracefully stop this thread |
|
774 if (!owner.isReferenced()) { |
|
775 Log.logTrace("HttpClient no longer referenced. Exiting..."); |
|
776 return; |
|
777 } |
|
778 owner.purgeTimeoutsAndReturnNextDeadline(); |
|
779 continue; |
|
780 } |
|
781 |
|
782 Set<SelectionKey> keys = selector.selectedKeys(); |
|
783 assert errorList.isEmpty(); |
|
784 |
|
785 for (SelectionKey key : keys) { |
|
786 SelectorAttachment sa = (SelectorAttachment) key.attachment(); |
|
787 if (!key.isValid()) { |
|
788 IOException ex = sa.chan.isOpen() |
|
789 ? new IOException("Invalid key") |
|
790 : new ClosedChannelException(); |
|
791 sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex))); |
|
792 sa.pending.clear(); |
|
793 continue; |
|
794 } |
|
795 |
|
796 int eventsOccurred; |
|
797 try { |
|
798 eventsOccurred = key.readyOps(); |
|
799 } catch (CancelledKeyException ex) { |
|
800 IOException io = Utils.getIOException(ex); |
|
801 sa.pending.forEach(e -> errorList.add(new Pair<>(e,io))); |
|
802 sa.pending.clear(); |
|
803 continue; |
|
804 } |
|
805 sa.events(eventsOccurred).forEach(readyList::add); |
|
806 resetList.add(() -> sa.resetInterestOps(eventsOccurred)); |
|
807 } |
|
808 |
|
809 selector.selectNow(); // complete cancellation |
|
810 selector.selectedKeys().clear(); |
|
811 |
|
812 // handle selected events |
|
813 readyList.forEach((e) -> handleEvent(e, null)); |
|
814 readyList.clear(); |
|
815 |
|
816 // handle errors (closed channels etc...) |
|
817 errorList.forEach((p) -> handleEvent(p.first, p.second)); |
|
818 errorList.clear(); |
|
819 |
|
820 // reset interest ops for selected channels |
|
821 resetList.forEach(r -> r.run()); |
|
822 resetList.clear(); |
|
823 |
|
824 } |
|
825 } catch (Throwable e) { |
|
826 //e.printStackTrace(); |
|
827 if (!closed) { |
|
828 // This terminates thread. So, better just print stack trace |
|
829 String err = Utils.stackTrace(e); |
|
830 Log.logError("HttpClientImpl: fatal error: " + err); |
|
831 } |
|
832 if (debug.on()) debug.log("shutting down", e); |
|
833 if (Utils.ASSERTIONSENABLED && !debug.on()) { |
|
834 e.printStackTrace(System.err); // always print the stack |
|
835 } |
|
836 } finally { |
|
837 shutdown(); |
|
838 } |
|
839 } |
|
840 |
|
841 // void debugPrint(Selector selector) { |
|
842 // System.err.println("Selector: debugprint start"); |
|
843 // Set<SelectionKey> keys = selector.keys(); |
|
844 // for (SelectionKey key : keys) { |
|
845 // SelectableChannel c = key.channel(); |
|
846 // int ops = key.interestOps(); |
|
847 // System.err.printf("selector chan:%s ops:%d\n", c, ops); |
|
848 // } |
|
849 // System.err.println("Selector: debugprint end"); |
|
850 // } |
|
851 |
|
852 /** Handles the given event. The given ioe may be null. */ |
|
853 void handleEvent(AsyncEvent event, IOException ioe) { |
|
854 if (closed || ioe != null) { |
|
855 event.abort(ioe); |
|
856 } else { |
|
857 event.handle(); |
|
858 } |
|
859 } |
|
860 } |
|
861 |
|
862 final String debugInterestOps(SelectableChannel channel) { |
|
863 try { |
|
864 SelectionKey key = channel.keyFor(selmgr.selector); |
|
865 if (key == null) return "channel not registered with selector"; |
|
866 String keyInterestOps = key.isValid() |
|
867 ? "key.interestOps=" + key.interestOps() : "invalid key"; |
|
868 return String.format("channel registered with selector, %s, sa.interestOps=%s", |
|
869 keyInterestOps, |
|
870 ((SelectorAttachment)key.attachment()).interestOps); |
|
871 } catch (Throwable t) { |
|
872 return String.valueOf(t); |
|
873 } |
|
874 } |
|
875 |
|
876 /** |
|
877 * Tracks multiple user level registrations associated with one NIO |
|
878 * registration (SelectionKey). In this implementation, registrations |
|
879 * are one-off and when an event is posted the registration is cancelled |
|
880 * until explicitly registered again. |
|
881 * |
|
882 * <p> No external synchronization required as this class is only used |
|
883 * by the SelectorManager thread. One of these objects required per |
|
884 * connection. |
|
885 */ |
|
886 private static class SelectorAttachment { |
|
887 private final SelectableChannel chan; |
|
888 private final Selector selector; |
|
889 private final Set<AsyncEvent> pending; |
|
890 private final static Logger debug = |
|
891 Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG); |
|
892 private int interestOps; |
|
893 |
|
894 SelectorAttachment(SelectableChannel chan, Selector selector) { |
|
895 this.pending = new HashSet<>(); |
|
896 this.chan = chan; |
|
897 this.selector = selector; |
|
898 } |
|
899 |
|
900 void register(AsyncEvent e) throws ClosedChannelException { |
|
901 int newOps = e.interestOps(); |
|
902 // re register interest if we are not already interested |
|
903 // in the event. If the event is paused, then the pause will |
|
904 // be taken into account later when resetInterestOps is called. |
|
905 boolean reRegister = (interestOps & newOps) != newOps; |
|
906 interestOps |= newOps; |
|
907 pending.add(e); |
|
908 if (debug.on()) |
|
909 debug.log("Registering %s for %d (%s)", e, newOps, reRegister); |
|
910 if (reRegister) { |
|
911 // first time registration happens here also |
|
912 try { |
|
913 chan.register(selector, interestOps, this); |
|
914 } catch (Throwable x) { |
|
915 abortPending(x); |
|
916 } |
|
917 } else if (!chan.isOpen()) { |
|
918 abortPending(new ClosedChannelException()); |
|
919 } |
|
920 } |
|
921 |
|
922 /** |
|
923 * Returns a Stream<AsyncEvents> containing only events that are |
|
924 * registered with the given {@code interestOps}. |
|
925 */ |
|
926 Stream<AsyncEvent> events(int interestOps) { |
|
927 return pending.stream() |
|
928 .filter(ev -> (ev.interestOps() & interestOps) != 0); |
|
929 } |
|
930 |
|
931 /** |
|
932 * Removes any events with the given {@code interestOps}, and if no |
|
933 * events remaining, cancels the associated SelectionKey. |
|
934 */ |
|
935 void resetInterestOps(int interestOps) { |
|
936 int newOps = 0; |
|
937 |
|
938 Iterator<AsyncEvent> itr = pending.iterator(); |
|
939 while (itr.hasNext()) { |
|
940 AsyncEvent event = itr.next(); |
|
941 int evops = event.interestOps(); |
|
942 if (event.repeating()) { |
|
943 newOps |= evops; |
|
944 continue; |
|
945 } |
|
946 if ((evops & interestOps) != 0) { |
|
947 itr.remove(); |
|
948 } else { |
|
949 newOps |= evops; |
|
950 } |
|
951 } |
|
952 |
|
953 this.interestOps = newOps; |
|
954 SelectionKey key = chan.keyFor(selector); |
|
955 if (newOps == 0 && key != null && pending.isEmpty()) { |
|
956 key.cancel(); |
|
957 } else { |
|
958 try { |
|
959 if (key == null || !key.isValid()) { |
|
960 throw new CancelledKeyException(); |
|
961 } |
|
962 key.interestOps(newOps); |
|
963 // double check after |
|
964 if (!chan.isOpen()) { |
|
965 abortPending(new ClosedChannelException()); |
|
966 return; |
|
967 } |
|
968 assert key.interestOps() == newOps; |
|
969 } catch (CancelledKeyException x) { |
|
970 // channel may have been closed |
|
971 if (debug.on()) debug.log("key cancelled for " + chan); |
|
972 abortPending(x); |
|
973 } |
|
974 } |
|
975 } |
|
976 |
|
977 void abortPending(Throwable x) { |
|
978 if (!pending.isEmpty()) { |
|
979 AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]); |
|
980 pending.clear(); |
|
981 IOException io = Utils.getIOException(x); |
|
982 for (AsyncEvent event : evts) { |
|
983 event.abort(io); |
|
984 } |
|
985 } |
|
986 } |
|
987 } |
|
988 |
|
989 /*package-private*/ SSLContext theSSLContext() { |
|
990 return sslContext; |
|
991 } |
|
992 |
|
993 @Override |
|
994 public SSLContext sslContext() { |
|
995 return sslContext; |
|
996 } |
|
997 |
|
998 @Override |
|
999 public SSLParameters sslParameters() { |
|
1000 return Utils.copySSLParameters(sslParams); |
|
1001 } |
|
1002 |
|
1003 @Override |
|
1004 public Optional<Authenticator> authenticator() { |
|
1005 return Optional.ofNullable(authenticator); |
|
1006 } |
|
1007 |
|
1008 /*package-private*/ final Executor theExecutor() { |
|
1009 return executor; |
|
1010 } |
|
1011 |
|
1012 @Override |
|
1013 public final Optional<Executor> executor() { |
|
1014 return isDefaultExecutor ? Optional.empty() : Optional.of(executor); |
|
1015 } |
|
1016 |
|
1017 ConnectionPool connectionPool() { |
|
1018 return connections; |
|
1019 } |
|
1020 |
|
1021 @Override |
|
1022 public Redirect followRedirects() { |
|
1023 return followRedirects; |
|
1024 } |
|
1025 |
|
1026 |
|
1027 @Override |
|
1028 public Optional<CookieHandler> cookieHandler() { |
|
1029 return Optional.ofNullable(cookieHandler); |
|
1030 } |
|
1031 |
|
1032 @Override |
|
1033 public Optional<ProxySelector> proxy() { |
|
1034 return this.userProxySelector; |
|
1035 } |
|
1036 |
|
1037 // Return the effective proxy that this client uses. |
|
1038 ProxySelector proxySelector() { |
|
1039 return proxySelector; |
|
1040 } |
|
1041 |
|
1042 @Override |
|
1043 public WebSocket.Builder newWebSocketBuilder() { |
|
1044 // Make sure to pass the HttpClientFacade to the WebSocket builder. |
|
1045 // This will ensure that the facade is not released before the |
|
1046 // WebSocket has been created, at which point the pendingOperationCount |
|
1047 // will have been incremented by the RawChannelTube. |
|
1048 // See RawChannelTube. |
|
1049 return new BuilderImpl(this.facade(), proxySelector); |
|
1050 } |
|
1051 |
|
1052 @Override |
|
1053 public Version version() { |
|
1054 return version; |
|
1055 } |
|
1056 |
|
1057 String dbgString() { |
|
1058 return dbgTag; |
|
1059 } |
|
1060 |
|
1061 @Override |
|
1062 public String toString() { |
|
1063 // Used by tests to get the client's id and compute the |
|
1064 // name of the SelectorManager thread. |
|
1065 return super.toString() + ("(" + id + ")"); |
|
1066 } |
|
1067 |
|
1068 private void initFilters() { |
|
1069 addFilter(AuthenticationFilter.class); |
|
1070 addFilter(RedirectFilter.class); |
|
1071 if (this.cookieHandler != null) { |
|
1072 addFilter(CookieFilter.class); |
|
1073 } |
|
1074 } |
|
1075 |
|
1076 private void addFilter(Class<? extends HeaderFilter> f) { |
|
1077 filters.addFilter(f); |
|
1078 } |
|
1079 |
|
1080 final LinkedList<HeaderFilter> filterChain() { |
|
1081 return filters.getFilterChain(); |
|
1082 } |
|
1083 |
|
1084 // Timer controls. |
|
1085 // Timers are implemented through timed Selector.select() calls. |
|
1086 |
|
1087 synchronized void registerTimer(TimeoutEvent event) { |
|
1088 Log.logTrace("Registering timer {0}", event); |
|
1089 timeouts.add(event); |
|
1090 selmgr.wakeupSelector(); |
|
1091 } |
|
1092 |
|
1093 synchronized void cancelTimer(TimeoutEvent event) { |
|
1094 Log.logTrace("Canceling timer {0}", event); |
|
1095 timeouts.remove(event); |
|
1096 } |
|
1097 |
|
1098 /** |
|
1099 * Purges ( handles ) timer events that have passed their deadline, and |
|
1100 * returns the amount of time, in milliseconds, until the next earliest |
|
1101 * event. A return value of 0 means that there are no events. |
|
1102 */ |
|
1103 private long purgeTimeoutsAndReturnNextDeadline() { |
|
1104 long diff = 0L; |
|
1105 List<TimeoutEvent> toHandle = null; |
|
1106 int remaining = 0; |
|
1107 // enter critical section to retrieve the timeout event to handle |
|
1108 synchronized(this) { |
|
1109 if (timeouts.isEmpty()) return 0L; |
|
1110 |
|
1111 Instant now = Instant.now(); |
|
1112 Iterator<TimeoutEvent> itr = timeouts.iterator(); |
|
1113 while (itr.hasNext()) { |
|
1114 TimeoutEvent event = itr.next(); |
|
1115 diff = now.until(event.deadline(), ChronoUnit.MILLIS); |
|
1116 if (diff <= 0) { |
|
1117 itr.remove(); |
|
1118 toHandle = (toHandle == null) ? new ArrayList<>() : toHandle; |
|
1119 toHandle.add(event); |
|
1120 } else { |
|
1121 break; |
|
1122 } |
|
1123 } |
|
1124 remaining = timeouts.size(); |
|
1125 } |
|
1126 |
|
1127 // can be useful for debugging |
|
1128 if (toHandle != null && Log.trace()) { |
|
1129 Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling " |
|
1130 + toHandle.size() + " events, " |
|
1131 + "remaining " + remaining |
|
1132 + ", next deadline: " + (diff < 0 ? 0L : diff)); |
|
1133 } |
|
1134 |
|
1135 // handle timeout events out of critical section |
|
1136 if (toHandle != null) { |
|
1137 Throwable failed = null; |
|
1138 for (TimeoutEvent event : toHandle) { |
|
1139 try { |
|
1140 Log.logTrace("Firing timer {0}", event); |
|
1141 event.handle(); |
|
1142 } catch (Error | RuntimeException e) { |
|
1143 // Not expected. Handle remaining events then throw... |
|
1144 // If e is an OOME or SOE it might simply trigger a new |
|
1145 // error from here - but in this case there's not much we |
|
1146 // could do anyway. Just let it flow... |
|
1147 if (failed == null) failed = e; |
|
1148 else failed.addSuppressed(e); |
|
1149 Log.logTrace("Failed to handle event {0}: {1}", event, e); |
|
1150 } |
|
1151 } |
|
1152 if (failed instanceof Error) throw (Error) failed; |
|
1153 if (failed instanceof RuntimeException) throw (RuntimeException) failed; |
|
1154 } |
|
1155 |
|
1156 // return time to wait until next event. 0L if there's no more events. |
|
1157 return diff < 0 ? 0L : diff; |
|
1158 } |
|
1159 |
|
1160 // used for the connection window |
|
1161 int getReceiveBufferSize() { |
|
1162 return Utils.getIntegerNetProperty( |
|
1163 "jdk.httpclient.receiveBufferSize", 2 * 1024 * 1024 |
|
1164 ); |
|
1165 } |
|
1166 } |