26 package jdk.internal.net.http; |
26 package jdk.internal.net.http; |
27 |
27 |
28 import javax.net.ssl.SSLContext; |
28 import javax.net.ssl.SSLContext; |
29 import javax.net.ssl.SSLParameters; |
29 import javax.net.ssl.SSLParameters; |
30 import java.io.IOException; |
30 import java.io.IOException; |
|
31 import java.io.UncheckedIOException; |
31 import java.lang.ref.Reference; |
32 import java.lang.ref.Reference; |
32 import java.lang.ref.WeakReference; |
33 import java.lang.ref.WeakReference; |
33 import java.net.Authenticator; |
34 import java.net.Authenticator; |
|
35 import java.net.ConnectException; |
34 import java.net.CookieHandler; |
36 import java.net.CookieHandler; |
35 import java.net.ProxySelector; |
37 import java.net.ProxySelector; |
|
38 import java.net.http.HttpTimeoutException; |
36 import java.nio.ByteBuffer; |
39 import java.nio.ByteBuffer; |
37 import java.nio.channels.CancelledKeyException; |
40 import java.nio.channels.CancelledKeyException; |
38 import java.nio.channels.ClosedChannelException; |
41 import java.nio.channels.ClosedChannelException; |
39 import java.nio.channels.SelectableChannel; |
42 import java.nio.channels.SelectableChannel; |
40 import java.nio.channels.SelectionKey; |
43 import java.nio.channels.SelectionKey; |
54 import java.util.Objects; |
57 import java.util.Objects; |
55 import java.util.Optional; |
58 import java.util.Optional; |
56 import java.util.Set; |
59 import java.util.Set; |
57 import java.util.TreeSet; |
60 import java.util.TreeSet; |
58 import java.util.concurrent.CompletableFuture; |
61 import java.util.concurrent.CompletableFuture; |
59 import java.util.concurrent.ConcurrentLinkedQueue; |
62 import java.util.concurrent.CompletionException; |
60 import java.util.concurrent.ExecutionException; |
63 import java.util.concurrent.ExecutionException; |
61 import java.util.concurrent.Executor; |
64 import java.util.concurrent.Executor; |
62 import java.util.concurrent.Executors; |
65 import java.util.concurrent.Executors; |
63 import java.util.concurrent.ThreadFactory; |
66 import java.util.concurrent.ThreadFactory; |
64 import java.util.concurrent.atomic.AtomicInteger; |
67 import java.util.concurrent.atomic.AtomicInteger; |
65 import java.util.concurrent.atomic.AtomicLong; |
68 import java.util.concurrent.atomic.AtomicLong; |
|
69 import java.util.function.BooleanSupplier; |
66 import java.util.stream.Stream; |
70 import java.util.stream.Stream; |
67 import java.net.http.HttpClient; |
71 import java.net.http.HttpClient; |
68 import java.net.http.HttpRequest; |
72 import java.net.http.HttpRequest; |
69 import java.net.http.HttpResponse; |
73 import java.net.http.HttpResponse; |
70 import java.net.http.HttpResponse.BodyHandler; |
74 import java.net.http.HttpResponse.BodyHandler; |
119 t.setDaemon(true); |
123 t.setDaemon(true); |
120 return t; |
124 return t; |
121 } |
125 } |
122 } |
126 } |
123 |
127 |
|
128 /** |
|
129 * A DelegatingExecutor is an executor that delegates tasks to |
|
130 * a wrapped executor when it detects that the current thread |
|
131 * is the SelectorManager thread. If the current thread is not |
|
132 * the selector manager thread the given task is executed inline. |
|
133 */ |
|
134 final static class DelegatingExecutor implements Executor { |
|
135 private final BooleanSupplier isInSelectorThread; |
|
136 private final Executor delegate; |
|
137 DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) { |
|
138 this.isInSelectorThread = isInSelectorThread; |
|
139 this.delegate = delegate; |
|
140 } |
|
141 |
|
142 Executor delegate() { |
|
143 return delegate; |
|
144 } |
|
145 |
|
146 @Override |
|
147 public void execute(Runnable command) { |
|
148 if (isInSelectorThread.getAsBoolean()) { |
|
149 delegate.execute(command); |
|
150 } else { |
|
151 command.run(); |
|
152 } |
|
153 } |
|
154 } |
|
155 |
124 private final CookieHandler cookieHandler; |
156 private final CookieHandler cookieHandler; |
125 private final Redirect followRedirects; |
157 private final Redirect followRedirects; |
126 private final Optional<ProxySelector> userProxySelector; |
158 private final Optional<ProxySelector> userProxySelector; |
127 private final ProxySelector proxySelector; |
159 private final ProxySelector proxySelector; |
128 private final Authenticator authenticator; |
160 private final Authenticator authenticator; |
129 private final Version version; |
161 private final Version version; |
130 private final ConnectionPool connections; |
162 private final ConnectionPool connections; |
131 private final Executor executor; |
163 private final DelegatingExecutor delegatingExecutor; |
132 private final boolean isDefaultExecutor; |
164 private final boolean isDefaultExecutor; |
133 // Security parameters |
165 // Security parameters |
134 private final SSLContext sslContext; |
166 private final SSLContext sslContext; |
135 private final SSLParameters sslParams; |
167 private final SSLParameters sslParams; |
136 private final SelectorManager selmgr; |
168 private final SelectorManager selmgr; |
238 Executor ex = builder.executor; |
270 Executor ex = builder.executor; |
239 if (ex == null) { |
271 if (ex == null) { |
240 ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); |
272 ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); |
241 isDefaultExecutor = true; |
273 isDefaultExecutor = true; |
242 } else { |
274 } else { |
243 ex = builder.executor; |
|
244 isDefaultExecutor = false; |
275 isDefaultExecutor = false; |
245 } |
276 } |
|
277 delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex); |
246 facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); |
278 facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); |
247 client2 = new Http2ClientImpl(this); |
279 client2 = new Http2ClientImpl(this); |
248 executor = ex; |
|
249 cookieHandler = builder.cookieHandler; |
280 cookieHandler = builder.cookieHandler; |
250 followRedirects = builder.followRedirects == null ? |
281 followRedirects = builder.followRedirects == null ? |
251 Redirect.NEVER : builder.followRedirects; |
282 Redirect.NEVER : builder.followRedirects; |
252 this.userProxySelector = Optional.ofNullable(builder.proxy); |
283 this.userProxySelector = Optional.ofNullable(builder.proxy); |
253 this.proxySelector = userProxySelector |
284 this.proxySelector = userProxySelector |
487 @Override |
518 @Override |
488 public <T> HttpResponse<T> |
519 public <T> HttpResponse<T> |
489 send(HttpRequest req, BodyHandler<T> responseHandler) |
520 send(HttpRequest req, BodyHandler<T> responseHandler) |
490 throws IOException, InterruptedException |
521 throws IOException, InterruptedException |
491 { |
522 { |
|
523 CompletableFuture<HttpResponse<T>> cf = null; |
492 try { |
524 try { |
493 return sendAsync(req, responseHandler, null).get(); |
525 cf = sendAsync(req, responseHandler, null, null); |
|
526 return cf.get(); |
|
527 } catch (InterruptedException ie) { |
|
528 if (cf != null ) |
|
529 cf.cancel(true); |
|
530 throw ie; |
494 } catch (ExecutionException e) { |
531 } catch (ExecutionException e) { |
495 Throwable t = e.getCause(); |
532 final Throwable throwable = e.getCause(); |
496 if (t instanceof Error) |
533 final String msg = throwable.getMessage(); |
497 throw (Error)t; |
534 |
498 if (t instanceof RuntimeException) |
535 if (throwable instanceof IllegalArgumentException) { |
499 throw (RuntimeException)t; |
536 throw new IllegalArgumentException(msg, throwable); |
500 else if (t instanceof IOException) |
537 } else if (throwable instanceof SecurityException) { |
501 throw Utils.getIOException(t); |
538 throw new SecurityException(msg, throwable); |
502 else |
539 } else if (throwable instanceof HttpTimeoutException) { |
503 throw new InternalError("Unexpected exception", t); |
540 throw new HttpTimeoutException(msg); |
504 } |
541 } else if (throwable instanceof ConnectException) { |
505 } |
542 ConnectException ce = new ConnectException(msg); |
|
543 ce.initCause(throwable); |
|
544 throw ce; |
|
545 } else if (throwable instanceof IOException) { |
|
546 throw new IOException(msg, throwable); |
|
547 } else { |
|
548 throw new IOException(msg, throwable); |
|
549 } |
|
550 } |
|
551 } |
|
552 |
|
553 private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor(); |
506 |
554 |
507 @Override |
555 @Override |
508 public <T> CompletableFuture<HttpResponse<T>> |
556 public <T> CompletableFuture<HttpResponse<T>> |
509 sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler) |
557 sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler) |
510 { |
558 { |
511 return sendAsync(userRequest, responseHandler, null); |
559 return sendAsync(userRequest, responseHandler, null); |
512 } |
560 } |
513 |
561 |
514 |
|
515 @Override |
562 @Override |
516 public <T> CompletableFuture<HttpResponse<T>> |
563 public <T> CompletableFuture<HttpResponse<T>> |
517 sendAsync(HttpRequest userRequest, |
564 sendAsync(HttpRequest userRequest, |
518 BodyHandler<T> responseHandler, |
565 BodyHandler<T> responseHandler, |
519 PushPromiseHandler<T> pushPromiseHandler) |
566 PushPromiseHandler<T> pushPromiseHandler) { |
520 { |
567 return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate); |
|
568 } |
|
569 |
|
570 private <T> CompletableFuture<HttpResponse<T>> |
|
571 sendAsync(HttpRequest userRequest, |
|
572 BodyHandler<T> responseHandler, |
|
573 PushPromiseHandler<T> pushPromiseHandler, |
|
574 Executor exchangeExecutor) { |
|
575 |
521 Objects.requireNonNull(userRequest); |
576 Objects.requireNonNull(userRequest); |
522 Objects.requireNonNull(responseHandler); |
577 Objects.requireNonNull(responseHandler); |
523 |
578 |
524 AccessControlContext acc = null; |
579 AccessControlContext acc = null; |
525 if (System.getSecurityManager() != null) |
580 if (System.getSecurityManager() != null) |
534 reference(); |
589 reference(); |
535 try { |
590 try { |
536 if (debugelapsed.on()) |
591 if (debugelapsed.on()) |
537 debugelapsed.log("ClientImpl (async) send %s", userRequest); |
592 debugelapsed.log("ClientImpl (async) send %s", userRequest); |
538 |
593 |
539 Executor executor = acc == null |
594 // When using sendAsync(...) we explicitly pass the |
540 ? this.executor |
595 // executor's delegate as exchange executor to force |
541 : new PrivilegedExecutor(this.executor, acc); |
596 // asynchronous scheduling of the exchange. |
|
597 // When using send(...) we don't specify any executor |
|
598 // and default to using the client's delegating executor |
|
599 // which only spawns asynchronous tasks if it detects |
|
600 // that the current thread is the selector manager |
|
601 // thread. This will cause everything to execute inline |
|
602 // until we need to schedule some event with the selector. |
|
603 Executor executor = exchangeExecutor == null |
|
604 ? this.delegatingExecutor : exchangeExecutor; |
542 |
605 |
543 MultiExchange<T> mex = new MultiExchange<>(userRequest, |
606 MultiExchange<T> mex = new MultiExchange<>(userRequest, |
544 requestImpl, |
607 requestImpl, |
545 this, |
608 this, |
546 responseHandler, |
609 responseHandler, |
547 pushPromiseHandler, |
610 pushPromiseHandler, |
548 acc); |
611 acc); |
549 CompletableFuture<HttpResponse<T>> res = |
612 CompletableFuture<HttpResponse<T>> res = |
550 mex.responseAsync().whenComplete((b,t) -> unreference()); |
613 mex.responseAsync(executor).whenComplete((b,t) -> unreference()); |
551 if (DEBUGELAPSED) { |
614 if (DEBUGELAPSED) { |
552 res = res.whenComplete( |
615 res = res.whenComplete( |
553 (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); |
616 (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); |
554 } |
617 } |
555 |
618 |
556 // makes sure that any dependent actions happen in the executor |
619 // makes sure that any dependent actions happen in the CF default |
557 res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor); |
620 // executor. This is only needed for sendAsync(...), when |
558 |
621 // exchangeExecutor is non-null. |
|
622 if (exchangeExecutor != null) { |
|
623 res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL); |
|
624 } |
559 return res; |
625 return res; |
560 } catch(Throwable t) { |
626 } catch(Throwable t) { |
561 unreference(); |
627 unreference(); |
562 debugCompleted("ClientImpl (async)", start, userRequest); |
628 debugCompleted("ClientImpl (async)", start, userRequest); |
563 throw t; |
629 throw t; |
668 public void run() { |
735 public void run() { |
669 List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>(); |
736 List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>(); |
670 List<AsyncEvent> readyList = new ArrayList<>(); |
737 List<AsyncEvent> readyList = new ArrayList<>(); |
671 List<Runnable> resetList = new ArrayList<>(); |
738 List<Runnable> resetList = new ArrayList<>(); |
672 try { |
739 try { |
|
740 if (Log.channel()) Log.logChannel(getName() + ": starting"); |
673 while (!Thread.currentThread().isInterrupted()) { |
741 while (!Thread.currentThread().isInterrupted()) { |
674 synchronized (this) { |
742 synchronized (this) { |
675 assert errorList.isEmpty(); |
743 assert errorList.isEmpty(); |
676 assert readyList.isEmpty(); |
744 assert readyList.isEmpty(); |
677 assert resetList.isEmpty(); |
745 assert resetList.isEmpty(); |
736 errorList.clear(); |
804 errorList.clear(); |
737 |
805 |
738 // Check whether client is still alive, and if not, |
806 // Check whether client is still alive, and if not, |
739 // gracefully stop this thread |
807 // gracefully stop this thread |
740 if (!owner.isReferenced()) { |
808 if (!owner.isReferenced()) { |
741 Log.logTrace("HttpClient no longer referenced. Exiting..."); |
809 Log.logTrace("{0}: {1}", |
|
810 getName(), |
|
811 "HttpClient no longer referenced. Exiting..."); |
742 return; |
812 return; |
743 } |
813 } |
744 |
814 |
745 // Timeouts will have milliseconds granularity. It is important |
815 // Timeouts will have milliseconds granularity. It is important |
746 // to handle them in a timely fashion. |
816 // to handle them in a timely fashion. |
778 int n = selector.select(millis == 0 ? NODEADLINE : millis); |
848 int n = selector.select(millis == 0 ? NODEADLINE : millis); |
779 if (n == 0) { |
849 if (n == 0) { |
780 // Check whether client is still alive, and if not, |
850 // Check whether client is still alive, and if not, |
781 // gracefully stop this thread |
851 // gracefully stop this thread |
782 if (!owner.isReferenced()) { |
852 if (!owner.isReferenced()) { |
783 Log.logTrace("HttpClient no longer referenced. Exiting..."); |
853 Log.logTrace("{0}: {1}", |
|
854 getName(), |
|
855 "HttpClient no longer referenced. Exiting..."); |
784 return; |
856 return; |
785 } |
857 } |
786 owner.purgeTimeoutsAndReturnNextDeadline(); |
858 owner.purgeTimeoutsAndReturnNextDeadline(); |
787 continue; |
859 continue; |
788 } |
860 } |
829 resetList.forEach(r -> r.run()); |
901 resetList.forEach(r -> r.run()); |
830 resetList.clear(); |
902 resetList.clear(); |
831 |
903 |
832 } |
904 } |
833 } catch (Throwable e) { |
905 } catch (Throwable e) { |
834 //e.printStackTrace(); |
|
835 if (!closed) { |
906 if (!closed) { |
836 // This terminates thread. So, better just print stack trace |
907 // This terminates thread. So, better just print stack trace |
837 String err = Utils.stackTrace(e); |
908 String err = Utils.stackTrace(e); |
838 Log.logError("HttpClientImpl: fatal error: " + err); |
909 Log.logError("{0}: {1}: {2}", getName(), |
|
910 "HttpClientImpl shutting down due to fatal error", err); |
839 } |
911 } |
840 if (debug.on()) debug.log("shutting down", e); |
912 if (debug.on()) debug.log("shutting down", e); |
841 if (Utils.ASSERTIONSENABLED && !debug.on()) { |
913 if (Utils.ASSERTIONSENABLED && !debug.on()) { |
842 e.printStackTrace(System.err); // always print the stack |
914 e.printStackTrace(System.err); // always print the stack |
843 } |
915 } |
844 } finally { |
916 } finally { |
|
917 if (Log.channel()) Log.logChannel(getName() + ": stopping"); |
845 shutdown(); |
918 shutdown(); |
846 } |
919 } |
847 } |
920 } |
848 |
921 |
849 // void debugPrint(Selector selector) { |
922 // void debugPrint(Selector selector) { |