--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/MultiExchange.java Tue Feb 06 14:10:28 2018 +0000
@@ -0,0 +1,326 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal;
+
+import java.io.IOException;
+import java.lang.System.Logger.Level;
+import java.time.Duration;
+import java.util.List;
+import java.security.AccessControlContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+import jdk.incubator.http.HttpClient;
+import jdk.incubator.http.HttpRequest;
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
+import jdk.incubator.http.HttpTimeoutException;
+import jdk.incubator.http.internal.UntrustedBodyHandler;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.ConnectionExpiredException;
+import jdk.incubator.http.internal.common.Utils;
+import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
+import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
+
+/**
+ * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
+ * - manages filters
+ * - retries due to filters.
+ * - I/O errors and most other exceptions get returned directly to user
+ *
+ * Creates a new Exchange for each request/response interaction
+ */
+class MultiExchange<T> {
+
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ static final System.Logger DEBUG_LOGGER =
+ Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
+
+ private final HttpRequest userRequest; // the user request
+ private final HttpRequestImpl request; // a copy of the user request
+ final AccessControlContext acc;
+ final HttpClientImpl client;
+ final HttpResponse.BodyHandler<T> responseHandler;
+ final Executor executor;
+ final AtomicInteger attempts = new AtomicInteger();
+ HttpRequestImpl currentreq; // used for async only
+ Exchange<T> exchange; // the current exchange
+ Exchange<T> previous;
+ volatile Throwable retryCause;
+ volatile boolean expiredOnce;
+ volatile HttpResponse<T> response = null;
+
+ // Maximum number of times a request will be retried/redirected
+ // for any reason
+
+ static final int DEFAULT_MAX_ATTEMPTS = 5;
+ static final int max_attempts = Utils.getIntegerNetProperty(
+ "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
+ );
+
+ private final List<HeaderFilter> filters;
+ TimedEvent timedEvent;
+ volatile boolean cancelled;
+ final PushGroup<T> pushGroup;
+
+ /**
+ * Filter fields. These are attached as required by filters
+ * and only used by the filter implementations. This could be
+ * generalised into Objects that are passed explicitly to the filters
+ * (one per MultiExchange object, and one per Exchange object possibly)
+ */
+ volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
+ // RedirectHandler
+ volatile int numberOfRedirects = 0;
+
+ /**
+ * MultiExchange with one final response.
+ */
+ MultiExchange(HttpRequest userRequest,
+ HttpRequestImpl requestImpl,
+ HttpClientImpl client,
+ HttpResponse.BodyHandler<T> responseHandler,
+ PushPromiseHandler<T> pushPromiseHandler,
+ AccessControlContext acc) {
+ this.previous = null;
+ this.userRequest = userRequest;
+ this.request = requestImpl;
+ this.currentreq = request;
+ this.client = client;
+ this.filters = client.filterChain();
+ this.acc = acc;
+ this.executor = client.theExecutor();
+ this.responseHandler = responseHandler;
+ if (acc != null) {
+ // Restricts the file publisher with the senders ACC, if any
+ if (responseHandler instanceof UntrustedBodyHandler)
+ ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
+ }
+
+ if (pushPromiseHandler != null) {
+ this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
+ } else {
+ pushGroup = null;
+ }
+
+ this.exchange = new Exchange<>(request, this);
+ }
+
+ private synchronized Exchange<T> getExchange() {
+ return exchange;
+ }
+
+ HttpClientImpl client() {
+ return client;
+ }
+
+ HttpClient.Version version() {
+ HttpClient.Version vers = request.version().orElse(client.version());
+ if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)
+ vers = HttpClient.Version.HTTP_1_1;
+ return vers;
+ }
+
+ private synchronized void setExchange(Exchange<T> exchange) {
+ if (this.exchange != null && exchange != this.exchange) {
+ this.exchange.released();
+ }
+ this.exchange = exchange;
+ }
+
+ private void cancelTimer() {
+ if (timedEvent != null) {
+ client.cancelTimer(timedEvent);
+ }
+ }
+
+ private void requestFilters(HttpRequestImpl r) throws IOException {
+ Log.logTrace("Applying request filters");
+ for (HeaderFilter filter : filters) {
+ Log.logTrace("Applying {0}", filter);
+ filter.request(r, this);
+ }
+ Log.logTrace("All filters applied");
+ }
+
+ private HttpRequestImpl responseFilters(Response response) throws IOException
+ {
+ Log.logTrace("Applying response filters");
+ for (HeaderFilter filter : filters) {
+ Log.logTrace("Applying {0}", filter);
+ HttpRequestImpl newreq = filter.response(response);
+ if (newreq != null) {
+ Log.logTrace("New request: stopping filters");
+ return newreq;
+ }
+ }
+ Log.logTrace("All filters applied");
+ return null;
+ }
+
+// public void cancel() {
+// cancelled = true;
+// getExchange().cancel();
+// }
+
+ public void cancel(IOException cause) {
+ cancelled = true;
+ getExchange().cancel(cause);
+ }
+
+ public CompletableFuture<HttpResponse<T>> responseAsync() {
+ CompletableFuture<Void> start = new MinimalFuture<>();
+ CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
+ start.completeAsync( () -> null, executor); // trigger execution
+ return cf;
+ }
+
+ private CompletableFuture<HttpResponse<T>>
+ responseAsync0(CompletableFuture<Void> start) {
+ return start.thenCompose( v -> responseAsyncImpl())
+ .thenCompose((Response r) -> {
+ Exchange<T> exch = getExchange();
+ return exch.readBodyAsync(responseHandler)
+ .thenApply((T body) -> {
+ this.response =
+ new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
+ return this.response;
+ });
+ });
+ }
+
+ private CompletableFuture<Response> responseAsyncImpl() {
+ CompletableFuture<Response> cf;
+ if (attempts.incrementAndGet() > max_attempts) {
+ cf = failedFuture(new IOException("Too many retries", retryCause));
+ } else {
+ if (currentreq.timeout().isPresent()) {
+ timedEvent = new TimedEvent(currentreq.timeout().get());
+ client.registerTimer(timedEvent);
+ }
+ try {
+ // 1. apply request filters
+ requestFilters(currentreq);
+ } catch (IOException e) {
+ return failedFuture(e);
+ }
+ Exchange<T> exch = getExchange();
+ // 2. get response
+ cf = exch.responseAsync()
+ .thenCompose((Response response) -> {
+ HttpRequestImpl newrequest;
+ try {
+ // 3. apply response filters
+ newrequest = responseFilters(response);
+ } catch (IOException e) {
+ return failedFuture(e);
+ }
+ // 4. check filter result and repeat or continue
+ if (newrequest == null) {
+ if (attempts.get() > 1) {
+ Log.logError("Succeeded on attempt: " + attempts);
+ }
+ return completedFuture(response);
+ } else {
+ this.response =
+ new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
+ Exchange<T> oldExch = exch;
+ return exch.ignoreBody().handle((r,t) -> {
+ currentreq = newrequest;
+ expiredOnce = false;
+ setExchange(new Exchange<>(currentreq, this, acc));
+ return responseAsyncImpl();
+ }).thenCompose(Function.identity());
+ } })
+ .handle((response, ex) -> {
+ // 5. handle errors and cancel any timer set
+ cancelTimer();
+ if (ex == null) {
+ assert response != null;
+ return completedFuture(response);
+ }
+ // all exceptions thrown are handled here
+ CompletableFuture<Response> errorCF = getExceptionalCF(ex);
+ if (errorCF == null) {
+ return responseAsyncImpl();
+ } else {
+ return errorCF;
+ } })
+ .thenCompose(Function.identity());
+ }
+ return cf;
+ }
+
+ /**
+ * Takes a Throwable and returns a suitable CompletableFuture that is
+ * completed exceptionally, or null.
+ */
+ private CompletableFuture<Response> getExceptionalCF(Throwable t) {
+ if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
+ if (t.getCause() != null) {
+ t = t.getCause();
+ }
+ }
+ if (cancelled && t instanceof IOException) {
+ t = new HttpTimeoutException("request timed out");
+ } else if (t instanceof ConnectionExpiredException) {
+ // allow the retry mechanism to do its work
+ // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
+ if (t.getCause() != null) retryCause = t.getCause();
+ if (!expiredOnce) {
+ DEBUG_LOGGER.log(Level.DEBUG,
+ "MultiExchange: ConnectionExpiredException (async): retrying...",
+ t);
+ expiredOnce = true;
+ return null;
+ } else {
+ DEBUG_LOGGER.log(Level.DEBUG,
+ "MultiExchange: ConnectionExpiredException (async): already retried once.",
+ t);
+ if (t.getCause() != null) t = t.getCause();
+ }
+ }
+ return failedFuture(t);
+ }
+
+ class TimedEvent extends TimeoutEvent {
+ TimedEvent(Duration duration) {
+ super(duration);
+ }
+ @Override
+ public void handle() {
+ DEBUG_LOGGER.log(Level.DEBUG,
+ "Cancelling MultiExchange due to timeout for request %s",
+ request);
+ cancel(new HttpTimeoutException("request timed out"));
+ }
+ }
+}