src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
branch http-client-branch
changeset 56010 782b2f2d1e76
parent 55973 4d9b002587db
child 56043 08e8e41841cf
equal deleted inserted replaced
56009:cf8792f51dee 56010:782b2f2d1e76
    34 import java.util.concurrent.CompletionException;
    34 import java.util.concurrent.CompletionException;
    35 import java.util.concurrent.ExecutionException;
    35 import java.util.concurrent.ExecutionException;
    36 import java.util.concurrent.Executor;
    36 import java.util.concurrent.Executor;
    37 import java.util.concurrent.atomic.AtomicInteger;
    37 import java.util.concurrent.atomic.AtomicInteger;
    38 import java.util.function.Function;
    38 import java.util.function.Function;
       
    39 import jdk.incubator.http.HttpResponse.PushPromiseHandler;
    39 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
    40 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
    40 import jdk.incubator.http.internal.common.Log;
    41 import jdk.incubator.http.internal.common.Log;
    41 import jdk.incubator.http.internal.common.MinimalFuture;
    42 import jdk.incubator.http.internal.common.MinimalFuture;
    42 import jdk.incubator.http.internal.common.ConnectionExpiredException;
    43 import jdk.incubator.http.internal.common.ConnectionExpiredException;
    43 import jdk.incubator.http.internal.common.Utils;
    44 import jdk.incubator.http.internal.common.Utils;
    50  * - retries due to filters.
    51  * - retries due to filters.
    51  * - I/O errors and most other exceptions get returned directly to user
    52  * - I/O errors and most other exceptions get returned directly to user
    52  *
    53  *
    53  * Creates a new Exchange for each request/response interaction
    54  * Creates a new Exchange for each request/response interaction
    54  */
    55  */
    55 class MultiExchange<U,T> {
    56 class MultiExchange<T> {
    56 
    57 
    57     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    58     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    58     static final System.Logger DEBUG_LOGGER =
    59     static final System.Logger DEBUG_LOGGER =
    59             Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
    60             Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
    60 
    61 
    62     private final HttpRequestImpl request; // a copy of the user request
    63     private final HttpRequestImpl request; // a copy of the user request
    63     final AccessControlContext acc;
    64     final AccessControlContext acc;
    64     final HttpClientImpl client;
    65     final HttpClientImpl client;
    65     final HttpResponse.BodyHandler<T> responseHandler;
    66     final HttpResponse.BodyHandler<T> responseHandler;
    66     final Executor executor;
    67     final Executor executor;
    67     final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
       
    68     final AtomicInteger attempts = new AtomicInteger();
    68     final AtomicInteger attempts = new AtomicInteger();
    69     HttpRequestImpl currentreq; // used for async only
    69     HttpRequestImpl currentreq; // used for async only
    70     Exchange<T> exchange; // the current exchange
    70     Exchange<T> exchange; // the current exchange
    71     Exchange<T> previous;
    71     Exchange<T> previous;
    72     volatile Throwable retryCause;
    72     volatile Throwable retryCause;
    82     );
    82     );
    83 
    83 
    84     private final List<HeaderFilter> filters;
    84     private final List<HeaderFilter> filters;
    85     TimedEvent timedEvent;
    85     TimedEvent timedEvent;
    86     volatile boolean cancelled;
    86     volatile boolean cancelled;
    87     final PushGroup<U,T> pushGroup;
    87     final PushGroup<T> pushGroup;
    88 
    88 
    89     /**
    89     /**
    90      * Filter fields. These are attached as required by filters
    90      * Filter fields. These are attached as required by filters
    91      * and only used by the filter implementations. This could be
    91      * and only used by the filter implementations. This could be
    92      * generalised into Objects that are passed explicitly to the filters
    92      * generalised into Objects that are passed explicitly to the filters
   101      */
   101      */
   102     MultiExchange(HttpRequest userRequest,
   102     MultiExchange(HttpRequest userRequest,
   103                   HttpRequestImpl requestImpl,
   103                   HttpRequestImpl requestImpl,
   104                   HttpClientImpl client,
   104                   HttpClientImpl client,
   105                   HttpResponse.BodyHandler<T> responseHandler,
   105                   HttpResponse.BodyHandler<T> responseHandler,
       
   106                   PushPromiseHandler<T> pushPromiseHandler,
   106                   AccessControlContext acc) {
   107                   AccessControlContext acc) {
   107         this.previous = null;
   108         this.previous = null;
   108         this.userRequest = userRequest;
   109         this.userRequest = userRequest;
   109         this.request = requestImpl;
   110         this.request = requestImpl;
   110         this.currentreq = request;
   111         this.currentreq = request;
   116         if (acc != null) {
   117         if (acc != null) {
   117             // Restricts the file publisher with the senders ACC, if any
   118             // Restricts the file publisher with the senders ACC, if any
   118             if (responseHandler instanceof UntrustedBodyHandler)
   119             if (responseHandler instanceof UntrustedBodyHandler)
   119                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
   120                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
   120         }
   121         }
       
   122 
       
   123         if (pushPromiseHandler != null) {
       
   124             this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
       
   125         } else {
       
   126             pushGroup = null;
       
   127         }
       
   128 
   121         this.exchange = new Exchange<>(request, this);
   129         this.exchange = new Exchange<>(request, this);
   122         this.multiResponseSubscriber = null;
   130     }
   123         this.pushGroup = null;
       
   124     }
       
   125 
       
   126     /**
       
   127      * MultiExchange with multiple responses (HTTP/2 server pushes).
       
   128      */
       
   129     MultiExchange(HttpRequest userRequest,
       
   130                   HttpRequestImpl requestImpl,
       
   131                   HttpClientImpl client,
       
   132                   HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
       
   133                   AccessControlContext acc) {
       
   134         this.previous = null;
       
   135         this.userRequest = userRequest;
       
   136         this.request = requestImpl;
       
   137         this.currentreq = request;
       
   138         this.client = client;
       
   139         this.filters = client.filterChain();
       
   140         this.acc = acc;
       
   141         this.executor = client.theExecutor();
       
   142         this.multiResponseSubscriber = multiResponseSubscriber;
       
   143         this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
       
   144         this.exchange = new Exchange<>(request, this);
       
   145         this.responseHandler = pushGroup.mainResponseHandler();
       
   146     }
       
   147 
       
   148 //    CompletableFuture<Void> multiCompletionCF() {
       
   149 //        return pushGroup.groupResult();
       
   150 //    }
       
   151 
   131 
   152     private synchronized Exchange<T> getExchange() {
   132     private synchronized Exchange<T> getExchange() {
   153         return exchange;
   133         return exchange;
   154     }
   134     }
   155 
   135 
   156     HttpClientImpl client() {
   136     HttpClientImpl client() {
   157         return client;
   137         return client;
   158     }
   138     }
   159 
       
   160 //    HttpClient.Redirect followRedirects() {
       
   161 //        return client.followRedirects();
       
   162 //    }
       
   163 
   139 
   164     HttpClient.Version version() {
   140     HttpClient.Version version() {
   165         return request.version().orElse(client.version());
   141         return request.version().orElse(client.version());
   166     }
   142     }
   167 
   143 
   229                                 this.response =
   205                                 this.response =
   230                                     new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
   206                                     new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
   231                                 return this.response;
   207                                 return this.response;
   232                             });
   208                             });
   233                     });
   209                     });
   234     }
       
   235 
       
   236     CompletableFuture<U> multiResponseAsync() {
       
   237         CompletableFuture<Void> start = new MinimalFuture<>();
       
   238         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
       
   239         CompletableFuture<HttpResponse<T>> mainResponse =
       
   240                 cf.thenApply(b -> {
       
   241                         multiResponseSubscriber.onResponse(b);
       
   242                         pushGroup.noMorePushes(true);
       
   243                         return b; });
       
   244         pushGroup.setMainResponse(mainResponse);
       
   245         CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
       
   246                                                                       pushGroup.pushesCF());
       
   247         start.completeAsync( () -> null, executor); // trigger execution
       
   248         return res;
       
   249     }
   210     }
   250 
   211 
   251     private CompletableFuture<Response> responseAsyncImpl() {
   212     private CompletableFuture<Response> responseAsyncImpl() {
   252         CompletableFuture<Response> cf;
   213         CompletableFuture<Response> cf;
   253         if (attempts.incrementAndGet() > max_attempts) {
   214         if (attempts.incrementAndGet() > max_attempts) {