diff -r aedd6133e7a0 -r fd85b2bf2b0d src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java Wed Feb 07 21:45:37 2018 +0000 @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.io.IOException; +import java.lang.System.Logger.Level; +import java.time.Duration; +import java.util.List; +import java.security.AccessControlContext; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.PushPromiseHandler; +import java.net.http.HttpTimeoutException; +import jdk.internal.net.http.UntrustedBodyHandler; +import jdk.internal.net.http.common.Log; +import jdk.internal.net.http.common.MinimalFuture; +import jdk.internal.net.http.common.ConnectionExpiredException; +import jdk.internal.net.http.common.Utils; +import static jdk.internal.net.http.common.MinimalFuture.completedFuture; +import static jdk.internal.net.http.common.MinimalFuture.failedFuture; + +/** + * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. + * - manages filters + * - retries due to filters. + * - I/O errors and most other exceptions get returned directly to user + * + * Creates a new Exchange for each request/response interaction + */ +class MultiExchange { + + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + static final System.Logger DEBUG_LOGGER = + Utils.getDebugLogger("MultiExchange"::toString, DEBUG); + + private final HttpRequest userRequest; // the user request + private final HttpRequestImpl request; // a copy of the user request + final AccessControlContext acc; + final HttpClientImpl client; + final HttpResponse.BodyHandler responseHandler; + final Executor executor; + final AtomicInteger attempts = new AtomicInteger(); + HttpRequestImpl currentreq; // used for async only + Exchange exchange; // the current exchange + Exchange previous; + volatile Throwable retryCause; + volatile boolean expiredOnce; + volatile HttpResponse response = null; + + // Maximum number of times a request will be retried/redirected + // for any reason + + static final int DEFAULT_MAX_ATTEMPTS = 5; + static final int max_attempts = Utils.getIntegerNetProperty( + "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS + ); + + private final List filters; + TimedEvent timedEvent; + volatile boolean cancelled; + final PushGroup pushGroup; + + /** + * Filter fields. These are attached as required by filters + * and only used by the filter implementations. This could be + * generalised into Objects that are passed explicitly to the filters + * (one per MultiExchange object, and one per Exchange object possibly) + */ + volatile AuthenticationFilter.AuthInfo serverauth, proxyauth; + // RedirectHandler + volatile int numberOfRedirects = 0; + + /** + * MultiExchange with one final response. + */ + MultiExchange(HttpRequest userRequest, + HttpRequestImpl requestImpl, + HttpClientImpl client, + HttpResponse.BodyHandler responseHandler, + PushPromiseHandler pushPromiseHandler, + AccessControlContext acc) { + this.previous = null; + this.userRequest = userRequest; + this.request = requestImpl; + this.currentreq = request; + this.client = client; + this.filters = client.filterChain(); + this.acc = acc; + this.executor = client.theExecutor(); + this.responseHandler = responseHandler; + if (acc != null) { + // Restricts the file publisher with the senders ACC, if any + if (responseHandler instanceof UntrustedBodyHandler) + ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc); + } + + if (pushPromiseHandler != null) { + this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc); + } else { + pushGroup = null; + } + + this.exchange = new Exchange<>(request, this); + } + + private synchronized Exchange getExchange() { + return exchange; + } + + HttpClientImpl client() { + return client; + } + + HttpClient.Version version() { + HttpClient.Version vers = request.version().orElse(client.version()); + if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null) + vers = HttpClient.Version.HTTP_1_1; + return vers; + } + + private synchronized void setExchange(Exchange exchange) { + if (this.exchange != null && exchange != this.exchange) { + this.exchange.released(); + } + this.exchange = exchange; + } + + private void cancelTimer() { + if (timedEvent != null) { + client.cancelTimer(timedEvent); + } + } + + private void requestFilters(HttpRequestImpl r) throws IOException { + Log.logTrace("Applying request filters"); + for (HeaderFilter filter : filters) { + Log.logTrace("Applying {0}", filter); + filter.request(r, this); + } + Log.logTrace("All filters applied"); + } + + private HttpRequestImpl responseFilters(Response response) throws IOException + { + Log.logTrace("Applying response filters"); + for (HeaderFilter filter : filters) { + Log.logTrace("Applying {0}", filter); + HttpRequestImpl newreq = filter.response(response); + if (newreq != null) { + Log.logTrace("New request: stopping filters"); + return newreq; + } + } + Log.logTrace("All filters applied"); + return null; + } + +// public void cancel() { +// cancelled = true; +// getExchange().cancel(); +// } + + public void cancel(IOException cause) { + cancelled = true; + getExchange().cancel(cause); + } + + public CompletableFuture> responseAsync() { + CompletableFuture start = new MinimalFuture<>(); + CompletableFuture> cf = responseAsync0(start); + start.completeAsync( () -> null, executor); // trigger execution + return cf; + } + + private CompletableFuture> + responseAsync0(CompletableFuture start) { + return start.thenCompose( v -> responseAsyncImpl()) + .thenCompose((Response r) -> { + Exchange exch = getExchange(); + return exch.readBodyAsync(responseHandler) + .thenApply((T body) -> { + this.response = + new HttpResponseImpl<>(userRequest, r, this.response, body, exch); + return this.response; + }); + }); + } + + private CompletableFuture responseAsyncImpl() { + CompletableFuture cf; + if (attempts.incrementAndGet() > max_attempts) { + cf = failedFuture(new IOException("Too many retries", retryCause)); + } else { + if (currentreq.timeout().isPresent()) { + timedEvent = new TimedEvent(currentreq.timeout().get()); + client.registerTimer(timedEvent); + } + try { + // 1. apply request filters + requestFilters(currentreq); + } catch (IOException e) { + return failedFuture(e); + } + Exchange exch = getExchange(); + // 2. get response + cf = exch.responseAsync() + .thenCompose((Response response) -> { + HttpRequestImpl newrequest; + try { + // 3. apply response filters + newrequest = responseFilters(response); + } catch (IOException e) { + return failedFuture(e); + } + // 4. check filter result and repeat or continue + if (newrequest == null) { + if (attempts.get() > 1) { + Log.logError("Succeeded on attempt: " + attempts); + } + return completedFuture(response); + } else { + this.response = + new HttpResponseImpl<>(currentreq, response, this.response, null, exch); + Exchange oldExch = exch; + return exch.ignoreBody().handle((r,t) -> { + currentreq = newrequest; + expiredOnce = false; + setExchange(new Exchange<>(currentreq, this, acc)); + return responseAsyncImpl(); + }).thenCompose(Function.identity()); + } }) + .handle((response, ex) -> { + // 5. handle errors and cancel any timer set + cancelTimer(); + if (ex == null) { + assert response != null; + return completedFuture(response); + } + // all exceptions thrown are handled here + CompletableFuture errorCF = getExceptionalCF(ex); + if (errorCF == null) { + return responseAsyncImpl(); + } else { + return errorCF; + } }) + .thenCompose(Function.identity()); + } + return cf; + } + + /** + * Takes a Throwable and returns a suitable CompletableFuture that is + * completed exceptionally, or null. + */ + private CompletableFuture getExceptionalCF(Throwable t) { + if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { + if (t.getCause() != null) { + t = t.getCause(); + } + } + if (cancelled && t instanceof IOException) { + t = new HttpTimeoutException("request timed out"); + } else if (t instanceof ConnectionExpiredException) { + // allow the retry mechanism to do its work + // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? ) + if (t.getCause() != null) retryCause = t.getCause(); + if (!expiredOnce) { + DEBUG_LOGGER.log(Level.DEBUG, + "MultiExchange: ConnectionExpiredException (async): retrying...", + t); + expiredOnce = true; + return null; + } else { + DEBUG_LOGGER.log(Level.DEBUG, + "MultiExchange: ConnectionExpiredException (async): already retried once.", + t); + if (t.getCause() != null) t = t.getCause(); + } + } + return failedFuture(t); + } + + class TimedEvent extends TimeoutEvent { + TimedEvent(Duration duration) { + super(duration); + } + @Override + public void handle() { + DEBUG_LOGGER.log(Level.DEBUG, + "Cancelling MultiExchange due to timeout for request %s", + request); + cancel(new HttpTimeoutException("request timed out")); + } + } +}