--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/MultiExchange.java Tue Feb 06 19:37:56 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,326 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal;
-
-import java.io.IOException;
-import java.lang.System.Logger.Level;
-import java.time.Duration;
-import java.util.List;
-import java.security.AccessControlContext;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-
-import jdk.incubator.http.HttpClient;
-import jdk.incubator.http.HttpRequest;
-import jdk.incubator.http.HttpResponse;
-import jdk.incubator.http.HttpResponse.PushPromiseHandler;
-import jdk.incubator.http.HttpTimeoutException;
-import jdk.incubator.http.internal.UntrustedBodyHandler;
-import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.ConnectionExpiredException;
-import jdk.incubator.http.internal.common.Utils;
-import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
-import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
-
-/**
- * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
- * - manages filters
- * - retries due to filters.
- * - I/O errors and most other exceptions get returned directly to user
- *
- * Creates a new Exchange for each request/response interaction
- */
-class MultiExchange<T> {
-
- static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
- static final System.Logger DEBUG_LOGGER =
- Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
-
- private final HttpRequest userRequest; // the user request
- private final HttpRequestImpl request; // a copy of the user request
- final AccessControlContext acc;
- final HttpClientImpl client;
- final HttpResponse.BodyHandler<T> responseHandler;
- final Executor executor;
- final AtomicInteger attempts = new AtomicInteger();
- HttpRequestImpl currentreq; // used for async only
- Exchange<T> exchange; // the current exchange
- Exchange<T> previous;
- volatile Throwable retryCause;
- volatile boolean expiredOnce;
- volatile HttpResponse<T> response = null;
-
- // Maximum number of times a request will be retried/redirected
- // for any reason
-
- static final int DEFAULT_MAX_ATTEMPTS = 5;
- static final int max_attempts = Utils.getIntegerNetProperty(
- "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
- );
-
- private final List<HeaderFilter> filters;
- TimedEvent timedEvent;
- volatile boolean cancelled;
- final PushGroup<T> pushGroup;
-
- /**
- * Filter fields. These are attached as required by filters
- * and only used by the filter implementations. This could be
- * generalised into Objects that are passed explicitly to the filters
- * (one per MultiExchange object, and one per Exchange object possibly)
- */
- volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
- // RedirectHandler
- volatile int numberOfRedirects = 0;
-
- /**
- * MultiExchange with one final response.
- */
- MultiExchange(HttpRequest userRequest,
- HttpRequestImpl requestImpl,
- HttpClientImpl client,
- HttpResponse.BodyHandler<T> responseHandler,
- PushPromiseHandler<T> pushPromiseHandler,
- AccessControlContext acc) {
- this.previous = null;
- this.userRequest = userRequest;
- this.request = requestImpl;
- this.currentreq = request;
- this.client = client;
- this.filters = client.filterChain();
- this.acc = acc;
- this.executor = client.theExecutor();
- this.responseHandler = responseHandler;
- if (acc != null) {
- // Restricts the file publisher with the senders ACC, if any
- if (responseHandler instanceof UntrustedBodyHandler)
- ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
- }
-
- if (pushPromiseHandler != null) {
- this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
- } else {
- pushGroup = null;
- }
-
- this.exchange = new Exchange<>(request, this);
- }
-
- private synchronized Exchange<T> getExchange() {
- return exchange;
- }
-
- HttpClientImpl client() {
- return client;
- }
-
- HttpClient.Version version() {
- HttpClient.Version vers = request.version().orElse(client.version());
- if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)
- vers = HttpClient.Version.HTTP_1_1;
- return vers;
- }
-
- private synchronized void setExchange(Exchange<T> exchange) {
- if (this.exchange != null && exchange != this.exchange) {
- this.exchange.released();
- }
- this.exchange = exchange;
- }
-
- private void cancelTimer() {
- if (timedEvent != null) {
- client.cancelTimer(timedEvent);
- }
- }
-
- private void requestFilters(HttpRequestImpl r) throws IOException {
- Log.logTrace("Applying request filters");
- for (HeaderFilter filter : filters) {
- Log.logTrace("Applying {0}", filter);
- filter.request(r, this);
- }
- Log.logTrace("All filters applied");
- }
-
- private HttpRequestImpl responseFilters(Response response) throws IOException
- {
- Log.logTrace("Applying response filters");
- for (HeaderFilter filter : filters) {
- Log.logTrace("Applying {0}", filter);
- HttpRequestImpl newreq = filter.response(response);
- if (newreq != null) {
- Log.logTrace("New request: stopping filters");
- return newreq;
- }
- }
- Log.logTrace("All filters applied");
- return null;
- }
-
-// public void cancel() {
-// cancelled = true;
-// getExchange().cancel();
-// }
-
- public void cancel(IOException cause) {
- cancelled = true;
- getExchange().cancel(cause);
- }
-
- public CompletableFuture<HttpResponse<T>> responseAsync() {
- CompletableFuture<Void> start = new MinimalFuture<>();
- CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
- start.completeAsync( () -> null, executor); // trigger execution
- return cf;
- }
-
- private CompletableFuture<HttpResponse<T>>
- responseAsync0(CompletableFuture<Void> start) {
- return start.thenCompose( v -> responseAsyncImpl())
- .thenCompose((Response r) -> {
- Exchange<T> exch = getExchange();
- return exch.readBodyAsync(responseHandler)
- .thenApply((T body) -> {
- this.response =
- new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
- return this.response;
- });
- });
- }
-
- private CompletableFuture<Response> responseAsyncImpl() {
- CompletableFuture<Response> cf;
- if (attempts.incrementAndGet() > max_attempts) {
- cf = failedFuture(new IOException("Too many retries", retryCause));
- } else {
- if (currentreq.timeout().isPresent()) {
- timedEvent = new TimedEvent(currentreq.timeout().get());
- client.registerTimer(timedEvent);
- }
- try {
- // 1. apply request filters
- requestFilters(currentreq);
- } catch (IOException e) {
- return failedFuture(e);
- }
- Exchange<T> exch = getExchange();
- // 2. get response
- cf = exch.responseAsync()
- .thenCompose((Response response) -> {
- HttpRequestImpl newrequest;
- try {
- // 3. apply response filters
- newrequest = responseFilters(response);
- } catch (IOException e) {
- return failedFuture(e);
- }
- // 4. check filter result and repeat or continue
- if (newrequest == null) {
- if (attempts.get() > 1) {
- Log.logError("Succeeded on attempt: " + attempts);
- }
- return completedFuture(response);
- } else {
- this.response =
- new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
- Exchange<T> oldExch = exch;
- return exch.ignoreBody().handle((r,t) -> {
- currentreq = newrequest;
- expiredOnce = false;
- setExchange(new Exchange<>(currentreq, this, acc));
- return responseAsyncImpl();
- }).thenCompose(Function.identity());
- } })
- .handle((response, ex) -> {
- // 5. handle errors and cancel any timer set
- cancelTimer();
- if (ex == null) {
- assert response != null;
- return completedFuture(response);
- }
- // all exceptions thrown are handled here
- CompletableFuture<Response> errorCF = getExceptionalCF(ex);
- if (errorCF == null) {
- return responseAsyncImpl();
- } else {
- return errorCF;
- } })
- .thenCompose(Function.identity());
- }
- return cf;
- }
-
- /**
- * Takes a Throwable and returns a suitable CompletableFuture that is
- * completed exceptionally, or null.
- */
- private CompletableFuture<Response> getExceptionalCF(Throwable t) {
- if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
- if (t.getCause() != null) {
- t = t.getCause();
- }
- }
- if (cancelled && t instanceof IOException) {
- t = new HttpTimeoutException("request timed out");
- } else if (t instanceof ConnectionExpiredException) {
- // allow the retry mechanism to do its work
- // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
- if (t.getCause() != null) retryCause = t.getCause();
- if (!expiredOnce) {
- DEBUG_LOGGER.log(Level.DEBUG,
- "MultiExchange: ConnectionExpiredException (async): retrying...",
- t);
- expiredOnce = true;
- return null;
- } else {
- DEBUG_LOGGER.log(Level.DEBUG,
- "MultiExchange: ConnectionExpiredException (async): already retried once.",
- t);
- if (t.getCause() != null) t = t.getCause();
- }
- }
- return failedFuture(t);
- }
-
- class TimedEvent extends TimeoutEvent {
- TimedEvent(Duration duration) {
- super(duration);
- }
- @Override
- public void handle() {
- DEBUG_LOGGER.log(Level.DEBUG,
- "Cancelling MultiExchange due to timeout for request %s",
- request);
- cancel(new HttpTimeoutException("request timed out"));
- }
- }
-}