src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
    29 import java.time.Duration;
    30 import java.time.Duration;
    30 import java.util.List;
    31 import java.util.List;
    31 import java.security.AccessControlContext;
    32 import java.security.AccessControlContext;
    32 import java.security.AccessController;
       
    33 import java.util.concurrent.CompletableFuture;
    33 import java.util.concurrent.CompletableFuture;
    34 import java.util.concurrent.CompletionException;
    34 import java.util.concurrent.CompletionException;
    35 import java.util.concurrent.ExecutionException;
    35 import java.util.concurrent.ExecutionException;
    36 import java.util.function.BiFunction;
       
    37 import java.util.concurrent.Executor;
    36 import java.util.concurrent.Executor;
       
    37 import java.util.concurrent.atomic.AtomicInteger;
    38 import java.util.function.UnaryOperator;
    38 import java.util.function.UnaryOperator;
    39 
    39 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
    40 import jdk.incubator.http.internal.common.Log;
    40 import jdk.incubator.http.internal.common.Log;
    41 import jdk.incubator.http.internal.common.MinimalFuture;
    41 import jdk.incubator.http.internal.common.MinimalFuture;
    42 import jdk.incubator.http.internal.common.Pair;
    42 import jdk.incubator.http.internal.common.ConnectionExpiredException;
    43 import jdk.incubator.http.internal.common.Utils;
    43 import jdk.incubator.http.internal.common.Utils;
    44 import static jdk.incubator.http.internal.common.Pair.pair;
    44 import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
       
    45 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
    45 
    46 
    46 /**
    47 /**
    47  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
    48  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
    48  * - manages filters
    49  * - manages filters
    49  * - retries due to filters.
    50  * - retries due to filters.
    51  *
    52  *
    52  * Creates a new Exchange for each request/response interaction
    53  * Creates a new Exchange for each request/response interaction
    53  */
    54  */
    54 class MultiExchange<U,T> {
    55 class MultiExchange<U,T> {
    55 
    56 
       
    57     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    58     static final System.Logger DEBUG_LOGGER =
       
    59             Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
       
    60 
    56     private final HttpRequest userRequest; // the user request
    61     private final HttpRequest userRequest; // the user request
    57     private final HttpRequestImpl request; // a copy of the user request
    62     private final HttpRequestImpl request; // a copy of the user request
    58     final AccessControlContext acc;
    63     final AccessControlContext acc;
    59     final HttpClientImpl client;
    64     final HttpClientImpl client;
    60     final HttpResponse.BodyHandler<T> responseHandler;
    65     final HttpResponse.BodyHandler<T> responseHandler;
    61     final ExecutorWrapper execWrapper;
       
    62     final Executor executor;
    66     final Executor executor;
    63     final HttpResponse.MultiProcessor<U,T> multiResponseHandler;
    67     final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
       
    68     final AtomicInteger attempts = new AtomicInteger();
    64     HttpRequestImpl currentreq; // used for async only
    69     HttpRequestImpl currentreq; // used for async only
    65     Exchange<T> exchange; // the current exchange
    70     Exchange<T> exchange; // the current exchange
    66     Exchange<T> previous;
    71     Exchange<T> previous;
    67     int attempts;
    72     volatile Throwable retryCause;
       
    73     volatile boolean expiredOnce;
       
    74 
    68     // Maximum number of times a request will be retried/redirected
    75     // Maximum number of times a request will be retried/redirected
    69     // for any reason
    76     // for any reason
    70 
    77 
    71     static final int DEFAULT_MAX_ATTEMPTS = 5;
    78     static final int DEFAULT_MAX_ATTEMPTS = 5;
    72     static final int max_attempts = Utils.getIntegerNetProperty(
    79     static final int max_attempts = Utils.getIntegerNetProperty(
    91     /**
    98     /**
    92      * MultiExchange with one final response.
    99      * MultiExchange with one final response.
    93      */
   100      */
    94     MultiExchange(HttpRequest req,
   101     MultiExchange(HttpRequest req,
    95                   HttpClientImpl client,
   102                   HttpClientImpl client,
    96                   HttpResponse.BodyHandler<T> responseHandler) {
   103                   HttpResponse.BodyHandler<T> responseHandler,
       
   104                   AccessControlContext acc) {
    97         this.previous = null;
   105         this.previous = null;
    98         this.userRequest = req;
   106         this.userRequest = req;
    99         this.request = new HttpRequestImpl(req);
   107         this.request = new HttpRequestImpl(req, acc);
   100         this.currentreq = request;
   108         this.currentreq = request;
   101         this.attempts = 0;
       
   102         this.client = client;
   109         this.client = client;
   103         this.filters = client.filterChain();
   110         this.filters = client.filterChain();
   104         if (System.getSecurityManager() != null) {
   111         this.acc = acc;
   105             this.acc = AccessController.getContext();
   112         this.executor = client.theExecutor();
   106         } else {
       
   107             this.acc = null;
       
   108         }
       
   109         this.execWrapper = new ExecutorWrapper(client.executor(), acc);
       
   110         this.executor = execWrapper.executor();
       
   111         this.responseHandler = responseHandler;
   113         this.responseHandler = responseHandler;
       
   114         if (acc != null) {
       
   115             // Restricts the file publisher with the senders ACC, if any
       
   116             if (responseHandler instanceof UntrustedBodyHandler)
       
   117                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
       
   118         }
   112         this.exchange = new Exchange<>(request, this);
   119         this.exchange = new Exchange<>(request, this);
   113         this.multiResponseHandler = null;
   120         this.multiResponseSubscriber = null;
   114         this.pushGroup = null;
   121         this.pushGroup = null;
   115     }
   122     }
   116 
   123 
   117     /**
   124     /**
   118      * MultiExchange with multiple responses (HTTP/2 server pushes).
   125      * MultiExchange with multiple responses (HTTP/2 server pushes).
   119      */
   126      */
   120     MultiExchange(HttpRequest req,
   127     MultiExchange(HttpRequest req,
   121                   HttpClientImpl client,
   128                   HttpClientImpl client,
   122                   HttpResponse.MultiProcessor<U, T> multiResponseHandler) {
   129                   HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
       
   130                   AccessControlContext acc) {
   123         this.previous = null;
   131         this.previous = null;
   124         this.userRequest = req;
   132         this.userRequest = req;
   125         this.request = new HttpRequestImpl(req);
   133         this.request = new HttpRequestImpl(req, acc);
   126         this.currentreq = request;
   134         this.currentreq = request;
   127         this.attempts = 0;
       
   128         this.client = client;
   135         this.client = client;
   129         this.filters = client.filterChain();
   136         this.filters = client.filterChain();
   130         if (System.getSecurityManager() != null) {
   137         this.acc = acc;
   131             this.acc = AccessController.getContext();
   138         this.executor = client.theExecutor();
   132         } else {
   139         this.multiResponseSubscriber = multiResponseSubscriber;
   133             this.acc = null;
   140         this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
   134         }
       
   135         this.execWrapper = new ExecutorWrapper(client.executor(), acc);
       
   136         this.executor = execWrapper.executor();
       
   137         this.multiResponseHandler = multiResponseHandler;
       
   138         this.pushGroup = new PushGroup<>(multiResponseHandler, request);
       
   139         this.exchange = new Exchange<>(request, this);
   141         this.exchange = new Exchange<>(request, this);
   140         this.responseHandler = pushGroup.mainResponseHandler();
   142         this.responseHandler = pushGroup.mainResponseHandler();
   141     }
   143     }
   142 
   144 
   143     public HttpResponseImpl<T> response() throws IOException, InterruptedException {
       
   144         HttpRequestImpl r = request;
       
   145         if (r.duration() != null) {
       
   146             timedEvent = new TimedEvent(r.duration());
       
   147             client.registerTimer(timedEvent);
       
   148         }
       
   149         while (attempts < max_attempts) {
       
   150             try {
       
   151                 attempts++;
       
   152                 Exchange<T> currExchange = getExchange();
       
   153                 requestFilters(r);
       
   154                 Response response = currExchange.response();
       
   155                 HttpRequestImpl newreq = responseFilters(response);
       
   156                 if (newreq == null) {
       
   157                     if (attempts > 1) {
       
   158                         Log.logError("Succeeded on attempt: " + attempts);
       
   159                     }
       
   160                     T body = currExchange.readBody(responseHandler);
       
   161                     cancelTimer();
       
   162                     return new HttpResponseImpl<>(userRequest, response, body, currExchange);
       
   163                 }
       
   164                 //response.body(HttpResponse.ignoreBody());
       
   165                 setExchange(new Exchange<>(newreq, this, acc));
       
   166                 r = newreq;
       
   167             } catch (IOException e) {
       
   168                 if (cancelled) {
       
   169                     throw new HttpTimeoutException("Request timed out");
       
   170                 }
       
   171                 throw e;
       
   172             }
       
   173         }
       
   174         cancelTimer();
       
   175         throw new IOException("Retry limit exceeded");
       
   176     }
       
   177 
       
   178     CompletableFuture<Void> multiCompletionCF() {
   145     CompletableFuture<Void> multiCompletionCF() {
   179         return pushGroup.groupResult();
   146         return pushGroup.groupResult();
   180     }
   147     }
   181 
   148 
   182     private synchronized Exchange<T> getExchange() {
   149     private synchronized Exchange<T> getExchange() {
   194     HttpClient.Version version() {
   161     HttpClient.Version version() {
   195         return request.version().orElse(client.version());
   162         return request.version().orElse(client.version());
   196     }
   163     }
   197 
   164 
   198     private synchronized void setExchange(Exchange<T> exchange) {
   165     private synchronized void setExchange(Exchange<T> exchange) {
       
   166         if (this.exchange != null && exchange != this.exchange) {
       
   167             this.exchange.released();
       
   168         }
   199         this.exchange = exchange;
   169         this.exchange = exchange;
   200     }
   170     }
   201 
   171 
   202     private void cancelTimer() {
   172     private void cancelTimer() {
   203         if (timedEvent != null) {
   173         if (timedEvent != null) {
   237     public void cancel(IOException cause) {
   207     public void cancel(IOException cause) {
   238         cancelled = true;
   208         cancelled = true;
   239         getExchange().cancel(cause);
   209         getExchange().cancel(cause);
   240     }
   210     }
   241 
   211 
   242     public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
   212     public CompletableFuture<HttpResponse<T>> responseAsync() {
   243         CompletableFuture<Void> start = new MinimalFuture<>();
   213         CompletableFuture<Void> start = new MinimalFuture<>();
   244         CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
   214         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
   245         start.completeAsync( () -> null, executor); // trigger execution
   215         start.completeAsync( () -> null, executor); // trigger execution
   246         return cf;
   216         return cf;
   247     }
   217     }
   248 
   218 
   249     private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
   219     private CompletableFuture<HttpResponse<T>>
       
   220     responseAsync0(CompletableFuture<Void> start) {
   250         return start.thenCompose( v -> responseAsyncImpl())
   221         return start.thenCompose( v -> responseAsyncImpl())
   251             .thenCompose((Response r) -> {
   222                     .thenCompose((Response r) -> {
   252                 Exchange<T> exch = getExchange();
   223                         Exchange<T> exch = getExchange();
   253                 return exch.readBodyAsync(responseHandler)
   224                         return exch.readBodyAsync(responseHandler)
   254                         .thenApply((T body) ->  new HttpResponseImpl<>(userRequest, r, body, exch));
   225                                    .thenApply((T body) ->
   255             });
   226                                            new HttpResponseImpl<>(userRequest,
       
   227                                                                   r,
       
   228                                                                   body,
       
   229                                                                   exch));
       
   230                     });
   256     }
   231     }
   257 
   232 
   258     CompletableFuture<U> multiResponseAsync() {
   233     CompletableFuture<U> multiResponseAsync() {
   259         CompletableFuture<Void> start = new MinimalFuture<>();
   234         CompletableFuture<Void> start = new MinimalFuture<>();
   260         CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
   235         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
   261         CompletableFuture<HttpResponse<T>> mainResponse =
   236         CompletableFuture<HttpResponse<T>> mainResponse =
   262                 cf.thenApply((HttpResponseImpl<T> b) -> {
   237                 cf.thenApply(b -> {
   263                       multiResponseHandler.onResponse(b);
   238                         multiResponseSubscriber.onResponse(b);
   264                       return (HttpResponse<T>)b;
   239                         pushGroup.noMorePushes(true);
   265                    });
   240                         return b; });
   266 
       
   267         pushGroup.setMainResponse(mainResponse);
   241         pushGroup.setMainResponse(mainResponse);
   268         // set up house-keeping related to multi-response
   242         CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
   269         mainResponse.thenAccept((r) -> {
   243                                                                       pushGroup.pushesCF());
   270             // All push promises received by now.
       
   271             pushGroup.noMorePushes(true);
       
   272         });
       
   273         CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
       
   274         start.completeAsync( () -> null, executor); // trigger execution
   244         start.completeAsync( () -> null, executor); // trigger execution
   275         return res;
   245         return res;
   276     }
   246     }
   277 
   247 
   278     private CompletableFuture<Response> responseAsyncImpl() {
   248     private CompletableFuture<Response> responseAsyncImpl() {
   279         CompletableFuture<Response> cf;
   249         CompletableFuture<Response> cf;
   280         if (++attempts > max_attempts) {
   250         if (attempts.incrementAndGet() > max_attempts) {
   281             cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
   251             cf = failedFuture(new IOException("Too many retries", retryCause));
   282         } else {
   252         } else {
   283             if (currentreq.duration() != null) {
   253             if (currentreq.timeout().isPresent()) {
   284                 timedEvent = new TimedEvent(currentreq.duration());
   254                 timedEvent = new TimedEvent(currentreq.timeout().get());
   285                 client.registerTimer(timedEvent);
   255                 client.registerTimer(timedEvent);
   286             }
   256             }
   287             try {
   257             try {
   288                 // 1. Apply request filters
   258                 // 1. apply request filters
   289                 requestFilters(currentreq);
   259                 requestFilters(currentreq);
   290             } catch (IOException e) {
   260             } catch (IOException e) {
   291                 return MinimalFuture.failedFuture(e);
   261                 return failedFuture(e);
   292             }
   262             }
   293             Exchange<T> exch = getExchange();
   263             Exchange<T> exch = getExchange();
   294             // 2. get response
   264             // 2. get response
   295             cf = exch.responseAsync()
   265             cf = exch.responseAsync()
   296                 .thenCompose((Response response) -> {
   266                      .thenCompose((Response response) -> {
   297                     HttpRequestImpl newrequest = null;
   267                         HttpRequestImpl newrequest;
   298                     try {
   268                         try {
   299                         // 3. Apply response filters
   269                             // 3. apply response filters
   300                         newrequest = responseFilters(response);
   270                             newrequest = responseFilters(response);
   301                     } catch (IOException e) {
   271                         } catch (IOException e) {
   302                         return MinimalFuture.failedFuture(e);
   272                             return failedFuture(e);
   303                     }
       
   304                     // 4. Check filter result and repeat or continue
       
   305                     if (newrequest == null) {
       
   306                         if (attempts > 1) {
       
   307                             Log.logError("Succeeded on attempt: " + attempts);
       
   308                         }
   273                         }
   309                         return MinimalFuture.completedFuture(response);
   274                         // 4. check filter result and repeat or continue
   310                     } else {
   275                         if (newrequest == null) {
   311                         currentreq = newrequest;
   276                             if (attempts.get() > 1) {
   312                         setExchange(new Exchange<>(currentreq, this, acc));
   277                                 Log.logError("Succeeded on attempt: " + attempts);
   313                         //reads body off previous, and then waits for next response
   278                             }
   314                         return responseAsyncImpl();
   279                             return completedFuture(response);
   315                     }
   280                         } else {
   316                 })
   281                             currentreq = newrequest;
   317             // 5. Handle errors and cancel any timer set
   282                             expiredOnce = false;
   318             .handle((response, ex) -> {
   283                             setExchange(new Exchange<>(currentreq, this, acc));
   319                 cancelTimer();
   284                             //reads body off previous, and then waits for next response
   320                 if (ex == null) {
   285                             return responseAsyncImpl();
   321                     assert response != null;
   286                         } })
   322                     return MinimalFuture.completedFuture(response);
   287                      .handle((response, ex) -> {
   323                 }
   288                         // 5. handle errors and cancel any timer set
   324                 // all exceptions thrown are handled here
   289                         cancelTimer();
   325                 CompletableFuture<Response> error = getExceptionalCF(ex);
   290                         if (ex == null) {
   326                 if (error == null) {
   291                             assert response != null;
   327                     return responseAsyncImpl();
   292                             return completedFuture(response);
   328                 } else {
   293                         }
   329                     return error;
   294                         // all exceptions thrown are handled here
   330                 }
   295                         CompletableFuture<Response> errorCF = getExceptionalCF(ex);
   331             })
   296                         if (errorCF == null) {
   332             .thenCompose(UnaryOperator.identity());
   297                             return responseAsyncImpl();
       
   298                         } else {
       
   299                             return errorCF;
       
   300                         } })
       
   301                      .thenCompose(UnaryOperator.identity());
   333         }
   302         }
   334         return cf;
   303         return cf;
   335     }
   304     }
   336 
   305 
   337     /**
   306     /**
   338      * Take a Throwable and return a suitable CompletableFuture that is
   307      * Takes a Throwable and returns a suitable CompletableFuture that is
   339      * completed exceptionally.
   308      * completed exceptionally, or null.
   340      */
   309      */
   341     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
   310     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
   342         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
   311         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
   343             if (t.getCause() != null) {
   312             if (t.getCause() != null) {
   344                 t = t.getCause();
   313                 t = t.getCause();
   345             }
   314             }
   346         }
   315         }
   347         if (cancelled && t instanceof IOException) {
   316         if (cancelled && t instanceof IOException) {
   348             t = new HttpTimeoutException("request timed out");
   317             t = new HttpTimeoutException("request timed out");
   349         }
   318         } else if (t instanceof ConnectionExpiredException) {
   350         return MinimalFuture.failedFuture(t);
   319             // allow the retry mechanism to do its work
       
   320             // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
       
   321             if (t.getCause() != null) retryCause = t.getCause();
       
   322             if (!expiredOnce) {
       
   323                 DEBUG_LOGGER.log(Level.DEBUG,
       
   324                     "MultiExchange: ConnectionExpiredException (async): retrying...",
       
   325                     t);
       
   326                 expiredOnce = true;
       
   327                 return null;
       
   328             } else {
       
   329                 DEBUG_LOGGER.log(Level.DEBUG,
       
   330                     "MultiExchange: ConnectionExpiredException (async): already retried once.",
       
   331                     t);
       
   332                 if (t.getCause() != null) t = t.getCause();
       
   333             }
       
   334         }
       
   335         return failedFuture(t);
   351     }
   336     }
   352 
   337 
   353     class TimedEvent extends TimeoutEvent {
   338     class TimedEvent extends TimeoutEvent {
   354         TimedEvent(Duration duration) {
   339         TimedEvent(Duration duration) {
   355             super(duration);
   340             super(duration);
   356         }
   341         }
   357         @Override
   342         @Override
   358         public void handle() {
   343         public void handle() {
       
   344             DEBUG_LOGGER.log(Level.DEBUG,
       
   345                     "Cancelling MultiExchange due to timeout for request %s",
       
   346                      request);
   359             cancel(new HttpTimeoutException("request timed out"));
   347             cancel(new HttpTimeoutException("request timed out"));
   360         }
   348         }
   361     }
   349     }
   362 }
   350 }