changeset 56079 d23b02f37fce
parent 56043 08e8e41841cf
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
     1 /*
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     8  * particular file as subject to the "Classpath" exception as provided
     9  * by Oracle in the LICENSE file that accompanied this code.
    10  *
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    14  * version 2 for more details (a copy is included in the LICENSE file that
    15  * accompanied this code).
    16  *
    17  * You should have received a copy of the GNU General Public License version
    18  * 2 along with this work; if not, write to the Free Software Foundation,
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    20  *
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
    23  * questions.
    24  */
    26 package jdk.incubator.http.internal;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.security.AccessControlContext;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import jdk.incubator.http.HttpClient;
import jdk.incubator.http.HttpRequest;
import jdk.incubator.http.HttpResponse;
import jdk.incubator.http.HttpResponse.PushPromiseHandler;
import jdk.incubator.http.HttpTimeoutException;
import jdk.incubator.http.internal.UntrustedBodyHandler;
import jdk.incubator.http.internal.common.ConnectionExpiredException;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;

import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
    53 /**
    54  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
    55  * - manages filters
    56  * - retries due to filters.
    57  * - I/O errors and most other exceptions get returned directly to user
    58  *
    59  * Creates a new Exchange for each request/response interaction
    60  */
    61 class MultiExchange<T> {
    63     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    64     static final System.Logger DEBUG_LOGGER =
    65             Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
    67     private final HttpRequest userRequest; // the user request
    68     private final HttpRequestImpl request; // a copy of the user request
    69     final AccessControlContext acc;
    70     final HttpClientImpl client;
    71     final HttpResponse.BodyHandler<T> responseHandler;
    72     final Executor executor;
    73     final AtomicInteger attempts = new AtomicInteger();
    74     HttpRequestImpl currentreq; // used for async only
    75     Exchange<T> exchange; // the current exchange
    76     Exchange<T> previous;
    77     volatile Throwable retryCause;
    78     volatile boolean expiredOnce;
    79     volatile HttpResponse<T> response = null;
    81     // Maximum number of times a request will be retried/redirected
    82     // for any reason
    84     static final int DEFAULT_MAX_ATTEMPTS = 5;
    85     static final int max_attempts = Utils.getIntegerNetProperty(
    86             "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
    87     );
    89     private final List<HeaderFilter> filters;
    90     TimedEvent timedEvent;
    91     volatile boolean cancelled;
    92     final PushGroup<T> pushGroup;
    94     /**
    95      * Filter fields. These are attached as required by filters
    96      * and only used by the filter implementations. This could be
    97      * generalised into Objects that are passed explicitly to the filters
    98      * (one per MultiExchange object, and one per Exchange object possibly)
    99      */
   100     volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
   101     // RedirectHandler
   102     volatile int numberOfRedirects = 0;
   104     /**
   105      * MultiExchange with one final response.
   106      */
   107     MultiExchange(HttpRequest userRequest,
   108                   HttpRequestImpl requestImpl,
   109                   HttpClientImpl client,
   110                   HttpResponse.BodyHandler<T> responseHandler,
   111                   PushPromiseHandler<T> pushPromiseHandler,
   112                   AccessControlContext acc) {
   113         this.previous = null;
   114         this.userRequest = userRequest;
   115         this.request = requestImpl;
   116         this.currentreq = request;
   117         this.client = client;
   118         this.filters = client.filterChain();
   119         this.acc = acc;
   120         this.executor = client.theExecutor();
   121         this.responseHandler = responseHandler;
   122         if (acc != null) {
   123             // Restricts the file publisher with the senders ACC, if any
   124             if (responseHandler instanceof UntrustedBodyHandler)
   125                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
   126         }
   128         if (pushPromiseHandler != null) {
   129             this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
   130         } else {
   131             pushGroup = null;
   132         }
   134 = new Exchange<>(request, this);
   135     }
   137     private synchronized Exchange<T> getExchange() {
   138         return exchange;
   139     }
   141     HttpClientImpl client() {
   142         return client;
   143     }
   145     HttpClient.Version version() {
   146         HttpClient.Version vers = request.version().orElse(client.version());
   147         if (vers == HttpClient.Version.HTTP_2 && ! && request.proxy() != null)
   148             vers = HttpClient.Version.HTTP_1_1;
   149         return vers;
   150     }
   152     private synchronized void setExchange(Exchange<T> exchange) {
   153         if ( != null && exchange != {
   154   ;
   155         }
   156 = exchange;
   157     }
   159     private void cancelTimer() {
   160         if (timedEvent != null) {
   161             client.cancelTimer(timedEvent);
   162         }
   163     }
   165     private void requestFilters(HttpRequestImpl r) throws IOException {
   166         Log.logTrace("Applying request filters");
   167         for (HeaderFilter filter : filters) {
   168             Log.logTrace("Applying {0}", filter);
   169             filter.request(r, this);
   170         }
   171         Log.logTrace("All filters applied");
   172     }
   174     private HttpRequestImpl responseFilters(Response response) throws IOException
   175     {
   176         Log.logTrace("Applying response filters");
   177         for (HeaderFilter filter : filters) {
   178             Log.logTrace("Applying {0}", filter);
   179             HttpRequestImpl newreq = filter.response(response);
   180             if (newreq != null) {
   181                 Log.logTrace("New request: stopping filters");
   182                 return newreq;
   183             }
   184         }
   185         Log.logTrace("All filters applied");
   186         return null;
   187     }
   189 //    public void cancel() {
   190 //        cancelled = true;
   191 //        getExchange().cancel();
   192 //    }
   194     public void cancel(IOException cause) {
   195         cancelled = true;
   196         getExchange().cancel(cause);
   197     }
   199     public CompletableFuture<HttpResponse<T>> responseAsync() {
   200         CompletableFuture<Void> start = new MinimalFuture<>();
   201         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
   202         start.completeAsync( () -> null, executor); // trigger execution
   203         return cf;
   204     }
   206     private CompletableFuture<HttpResponse<T>>
   207     responseAsync0(CompletableFuture<Void> start) {
   208         return start.thenCompose( v -> responseAsyncImpl())
   209                     .thenCompose((Response r) -> {
   210                         Exchange<T> exch = getExchange();
   211                         return exch.readBodyAsync(responseHandler)
   212                             .thenApply((T body) -> {
   213                                 this.response =
   214                                     new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
   215                                 return this.response;
   216                             });
   217                     });
   218     }
   220     private CompletableFuture<Response> responseAsyncImpl() {
   221         CompletableFuture<Response> cf;
   222         if (attempts.incrementAndGet() > max_attempts) {
   223             cf = failedFuture(new IOException("Too many retries", retryCause));
   224         } else {
   225             if (currentreq.timeout().isPresent()) {
   226                 timedEvent = new TimedEvent(currentreq.timeout().get());
   227                 client.registerTimer(timedEvent);
   228             }
   229             try {
   230                 // 1. apply request filters
   231                 requestFilters(currentreq);
   232             } catch (IOException e) {
   233                 return failedFuture(e);
   234             }
   235             Exchange<T> exch = getExchange();
   236             // 2. get response
   237             cf = exch.responseAsync()
   238                      .thenCompose((Response response) -> {
   239                         HttpRequestImpl newrequest;
   240                         try {
   241                             // 3. apply response filters
   242                             newrequest = responseFilters(response);
   243                         } catch (IOException e) {
   244                             return failedFuture(e);
   245                         }
   246                         // 4. check filter result and repeat or continue
   247                         if (newrequest == null) {
   248                             if (attempts.get() > 1) {
   249                                 Log.logError("Succeeded on attempt: " + attempts);
   250                             }
   251                             return completedFuture(response);
   252                         } else {
   253                             this.response =
   254                                 new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
   255                             Exchange<T> oldExch = exch;
   256                             return exch.ignoreBody().handle((r,t) -> {
   257                                 currentreq = newrequest;
   258                                 expiredOnce = false;
   259                                 setExchange(new Exchange<>(currentreq, this, acc));
   260                                 return responseAsyncImpl();
   261                             }).thenCompose(Function.identity());
   262                         } })
   263                      .handle((response, ex) -> {
   264                         // 5. handle errors and cancel any timer set
   265                         cancelTimer();
   266                         if (ex == null) {
   267                             assert response != null;
   268                             return completedFuture(response);
   269                         }
   270                         // all exceptions thrown are handled here
   271                         CompletableFuture<Response> errorCF = getExceptionalCF(ex);
   272                         if (errorCF == null) {
   273                             return responseAsyncImpl();
   274                         } else {
   275                             return errorCF;
   276                         } })
   277                      .thenCompose(Function.identity());
   278         }
   279         return cf;
   280     }
   282     /**
   283      * Takes a Throwable and returns a suitable CompletableFuture that is
   284      * completed exceptionally, or null.
   285      */
   286     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
   287         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
   288             if (t.getCause() != null) {
   289                 t = t.getCause();
   290             }
   291         }
   292         if (cancelled && t instanceof IOException) {
   293             t = new HttpTimeoutException("request timed out");
   294         } else if (t instanceof ConnectionExpiredException) {
   295             // allow the retry mechanism to do its work
   296             // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
   297             if (t.getCause() != null) retryCause = t.getCause();
   298             if (!expiredOnce) {
   299                 DEBUG_LOGGER.log(Level.DEBUG,
   300                     "MultiExchange: ConnectionExpiredException (async): retrying...",
   301                     t);
   302                 expiredOnce = true;
   303                 return null;
   304             } else {
   305                 DEBUG_LOGGER.log(Level.DEBUG,
   306                     "MultiExchange: ConnectionExpiredException (async): already retried once.",
   307                     t);
   308                 if (t.getCause() != null) t = t.getCause();
   309             }
   310         }
   311         return failedFuture(t);
   312     }
   314     class TimedEvent extends TimeoutEvent {
   315         TimedEvent(Duration duration) {
   316             super(duration);
   317         }
   318         @Override
   319         public void handle() {
   320             DEBUG_LOGGER.log(Level.DEBUG,
   321                     "Cancelling MultiExchange due to timeout for request %s",
   322                      request);
   323             cancel(new HttpTimeoutException("request timed out"));
   324         }
   325     }
   326 }