src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56138 4f92b988600e
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.time.Duration;
       
    31 import java.util.List;
       
    32 import java.security.AccessControlContext;
       
    33 import java.util.concurrent.CompletableFuture;
       
    34 import java.util.concurrent.CompletionException;
       
    35 import java.util.concurrent.ExecutionException;
       
    36 import java.util.concurrent.Executor;
       
    37 import java.util.concurrent.atomic.AtomicInteger;
       
    38 import java.util.function.Function;
       
    39 
       
    40 import java.net.http.HttpClient;
       
    41 import java.net.http.HttpRequest;
       
    42 import java.net.http.HttpResponse;
       
    43 import java.net.http.HttpResponse.PushPromiseHandler;
       
    44 import java.net.http.HttpTimeoutException;
       
    45 import jdk.internal.net.http.UntrustedBodyHandler;
       
    46 import jdk.internal.net.http.common.Log;
       
    47 import jdk.internal.net.http.common.MinimalFuture;
       
    48 import jdk.internal.net.http.common.ConnectionExpiredException;
       
    49 import jdk.internal.net.http.common.Utils;
       
    50 import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
       
    51 import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
       
    52 
       
    53 /**
       
    54  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
       
    55  * - manages filters
       
    56  * - retries due to filters.
       
    57  * - I/O errors and most other exceptions get returned directly to user
       
    58  *
       
    59  * Creates a new Exchange for each request/response interaction
       
    60  */
       
    61 class MultiExchange<T> {
       
    62 
       
    63     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    64     static final System.Logger DEBUG_LOGGER =
       
    65             Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
       
    66 
       
    67     private final HttpRequest userRequest; // the user request
       
    68     private final HttpRequestImpl request; // a copy of the user request
       
    69     final AccessControlContext acc;
       
    70     final HttpClientImpl client;
       
    71     final HttpResponse.BodyHandler<T> responseHandler;
       
    72     final Executor executor;
       
    73     final AtomicInteger attempts = new AtomicInteger();
       
    74     HttpRequestImpl currentreq; // used for async only
       
    75     Exchange<T> exchange; // the current exchange
       
    76     Exchange<T> previous;
       
    77     volatile Throwable retryCause;
       
    78     volatile boolean expiredOnce;
       
    79     volatile HttpResponse<T> response = null;
       
    80 
       
    81     // Maximum number of times a request will be retried/redirected
       
    82     // for any reason
       
    83 
       
    84     static final int DEFAULT_MAX_ATTEMPTS = 5;
       
    85     static final int max_attempts = Utils.getIntegerNetProperty(
       
    86             "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
       
    87     );
       
    88 
       
    89     private final List<HeaderFilter> filters;
       
    90     TimedEvent timedEvent;
       
    91     volatile boolean cancelled;
       
    92     final PushGroup<T> pushGroup;
       
    93 
       
    94     /**
       
    95      * Filter fields. These are attached as required by filters
       
    96      * and only used by the filter implementations. This could be
       
    97      * generalised into Objects that are passed explicitly to the filters
       
    98      * (one per MultiExchange object, and one per Exchange object possibly)
       
    99      */
       
   100     volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
       
   101     // RedirectHandler
       
   102     volatile int numberOfRedirects = 0;
       
   103 
       
   104     /**
       
   105      * MultiExchange with one final response.
       
   106      */
       
   107     MultiExchange(HttpRequest userRequest,
       
   108                   HttpRequestImpl requestImpl,
       
   109                   HttpClientImpl client,
       
   110                   HttpResponse.BodyHandler<T> responseHandler,
       
   111                   PushPromiseHandler<T> pushPromiseHandler,
       
   112                   AccessControlContext acc) {
       
   113         this.previous = null;
       
   114         this.userRequest = userRequest;
       
   115         this.request = requestImpl;
       
   116         this.currentreq = request;
       
   117         this.client = client;
       
   118         this.filters = client.filterChain();
       
   119         this.acc = acc;
       
   120         this.executor = client.theExecutor();
       
   121         this.responseHandler = responseHandler;
       
   122         if (acc != null) {
       
   123             // Restricts the file publisher with the senders ACC, if any
       
   124             if (responseHandler instanceof UntrustedBodyHandler)
       
   125                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
       
   126         }
       
   127 
       
   128         if (pushPromiseHandler != null) {
       
   129             this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
       
   130         } else {
       
   131             pushGroup = null;
       
   132         }
       
   133 
       
   134         this.exchange = new Exchange<>(request, this);
       
   135     }
       
   136 
       
   137     private synchronized Exchange<T> getExchange() {
       
   138         return exchange;
       
   139     }
       
   140 
       
   141     HttpClientImpl client() {
       
   142         return client;
       
   143     }
       
   144 
       
   145     HttpClient.Version version() {
       
   146         HttpClient.Version vers = request.version().orElse(client.version());
       
   147         if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)
       
   148             vers = HttpClient.Version.HTTP_1_1;
       
   149         return vers;
       
   150     }
       
   151 
       
   152     private synchronized void setExchange(Exchange<T> exchange) {
       
   153         if (this.exchange != null && exchange != this.exchange) {
       
   154             this.exchange.released();
       
   155         }
       
   156         this.exchange = exchange;
       
   157     }
       
   158 
       
   159     private void cancelTimer() {
       
   160         if (timedEvent != null) {
       
   161             client.cancelTimer(timedEvent);
       
   162         }
       
   163     }
       
   164 
       
   165     private void requestFilters(HttpRequestImpl r) throws IOException {
       
   166         Log.logTrace("Applying request filters");
       
   167         for (HeaderFilter filter : filters) {
       
   168             Log.logTrace("Applying {0}", filter);
       
   169             filter.request(r, this);
       
   170         }
       
   171         Log.logTrace("All filters applied");
       
   172     }
       
   173 
       
   174     private HttpRequestImpl responseFilters(Response response) throws IOException
       
   175     {
       
   176         Log.logTrace("Applying response filters");
       
   177         for (HeaderFilter filter : filters) {
       
   178             Log.logTrace("Applying {0}", filter);
       
   179             HttpRequestImpl newreq = filter.response(response);
       
   180             if (newreq != null) {
       
   181                 Log.logTrace("New request: stopping filters");
       
   182                 return newreq;
       
   183             }
       
   184         }
       
   185         Log.logTrace("All filters applied");
       
   186         return null;
       
   187     }
       
   188 
       
   189 //    public void cancel() {
       
   190 //        cancelled = true;
       
   191 //        getExchange().cancel();
       
   192 //    }
       
   193 
       
   194     public void cancel(IOException cause) {
       
   195         cancelled = true;
       
   196         getExchange().cancel(cause);
       
   197     }
       
   198 
       
   199     public CompletableFuture<HttpResponse<T>> responseAsync() {
       
   200         CompletableFuture<Void> start = new MinimalFuture<>();
       
   201         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
       
   202         start.completeAsync( () -> null, executor); // trigger execution
       
   203         return cf;
       
   204     }
       
   205 
       
   206     private CompletableFuture<HttpResponse<T>>
       
   207     responseAsync0(CompletableFuture<Void> start) {
       
   208         return start.thenCompose( v -> responseAsyncImpl())
       
   209                     .thenCompose((Response r) -> {
       
   210                         Exchange<T> exch = getExchange();
       
   211                         return exch.readBodyAsync(responseHandler)
       
   212                             .thenApply((T body) -> {
       
   213                                 this.response =
       
   214                                     new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
       
   215                                 return this.response;
       
   216                             });
       
   217                     });
       
   218     }
       
   219 
       
   220     private CompletableFuture<Response> responseAsyncImpl() {
       
   221         CompletableFuture<Response> cf;
       
   222         if (attempts.incrementAndGet() > max_attempts) {
       
   223             cf = failedFuture(new IOException("Too many retries", retryCause));
       
   224         } else {
       
   225             if (currentreq.timeout().isPresent()) {
       
   226                 timedEvent = new TimedEvent(currentreq.timeout().get());
       
   227                 client.registerTimer(timedEvent);
       
   228             }
       
   229             try {
       
   230                 // 1. apply request filters
       
   231                 requestFilters(currentreq);
       
   232             } catch (IOException e) {
       
   233                 return failedFuture(e);
       
   234             }
       
   235             Exchange<T> exch = getExchange();
       
   236             // 2. get response
       
   237             cf = exch.responseAsync()
       
   238                      .thenCompose((Response response) -> {
       
   239                         HttpRequestImpl newrequest;
       
   240                         try {
       
   241                             // 3. apply response filters
       
   242                             newrequest = responseFilters(response);
       
   243                         } catch (IOException e) {
       
   244                             return failedFuture(e);
       
   245                         }
       
   246                         // 4. check filter result and repeat or continue
       
   247                         if (newrequest == null) {
       
   248                             if (attempts.get() > 1) {
       
   249                                 Log.logError("Succeeded on attempt: " + attempts);
       
   250                             }
       
   251                             return completedFuture(response);
       
   252                         } else {
       
   253                             this.response =
       
   254                                 new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
       
   255                             Exchange<T> oldExch = exch;
       
   256                             return exch.ignoreBody().handle((r,t) -> {
       
   257                                 currentreq = newrequest;
       
   258                                 expiredOnce = false;
       
   259                                 setExchange(new Exchange<>(currentreq, this, acc));
       
   260                                 return responseAsyncImpl();
       
   261                             }).thenCompose(Function.identity());
       
   262                         } })
       
   263                      .handle((response, ex) -> {
       
   264                         // 5. handle errors and cancel any timer set
       
   265                         cancelTimer();
       
   266                         if (ex == null) {
       
   267                             assert response != null;
       
   268                             return completedFuture(response);
       
   269                         }
       
   270                         // all exceptions thrown are handled here
       
   271                         CompletableFuture<Response> errorCF = getExceptionalCF(ex);
       
   272                         if (errorCF == null) {
       
   273                             return responseAsyncImpl();
       
   274                         } else {
       
   275                             return errorCF;
       
   276                         } })
       
   277                      .thenCompose(Function.identity());
       
   278         }
       
   279         return cf;
       
   280     }
       
   281 
       
   282     /**
       
   283      * Takes a Throwable and returns a suitable CompletableFuture that is
       
   284      * completed exceptionally, or null.
       
   285      */
       
   286     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
       
   287         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
       
   288             if (t.getCause() != null) {
       
   289                 t = t.getCause();
       
   290             }
       
   291         }
       
   292         if (cancelled && t instanceof IOException) {
       
   293             t = new HttpTimeoutException("request timed out");
       
   294         } else if (t instanceof ConnectionExpiredException) {
       
   295             // allow the retry mechanism to do its work
       
   296             // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
       
   297             if (t.getCause() != null) retryCause = t.getCause();
       
   298             if (!expiredOnce) {
       
   299                 DEBUG_LOGGER.log(Level.DEBUG,
       
   300                     "MultiExchange: ConnectionExpiredException (async): retrying...",
       
   301                     t);
       
   302                 expiredOnce = true;
       
   303                 return null;
       
   304             } else {
       
   305                 DEBUG_LOGGER.log(Level.DEBUG,
       
   306                     "MultiExchange: ConnectionExpiredException (async): already retried once.",
       
   307                     t);
       
   308                 if (t.getCause() != null) t = t.getCause();
       
   309             }
       
   310         }
       
   311         return failedFuture(t);
       
   312     }
       
   313 
       
   314     class TimedEvent extends TimeoutEvent {
       
   315         TimedEvent(Duration duration) {
       
   316             super(duration);
       
   317         }
       
   318         @Override
       
   319         public void handle() {
       
   320             DEBUG_LOGGER.log(Level.DEBUG,
       
   321                     "Cancelling MultiExchange due to timeout for request %s",
       
   322                      request);
       
   323             cancel(new HttpTimeoutException("request timed out"));
       
   324         }
       
   325     }
       
   326 }