src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
changeset 50681 4254bed3c09d
parent 49944 4690a2871b44
child 50985 cd41f34e548c
child 56795 03ece2518428
child 56799 c274589ad63b
equal deleted inserted replaced
50678:818a23db260c 50681:4254bed3c09d
    26 package jdk.internal.net.http;
    26 package jdk.internal.net.http;
    27 
    27 
    28 import javax.net.ssl.SSLContext;
    28 import javax.net.ssl.SSLContext;
    29 import javax.net.ssl.SSLParameters;
    29 import javax.net.ssl.SSLParameters;
    30 import java.io.IOException;
    30 import java.io.IOException;
       
    31 import java.io.UncheckedIOException;
    31 import java.lang.ref.Reference;
    32 import java.lang.ref.Reference;
    32 import java.lang.ref.WeakReference;
    33 import java.lang.ref.WeakReference;
    33 import java.net.Authenticator;
    34 import java.net.Authenticator;
       
    35 import java.net.ConnectException;
    34 import java.net.CookieHandler;
    36 import java.net.CookieHandler;
    35 import java.net.ProxySelector;
    37 import java.net.ProxySelector;
       
    38 import java.net.http.HttpTimeoutException;
    36 import java.nio.ByteBuffer;
    39 import java.nio.ByteBuffer;
    37 import java.nio.channels.CancelledKeyException;
    40 import java.nio.channels.CancelledKeyException;
    38 import java.nio.channels.ClosedChannelException;
    41 import java.nio.channels.ClosedChannelException;
    39 import java.nio.channels.SelectableChannel;
    42 import java.nio.channels.SelectableChannel;
    40 import java.nio.channels.SelectionKey;
    43 import java.nio.channels.SelectionKey;
    54 import java.util.Objects;
    57 import java.util.Objects;
    55 import java.util.Optional;
    58 import java.util.Optional;
    56 import java.util.Set;
    59 import java.util.Set;
    57 import java.util.TreeSet;
    60 import java.util.TreeSet;
    58 import java.util.concurrent.CompletableFuture;
    61 import java.util.concurrent.CompletableFuture;
    59 import java.util.concurrent.ConcurrentLinkedQueue;
    62 import java.util.concurrent.CompletionException;
    60 import java.util.concurrent.ExecutionException;
    63 import java.util.concurrent.ExecutionException;
    61 import java.util.concurrent.Executor;
    64 import java.util.concurrent.Executor;
    62 import java.util.concurrent.Executors;
    65 import java.util.concurrent.Executors;
    63 import java.util.concurrent.ThreadFactory;
    66 import java.util.concurrent.ThreadFactory;
    64 import java.util.concurrent.atomic.AtomicInteger;
    67 import java.util.concurrent.atomic.AtomicInteger;
    65 import java.util.concurrent.atomic.AtomicLong;
    68 import java.util.concurrent.atomic.AtomicLong;
       
    69 import java.util.function.BooleanSupplier;
    66 import java.util.stream.Stream;
    70 import java.util.stream.Stream;
    67 import java.net.http.HttpClient;
    71 import java.net.http.HttpClient;
    68 import java.net.http.HttpRequest;
    72 import java.net.http.HttpRequest;
    69 import java.net.http.HttpResponse;
    73 import java.net.http.HttpResponse;
    70 import java.net.http.HttpResponse.BodyHandler;
    74 import java.net.http.HttpResponse.BodyHandler;
   119             t.setDaemon(true);
   123             t.setDaemon(true);
   120             return t;
   124             return t;
   121         }
   125         }
   122     }
   126     }
   123 
   127 
       
   128     /**
       
   129      * A DelegatingExecutor is an executor that delegates tasks to
       
   130      * a wrapped executor when it detects that the current thread
       
   131      * is the SelectorManager thread. If the current thread is not
       
   132      * the selector manager thread the given task is executed inline.
       
   133      */
       
   134     final static class DelegatingExecutor implements Executor {
       
   135         private final BooleanSupplier isInSelectorThread;
       
   136         private final Executor delegate;
       
   137         DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
       
   138             this.isInSelectorThread = isInSelectorThread;
       
   139             this.delegate = delegate;
       
   140         }
       
   141 
       
   142         Executor delegate() {
       
   143             return delegate;
       
   144         }
       
   145 
       
   146         @Override
       
   147         public void execute(Runnable command) {
       
   148             if (isInSelectorThread.getAsBoolean()) {
       
   149                 delegate.execute(command);
       
   150             } else {
       
   151                 command.run();
       
   152             }
       
   153         }
       
   154     }
       
   155 
   124     private final CookieHandler cookieHandler;
   156     private final CookieHandler cookieHandler;
   125     private final Redirect followRedirects;
   157     private final Redirect followRedirects;
   126     private final Optional<ProxySelector> userProxySelector;
   158     private final Optional<ProxySelector> userProxySelector;
   127     private final ProxySelector proxySelector;
   159     private final ProxySelector proxySelector;
   128     private final Authenticator authenticator;
   160     private final Authenticator authenticator;
   129     private final Version version;
   161     private final Version version;
   130     private final ConnectionPool connections;
   162     private final ConnectionPool connections;
   131     private final Executor executor;
   163     private final DelegatingExecutor delegatingExecutor;
   132     private final boolean isDefaultExecutor;
   164     private final boolean isDefaultExecutor;
   133     // Security parameters
   165     // Security parameters
   134     private final SSLContext sslContext;
   166     private final SSLContext sslContext;
   135     private final SSLParameters sslParams;
   167     private final SSLParameters sslParams;
   136     private final SelectorManager selmgr;
   168     private final SelectorManager selmgr;
   238         Executor ex = builder.executor;
   270         Executor ex = builder.executor;
   239         if (ex == null) {
   271         if (ex == null) {
   240             ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
   272             ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
   241             isDefaultExecutor = true;
   273             isDefaultExecutor = true;
   242         } else {
   274         } else {
   243             ex = builder.executor;
       
   244             isDefaultExecutor = false;
   275             isDefaultExecutor = false;
   245         }
   276         }
       
   277         delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
   246         facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
   278         facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
   247         client2 = new Http2ClientImpl(this);
   279         client2 = new Http2ClientImpl(this);
   248         executor = ex;
       
   249         cookieHandler = builder.cookieHandler;
   280         cookieHandler = builder.cookieHandler;
   250         followRedirects = builder.followRedirects == null ?
   281         followRedirects = builder.followRedirects == null ?
   251                 Redirect.NEVER : builder.followRedirects;
   282                 Redirect.NEVER : builder.followRedirects;
   252         this.userProxySelector = Optional.ofNullable(builder.proxy);
   283         this.userProxySelector = Optional.ofNullable(builder.proxy);
   253         this.proxySelector = userProxySelector
   284         this.proxySelector = userProxySelector
   487     @Override
   518     @Override
   488     public <T> HttpResponse<T>
   519     public <T> HttpResponse<T>
   489     send(HttpRequest req, BodyHandler<T> responseHandler)
   520     send(HttpRequest req, BodyHandler<T> responseHandler)
   490         throws IOException, InterruptedException
   521         throws IOException, InterruptedException
   491     {
   522     {
       
   523         CompletableFuture<HttpResponse<T>> cf = null;
   492         try {
   524         try {
   493             return sendAsync(req, responseHandler, null).get();
   525             cf = sendAsync(req, responseHandler, null, null);
       
   526             return cf.get();
       
   527         } catch (InterruptedException ie) {
       
   528             if (cf != null )
       
   529                 cf.cancel(true);
       
   530             throw ie;
   494         } catch (ExecutionException e) {
   531         } catch (ExecutionException e) {
   495             Throwable t = e.getCause();
   532             final Throwable throwable = e.getCause();
   496             if (t instanceof Error)
   533             final String msg = throwable.getMessage();
   497                 throw (Error)t;
   534 
   498             if (t instanceof RuntimeException)
   535             if (throwable instanceof IllegalArgumentException) {
   499                 throw (RuntimeException)t;
   536                 throw new IllegalArgumentException(msg, throwable);
   500             else if (t instanceof IOException)
   537             } else if (throwable instanceof SecurityException) {
   501                 throw Utils.getIOException(t);
   538                 throw new SecurityException(msg, throwable);
   502             else
   539             } else if (throwable instanceof HttpTimeoutException) {
   503                 throw new InternalError("Unexpected exception", t);
   540                 throw new HttpTimeoutException(msg);
   504         }
   541             } else if (throwable instanceof ConnectException) {
   505     }
   542                 ConnectException ce = new ConnectException(msg);
       
   543                 ce.initCause(throwable);
       
   544                 throw ce;
       
   545             } else if (throwable instanceof IOException) {
       
   546                 throw new IOException(msg, throwable);
       
   547             } else {
       
   548                 throw new IOException(msg, throwable);
       
   549             }
       
   550         }
       
   551     }
       
   552 
       
   553     private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();
   506 
   554 
   507     @Override
   555     @Override
   508     public <T> CompletableFuture<HttpResponse<T>>
   556     public <T> CompletableFuture<HttpResponse<T>>
   509     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
   557     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
   510     {
   558     {
   511         return sendAsync(userRequest, responseHandler, null);
   559         return sendAsync(userRequest, responseHandler, null);
   512     }
   560     }
   513 
   561 
   514 
       
   515     @Override
   562     @Override
   516     public <T> CompletableFuture<HttpResponse<T>>
   563     public <T> CompletableFuture<HttpResponse<T>>
   517     sendAsync(HttpRequest userRequest,
   564     sendAsync(HttpRequest userRequest,
   518               BodyHandler<T> responseHandler,
   565               BodyHandler<T> responseHandler,
   519               PushPromiseHandler<T> pushPromiseHandler)
   566               PushPromiseHandler<T> pushPromiseHandler) {
   520     {
   567         return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
       
   568     }
       
   569 
       
   570     private <T> CompletableFuture<HttpResponse<T>>
       
   571     sendAsync(HttpRequest userRequest,
       
   572               BodyHandler<T> responseHandler,
       
   573               PushPromiseHandler<T> pushPromiseHandler,
       
   574               Executor exchangeExecutor)    {
       
   575 
   521         Objects.requireNonNull(userRequest);
   576         Objects.requireNonNull(userRequest);
   522         Objects.requireNonNull(responseHandler);
   577         Objects.requireNonNull(responseHandler);
   523 
   578 
   524         AccessControlContext acc = null;
   579         AccessControlContext acc = null;
   525         if (System.getSecurityManager() != null)
   580         if (System.getSecurityManager() != null)
   534         reference();
   589         reference();
   535         try {
   590         try {
   536             if (debugelapsed.on())
   591             if (debugelapsed.on())
   537                 debugelapsed.log("ClientImpl (async) send %s", userRequest);
   592                 debugelapsed.log("ClientImpl (async) send %s", userRequest);
   538 
   593 
   539             Executor executor = acc == null
   594             // When using sendAsync(...) we explicitly pass the
   540                     ? this.executor
   595             // executor's delegate as exchange executor to force
   541                     : new PrivilegedExecutor(this.executor, acc);
   596             // asynchronous scheduling of the exchange.
       
   597             // When using send(...) we don't specify any executor
       
   598             // and default to using the client's delegating executor
       
   599             // which only spawns asynchronous tasks if it detects
       
   600             // that the current thread is the selector manager
       
   601             // thread. This will cause everything to execute inline
       
   602             // until we need to schedule some event with the selector.
       
   603             Executor executor = exchangeExecutor == null
       
   604                     ? this.delegatingExecutor : exchangeExecutor;
   542 
   605 
   543             MultiExchange<T> mex = new MultiExchange<>(userRequest,
   606             MultiExchange<T> mex = new MultiExchange<>(userRequest,
   544                                                             requestImpl,
   607                                                             requestImpl,
   545                                                             this,
   608                                                             this,
   546                                                             responseHandler,
   609                                                             responseHandler,
   547                                                             pushPromiseHandler,
   610                                                             pushPromiseHandler,
   548                                                             acc);
   611                                                             acc);
   549             CompletableFuture<HttpResponse<T>> res =
   612             CompletableFuture<HttpResponse<T>> res =
   550                     mex.responseAsync().whenComplete((b,t) -> unreference());
   613                     mex.responseAsync(executor).whenComplete((b,t) -> unreference());
   551             if (DEBUGELAPSED) {
   614             if (DEBUGELAPSED) {
   552                 res = res.whenComplete(
   615                 res = res.whenComplete(
   553                         (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
   616                         (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
   554             }
   617             }
   555 
   618 
   556             // makes sure that any dependent actions happen in the executor
   619             // makes sure that any dependent actions happen in the CF default
   557             res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor);
   620             // executor. This is only needed for sendAsync(...), when
   558 
   621             // exchangeExecutor is non-null.
       
   622             if (exchangeExecutor != null) {
       
   623                 res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
       
   624             }
   559             return res;
   625             return res;
   560         } catch(Throwable t) {
   626         } catch(Throwable t) {
   561             unreference();
   627             unreference();
   562             debugCompleted("ClientImpl (async)", start, userRequest);
   628             debugCompleted("ClientImpl (async)", start, userRequest);
   563             throw t;
   629             throw t;
   652         void wakeupSelector() {
   718         void wakeupSelector() {
   653             selector.wakeup();
   719             selector.wakeup();
   654         }
   720         }
   655 
   721 
   656         synchronized void shutdown() {
   722         synchronized void shutdown() {
       
   723             Log.logTrace("{0}: shutting down", getName());
   657             if (debug.on()) debug.log("SelectorManager shutting down");
   724             if (debug.on()) debug.log("SelectorManager shutting down");
   658             closed = true;
   725             closed = true;
   659             try {
   726             try {
   660                 selector.close();
   727                 selector.close();
   661             } catch (IOException ignored) {
   728             } catch (IOException ignored) {
   668         public void run() {
   735         public void run() {
   669             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
   736             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
   670             List<AsyncEvent> readyList = new ArrayList<>();
   737             List<AsyncEvent> readyList = new ArrayList<>();
   671             List<Runnable> resetList = new ArrayList<>();
   738             List<Runnable> resetList = new ArrayList<>();
   672             try {
   739             try {
       
   740                 if (Log.channel()) Log.logChannel(getName() + ": starting");
   673                 while (!Thread.currentThread().isInterrupted()) {
   741                 while (!Thread.currentThread().isInterrupted()) {
   674                     synchronized (this) {
   742                     synchronized (this) {
   675                         assert errorList.isEmpty();
   743                         assert errorList.isEmpty();
   676                         assert readyList.isEmpty();
   744                         assert readyList.isEmpty();
   677                         assert resetList.isEmpty();
   745                         assert resetList.isEmpty();
   704                                 sa.register(event);
   772                                 sa.register(event);
   705                                 if (!chan.isOpen()) {
   773                                 if (!chan.isOpen()) {
   706                                     throw new IOException("Channel closed");
   774                                     throw new IOException("Channel closed");
   707                                 }
   775                                 }
   708                             } catch (IOException e) {
   776                             } catch (IOException e) {
   709                                 Log.logTrace("HttpClientImpl: " + e);
   777                                 Log.logTrace("{0}: {1}", getName(), e);
   710                                 if (debug.on())
   778                                 if (debug.on())
   711                                     debug.log("Got " + e.getClass().getName()
   779                                     debug.log("Got " + e.getClass().getName()
   712                                               + " while handling registration events");
   780                                               + " while handling registration events");
   713                                 chan.close();
   781                                 chan.close();
   714                                 // let the event abort deal with it
   782                                 // let the event abort deal with it
   736                     errorList.clear();
   804                     errorList.clear();
   737 
   805 
   738                     // Check whether client is still alive, and if not,
   806                     // Check whether client is still alive, and if not,
   739                     // gracefully stop this thread
   807                     // gracefully stop this thread
   740                     if (!owner.isReferenced()) {
   808                     if (!owner.isReferenced()) {
   741                         Log.logTrace("HttpClient no longer referenced. Exiting...");
   809                         Log.logTrace("{0}: {1}",
       
   810                                 getName(),
       
   811                                 "HttpClient no longer referenced. Exiting...");
   742                         return;
   812                         return;
   743                     }
   813                     }
   744 
   814 
   745                     // Timeouts will have milliseconds granularity. It is important
   815                     // Timeouts will have milliseconds granularity. It is important
   746                     // to handle them in a timely fashion.
   816                     // to handle them in a timely fashion.
   778                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
   848                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
   779                     if (n == 0) {
   849                     if (n == 0) {
   780                         // Check whether client is still alive, and if not,
   850                         // Check whether client is still alive, and if not,
   781                         // gracefully stop this thread
   851                         // gracefully stop this thread
   782                         if (!owner.isReferenced()) {
   852                         if (!owner.isReferenced()) {
   783                             Log.logTrace("HttpClient no longer referenced. Exiting...");
   853                             Log.logTrace("{0}: {1}",
       
   854                                     getName(),
       
   855                                     "HttpClient no longer referenced. Exiting...");
   784                             return;
   856                             return;
   785                         }
   857                         }
   786                         owner.purgeTimeoutsAndReturnNextDeadline();
   858                         owner.purgeTimeoutsAndReturnNextDeadline();
   787                         continue;
   859                         continue;
   788                     }
   860                     }
   829                     resetList.forEach(r -> r.run());
   901                     resetList.forEach(r -> r.run());
   830                     resetList.clear();
   902                     resetList.clear();
   831 
   903 
   832                 }
   904                 }
   833             } catch (Throwable e) {
   905             } catch (Throwable e) {
   834                 //e.printStackTrace();
       
   835                 if (!closed) {
   906                 if (!closed) {
   836                     // This terminates thread. So, better just print stack trace
   907                     // This terminates thread. So, better just print stack trace
   837                     String err = Utils.stackTrace(e);
   908                     String err = Utils.stackTrace(e);
   838                     Log.logError("HttpClientImpl: fatal error: " + err);
   909                     Log.logError("{0}: {1}: {2}", getName(),
       
   910                             "HttpClientImpl shutting down due to fatal error", err);
   839                 }
   911                 }
   840                 if (debug.on()) debug.log("shutting down", e);
   912                 if (debug.on()) debug.log("shutting down", e);
   841                 if (Utils.ASSERTIONSENABLED && !debug.on()) {
   913                 if (Utils.ASSERTIONSENABLED && !debug.on()) {
   842                     e.printStackTrace(System.err); // always print the stack
   914                     e.printStackTrace(System.err); // always print the stack
   843                 }
   915                 }
   844             } finally {
   916             } finally {
       
   917                 if (Log.channel()) Log.logChannel(getName() + ": stopping");
   845                 shutdown();
   918                 shutdown();
   846             }
   919             }
   847         }
   920         }
   848 
   921 
   849 //        void debugPrint(Selector selector) {
   922 //        void debugPrint(Selector selector) {
  1011     @Override
  1084     @Override
  1012     public Optional<Authenticator> authenticator() {
  1085     public Optional<Authenticator> authenticator() {
  1013         return Optional.ofNullable(authenticator);
  1086         return Optional.ofNullable(authenticator);
  1014     }
  1087     }
  1015 
  1088 
  1016     /*package-private*/ final Executor theExecutor() {
  1089     /*package-private*/ final DelegatingExecutor theExecutor() {
  1017         return executor;
  1090         return delegatingExecutor;
  1018     }
  1091     }
  1019 
  1092 
  1020     @Override
  1093     @Override
  1021     public final Optional<Executor> executor() {
  1094     public final Optional<Executor> executor() {
  1022         return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
  1095         return isDefaultExecutor
       
  1096                 ? Optional.empty()
       
  1097                 : Optional.of(delegatingExecutor.delegate());
  1023     }
  1098     }
  1024 
  1099 
  1025     ConnectionPool connectionPool() {
  1100     ConnectionPool connectionPool() {
  1026         return connections;
  1101         return connections;
  1027     }
  1102     }