/*
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.io.IOException;
import java.net.ConnectException;
import java.net.http.HttpConnectTimeoutException;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedList;
import java.security.AccessControlContext;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.PushPromiseHandler;
import java.net.http.HttpTimeoutException;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.ConnectionExpiredException;
import jdk.internal.net.http.common.Utils;
import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
/**
* Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
* - manages filters
* - retries due to filters.
* - I/O errors and most other exceptions get returned directly to user
*
* Creates a new Exchange for each request/response interaction
*/
class MultiExchange<T> {
// Debug logger for this class, obtained from Utils.getDebugLogger.
static final Logger debug =
Utils.getDebugLogger("MultiExchange"::toString, Utils.DEBUG);
// Stored by the constructor; within this file it is not read again.
private final HttpRequest userRequest; // the user request
// Used for version(), for the PushGroup and for the first Exchange.
private final HttpRequestImpl request; // a copy of the user request
// Built from client.connectTimeout() when one is configured.
private final ConnectTimeoutTracker connectTimeout; // null if no timeout
// May be null: the constructor only wraps the push executor in a
// PrivilegedExecutor when acc is non-null. Also handed to the Exchange
// created when a response filter produces a new request.
final AccessControlContext acc;
final HttpClientImpl client;
// The user's body handler: used to read the body, or to obtain a
// subscriber that is completed with no data (see handleNoBody).
final HttpResponse.BodyHandler<T> responseHandler;
// client.theExecutor(), captured at construction time.
final HttpClientImpl.DelegatingExecutor executor;
// Incremented on every call to responseAsyncImpl() and compared
// against max_attempts.
final AtomicInteger attempts = new AtomicInteger();
// Request of the current attempt; replaced when a response filter
// returns a new request.
HttpRequestImpl currentreq; // used for retries & redirect
// Compared with currentreq to decide whether the request filters still
// need to be applied (they are skipped when both are the same object).
HttpRequestImpl previousreq; // used for retries & redirect
// Read and written through the synchronized getExchange()/setExchange().
Exchange<T> exchange; // the current exchange
// Within this file only assigned null by the constructor.
Exchange<T> previous;
// Last cause recorded by getExceptionalCF(); becomes the cause of the
// "Too many retries" IOException.
volatile Throwable retryCause;
// Set once a retry of the current request has been triggered; cleared
// when a response filter produces a new request.
volatile boolean expiredOnce;
// Most recently created HttpResponse; it is passed along to the next
// HttpResponseImpl that gets constructed.
volatile HttpResponse<T> response = null;
// Maximum number of times a request will be retried/redirected
// for any reason
static final int DEFAULT_MAX_ATTEMPTS = 5;
// Read from the "jdk.httpclient.redirects.retrylimit" net property,
// falling back to DEFAULT_MAX_ATTEMPTS.
static final int max_attempts = Utils.getIntegerNetProperty(
"jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
);
// client.filterChain(): request filters run in list order, response
// filters in reverse order.
private final LinkedList<HeaderFilter> filters;
// Registered with the client when the current request has a timeout;
// cancelled through cancelTimer().
ResponseTimerEvent responseTimerEvent;
// Set by cancel(IOException) and consulted by getExceptionalCF().
volatile boolean cancelled;
// null when no PushPromiseHandler was supplied to the constructor.
final PushGroup<T> pushGroup;
/**
* Filter fields. These are attached as required by filters
* and only used by the filter implementations; nothing in this
* class reads or writes them. This could be generalised into
* objects that are passed explicitly to the filters
* (one per MultiExchange object, and possibly one per Exchange object).
*/
volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
// Redirect counter. Not touched by this class; presumably maintained by
// the redirect filter (the original note here read "RedirectHandler").
volatile int numberOfRedirects = 0;
/**
 * Keeps track of the connect timeout across retries, for the case where
 * a ConnectException causes a retry.
 *
 * When the connect is retried we do not want to double the timeout by
 * starting a new timer with the full connectTimeout again. Instead the
 * tracker hands out a duration that takes into account the time already
 * spent in the first connect attempt.
 *
 * If however the connection gets connected and the whole operation is
 * retried later, the tracker is reset before retrying, since the
 * connection used for the second request will not necessarily be the
 * same: it could be a new, unconnected connection. See
 * getExceptionalCF().
 */
private static final class ConnectTimeoutTracker {
    /** The full connect timeout. */
    final Duration max;
    /** System.nanoTime() of the first getRemaining() call; 0 means "not started". */
    final AtomicLong startTime = new AtomicLong();

    ConnectTimeoutTracker(Duration connectTimeout) {
        this.max = Objects.requireNonNull(connectTimeout);
    }

    /**
     * Returns what is left of the connect timeout.
     *
     * The first call after construction or after reset() records the
     * current time as the start and returns the full timeout. Later calls
     * return the full timeout minus the time elapsed since that start,
     * floored at Duration.ZERO. A zero timeout is returned unchanged.
     */
    Duration getRemaining() {
        final long now = System.nanoTime();
        final long witnessed = startTime.compareAndExchange(0, now);
        if (witnessed == 0 || max.isZero()) {
            // either we just armed the tracker, or there is nothing to count down
            return max;
        }
        final Duration elapsed = Duration.ofNanos(now - witnessed);
        final Duration left = max.minus(elapsed);
        assert left.compareTo(max) <= 0;
        if (left.isNegative()) {
            return Duration.ZERO;
        }
        return left;
    }

    /** Forgets the recorded start so that the next getRemaining() starts afresh. */
    void reset() {
        startTime.set(0);
    }
}
/**
 * Creates a MultiExchange that will produce one final response.
 *
 * @param userRequest        the request as supplied by the user
 * @param requestImpl        the implementation copy of that request; it is
 *                           also the request of the first attempt
 * @param client             the client the request is sent with; supplies
 *                           the filter chain, the executor and the
 *                           optional connect timeout
 * @param responseHandler    the user's body handler
 * @param pushPromiseHandler handler for push promises, or null, in which
 *                           case no PushGroup is created
 * @param acc                access control context, or null; when non-null
 *                           the push group's executor is wrapped in a
 *                           PrivilegedExecutor
 */
MultiExchange(HttpRequest userRequest,
              HttpRequestImpl requestImpl,
              HttpClientImpl client,
              HttpResponse.BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler,
              AccessControlContext acc) {
    // request bookkeeping
    this.previous = null;
    this.userRequest = userRequest;
    this.request = requestImpl;
    this.currentreq = request;
    this.previousreq = null;

    // collaborators taken from the client
    this.client = client;
    this.filters = client.filterChain();
    this.acc = acc;
    this.executor = client.theExecutor();
    this.responseHandler = responseHandler;

    // server push support is only set up when a handler was given
    if (pushPromiseHandler == null) {
        this.pushGroup = null;
    } else {
        Executor pushExecutor = (acc != null)
                ? new PrivilegedExecutor(this.executor.delegate(), acc)
                : this.executor.delegate();
        this.pushGroup = new PushGroup<>(pushPromiseHandler, request, pushExecutor);
    }

    this.connectTimeout = client.connectTimeout()
            .map(ConnectTimeoutTracker::new)
            .orElse(null);

    // must come last: the Exchange receives 'this'
    this.exchange = new Exchange<>(request, this);
}
/**
 * Returns the exchange currently attached to this MultiExchange.
 * Synchronized on this object, as is setExchange.
 */
synchronized Exchange<T> getExchange() {
    return this.exchange;
}
/** Returns the client implementation this MultiExchange was created with. */
HttpClientImpl client() {
    return this.client;
}
/**
 * Returns the HTTP version to use: the version set on the request if
 * any, otherwise the client's version. HTTP/2 is replaced by HTTP/1.1
 * when the request is not secure and has a proxy.
 */
HttpClient.Version version() {
    final HttpClient.Version preferred = request.version().orElse(client.version());
    if (preferred != HttpClient.Version.HTTP_2) {
        return preferred;
    }
    if (request.secure()) {
        return preferred;
    }
    if (request.proxy() == null) {
        return preferred;
    }
    // HTTP/2 requested for a non-secure request with a proxy: fall back
    return HttpClient.Version.HTTP_1_1;
}
/**
 * Replaces the current exchange. If there is a current exchange and it
 * is a different object than the new one, its released() method is
 * called first.
 */
private synchronized void setExchange(Exchange<T> exchange) {
    final Exchange<T> outgoing = this.exchange;
    final boolean replaced = outgoing != null && outgoing != exchange;
    if (replaced) {
        outgoing.released();
    }
    this.exchange = exchange;
}
/**
 * Returns what is left of the connect timeout, or an empty Optional
 * when no connect timeout is configured.
 */
public Optional<Duration> remainingConnectTimeout() {
    if (connectTimeout == null) {
        return Optional.empty();
    }
    return Optional.ofNullable(connectTimeout.getRemaining());
}
/** Cancels the response timer event with the client, if one has been set. */
private void cancelTimer() {
    if (responseTimerEvent == null) {
        return; // no timer was ever registered
    }
    client.cancelTimer(responseTimerEvent);
}
/**
 * Applies every filter of the chain, in order, to the given request.
 *
 * @param r the request about to be sent
 * @throws IOException if a filter throws it; remaining filters are skipped
 */
private void requestFilters(HttpRequestImpl r) throws IOException {
    Log.logTrace("Applying request filters");
    Iterator<HeaderFilter> chain = filters.iterator();
    while (chain.hasNext()) {
        HeaderFilter next = chain.next();
        Log.logTrace("Applying {0}", next);
        next.request(r, this);
    }
    Log.logTrace("All filters applied");
}
/**
 * Applies the filters to the given response, in the reverse of the order
 * used for requests. The first filter that returns a non-null request
 * ends the iteration.
 *
 * @param response the response just received
 * @return the new request produced by a filter, or null if no filter
 *         produced one
 * @throws IOException if a filter throws it
 */
private HttpRequestImpl responseFilters(Response response) throws IOException
{
    Log.logTrace("Applying response filters");
    for (Iterator<HeaderFilter> it = filters.descendingIterator(); it.hasNext(); ) {
        HeaderFilter current = it.next();
        Log.logTrace("Applying {0}", current);
        HttpRequestImpl followUp = current.response(response);
        if (followUp == null) {
            continue;
        }
        Log.logTrace("New request: stopping filters");
        return followUp;
    }
    Log.logTrace("All filters applied");
    return null;
}
/**
 * Marks this MultiExchange as cancelled and cancels the current exchange.
 *
 * @param cause the exception passed on to the exchange's cancel method
 */
public void cancel(IOException cause) {
    // flag first: getExceptionalCF() consults it when the failure surfaces
    cancelled = true;
    Exchange<T> current = getExchange();
    current.cancel(cause);
}
/**
 * Starts the request/response processing and returns a future for the
 * final response.
 *
 * The whole pipeline is first chained onto a not-yet-completed future;
 * that future is then completed asynchronously on the given executor,
 * which triggers execution.
 *
 * @param executor the executor used to trigger execution
 * @return a future completed with the final HttpResponse, or exceptionally
 */
public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
    CompletableFuture<Void> trigger = new MinimalFuture<>();
    CompletableFuture<HttpResponse<T>> pipeline = responseAsync0(trigger);
    trigger.completeAsync(() -> null, executor); // trigger execution
    return pipeline;
}
// Returns true if the response is of a type for which a response body is
// never possible, and which therefore does not have to carry header
// information indicating that no body is present. This is distinct from
// responses that also do not contain a body (possibly ever) but which are
// required to have content length info in their headers (e.g. 205).
// Those cases do not have to be handled specially.
private static boolean bodyNotPermitted(Response r) {
    final int noContent = 204;
    return r.statusCode == noContent;
}
/**
 * Tells whether the response headers indicate that a body follows.
 *
 * A body is considered indicated when a Content-length header is present
 * with a value other than 0, or when a Transfer-encoding header is
 * present. A Content-length value that cannot be parsed as a long is
 * also treated as indicating a body.
 *
 * @param r the response whose headers are inspected
 * @return true if the headers indicate a body
 */
private boolean bodyIsPresent(Response r) {
    HttpHeaders headers = r.headers();
    try {
        if (headers.firstValueAsLong("Content-length").orElse(0L) != 0L)
            return true;
    } catch (NumberFormatException malformed) {
        // firstValueAsLong throws when the header is present but is not a
        // valid long. Letting that escape would bypass the caller's
        // "unexpected content length" handling (which cancels the
        // exchange), so report the header as indicating a body instead.
        return true;
    }
    return headers.firstValue("Transfer-encoding").isPresent();
}
// Calls the user's body handler to get an empty body object: the
// subscriber it returns is subscribed with a no-op subscription and
// completed right away, without being given any data. The resulting body
// is wrapped in an HttpResponseImpl, which is also stored as the current
// response of this MultiExchange.
private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T> exch) {
    ResponseInfoImpl info = new ResponseInfoImpl(r.statusCode(), r.headers(), r.version());
    BodySubscriber<T> subscriber = responseHandler.apply(info);
    subscriber.onSubscribe(new NullSubscription());
    subscriber.onComplete();

    CompletionStage<T> bodyStage = ResponseSubscribers.getBodyAsync(executor, subscriber);
    MinimalFuture<HttpResponse<T>> result = new MinimalFuture<>();
    bodyStage.whenComplete((emptyBody, failure) -> {
        if (failure == null) {
            this.response =
                new HttpResponseImpl<>(r.request(), r, this.response, emptyBody, exch);
            result.complete(this.response);
        } else {
            result.completeExceptionally(failure);
        }
    });
    // ensure that the connection is closed or returned to the pool.
    return result.whenComplete(exch::nullBody);
}
/**
 * Builds the pipeline that runs once {@code start} completes: obtain the
 * response headers through responseAsyncImpl(), then produce the body.
 *
 * For a status that does not permit a body, headers that nevertheless
 * indicate one cause the exchange to be cancelled and the returned
 * future to fail with an IOException; otherwise an empty body is
 * obtained from the user's handler. For every other status the body is
 * read through the exchange. The resulting HttpResponse is also stored
 * as the current response of this MultiExchange.
 */
private CompletableFuture<HttpResponse<T>>
responseAsync0(CompletableFuture<Void> start) {
    return start.thenCompose(ignored -> responseAsyncImpl())
            .thenCompose((Response r) -> {
                Exchange<T> exch = getExchange();
                if (!bodyNotPermitted(r)) {
                    // regular case: read the body with the user's handler
                    return exch.readBodyAsync(responseHandler)
                        .thenApply((T body) -> {
                            this.response =
                                new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
                            return this.response;
                        });
                }
                if (!bodyIsPresent(r)) {
                    return handleNoBody(r, exch);
                }
                // a body was announced where none is permitted
                IOException ioe = new IOException(
                    "unexpected content length header with 204 response");
                exch.cancel(ioe);
                return MinimalFuture.failedFuture(ioe);
            });
}
/**
 * A Flow.Subscription that ignores both demand and cancellation. It is
 * handed to the body subscriber in handleNoBody, where the subscriber is
 * completed immediately and never receives any data.
 */
static class NullSubscription implements Flow.Subscription {
    /** Does nothing: there is never anything to deliver. */
    @Override
    public void request(long n) {
        // intentionally empty
    }

    /** Does nothing: there is nothing to cancel. */
    @Override
    public void cancel() {
        // intentionally empty
    }
}
/**
 * Runs one attempt at obtaining a response for {@code currentreq}, and
 * re-invokes itself when a response filter produces a new request or when
 * getExceptionalCF() asks for a retry (by returning null).
 *
 * Each invocation counts as one attempt; once the number of attempts
 * exceeds {@code max_attempts} the returned future fails with an
 * IOException("Too many retries") whose cause is the last recorded
 * retry cause.
 *
 * If the current request has a timeout, a response timer event is
 * registered with the client for the attempt. That event is cancelled
 * when the attempt ends: on success, on failure, when the request filters
 * throw, and before a filter-produced request is followed.
 *
 * @return a future completed with the Response of the last attempt, or
 *         completed exceptionally
 */
private CompletableFuture<Response> responseAsyncImpl() {
    CompletableFuture<Response> cf;
    if (attempts.incrementAndGet() > max_attempts) {
        cf = failedFuture(new IOException("Too many retries", retryCause));
    } else {
        if (currentreq.timeout().isPresent()) {
            responseTimerEvent = ResponseTimerEvent.of(this);
            client.registerTimer(responseTimerEvent);
        }
        try {
            // 1. apply request filters
            // if currentreq == previousreq the filters have already
            // been applied once. Applying them a second time might
            // cause some headers values to be added twice: for
            // instance, the same cookie might be added again.
            if (currentreq != previousreq) {
                requestFilters(currentreq);
            }
        } catch (IOException e) {
            // The attempt ends here, before step 5 gets a chance to run:
            // do not leave the timer registered above behind.
            cancelTimer();
            return failedFuture(e);
        }
        Exchange<T> exch = getExchange();
        // 2. get response
        cf = exch.responseAsync()
                 .thenCompose((Response response) -> {
                    HttpRequestImpl newrequest;
                    try {
                        // 3. apply response filters
                        newrequest = responseFilters(response);
                    } catch (IOException e) {
                        return failedFuture(e);
                    }
                    // 4. check filter result and repeat or continue
                    if (newrequest == null) {
                        if (attempts.get() > 1) {
                            // NOTE(review): reported through logError although
                            // it describes a success - confirm this is intended.
                            Log.logError("Succeeded on attempt: " + attempts);
                        }
                        return completedFuture(response);
                    } else {
                        // The nested responseAsyncImpl() call below may
                        // register a new timer event and overwrite
                        // responseTimerEvent; cancel the current one now,
                        // while it is still reachable through the field.
                        cancelTimer();
                        this.response =
                            new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
                        return exch.ignoreBody().handle((r,t) -> {
                            // the outcome of draining the body is ignored:
                            // the new request is issued either way
                            previousreq = currentreq;
                            currentreq = newrequest;
                            expiredOnce = false;
                            setExchange(new Exchange<>(currentreq, this, acc));
                            return responseAsyncImpl();
                        }).thenCompose(Function.identity());
                    } })
                 .handle((response, ex) -> {
                    // 5. handle errors and cancel any timer set
                    cancelTimer();
                    if (ex == null) {
                        assert response != null;
                        return completedFuture(response);
                    }
                    // all exceptions thrown are handled here
                    CompletableFuture<Response> errorCF = getExceptionalCF(ex);
                    if (errorCF == null) {
                        // null means: retry the same request
                        return responseAsyncImpl();
                    } else {
                        return errorCF;
                    } })
                 .thenCompose(Function.identity());
    }
    return cf;
}
/**
 * Reads the "jdk.httpclient.enableAllMethodRetry" net property.
 *
 * @return false if the property is not set; true if it is set to the
 *         empty string; otherwise the result of Boolean.parseBoolean
 *         on its value
 */
private static boolean retryPostValue() {
    final String value = Utils.getNetProperty("jdk.httpclient.enableAllMethodRetry");
    if (value == null) {
        return false;
    }
    return value.isEmpty() || Boolean.parseBoolean(value);
}
/**
 * Reads the "jdk.httpclient.disableRetryConnect" net property.
 *
 * @return false if the property is not set; true if it is set to the
 *         empty string; otherwise the result of Boolean.parseBoolean
 *         on its value
 */
private static boolean disableRetryConnect() {
    final String value = Utils.getNetProperty("jdk.httpclient.disableRetryConnect");
    if (value == null) {
        return false;
    }
    return value.isEmpty() || Boolean.parseBoolean(value);
}
/**
* True if ALL (even non-idempotent) requests can be automatically retried.
* Taken from retryPostValue(), i.e. disabled unless the corresponding
* property is set.
*/
private static final boolean RETRY_ALWAYS = retryPostValue();
/** True if a ConnectException should cause a retry. Enabled by default. */
private static final boolean RETRY_CONNECT = !disableRetryConnect();
/**
 * Returns true if the given request has an idempotent method.
 * Only "GET" and "HEAD" are treated as idempotent here.
 */
private static boolean isIdempotentRequest(HttpRequest request) {
    final String method = request.method();
    return method.equals("GET") || method.equals("HEAD");
}
/**
 * Returns true if the given request can be automatically retried: either
 * retrying is enabled for all methods, or the request has an idempotent
 * method.
 */
private static boolean canRetryRequest(HttpRequest request) {
    return RETRY_ALWAYS || isIdempotentRequest(request);
}
/**
 * Tells whether the given failure is of a kind that may lead to a retry:
 * a ConnectionExpiredException always is, a ConnectException only when
 * retrying on connect failures is enabled.
 */
private boolean retryOnFailure(Throwable t) {
    if (t instanceof ConnectionExpiredException) {
        return true;
    }
    return RETRY_CONNECT && t instanceof ConnectException;
}
/**
 * Returns the exception to report for a retried failure: the cause of a
 * ConnectionExpiredException when it has one, otherwise {@code t} itself.
 */
private Throwable retryCause(Throwable t) {
    if (t instanceof ConnectionExpiredException) {
        Throwable underlying = t.getCause();
        if (underlying != null) {
            return underlying;
        }
    }
    return t;
}
/**
 * Takes a Throwable and returns a suitable CompletableFuture that is
 * completed exceptionally, or null when the same request should be
 * retried.
 *
 * A CompletionException or ExecutionException with a cause is first
 * replaced by that cause. Then:
 * - if this MultiExchange was cancelled and the failure is an
 *   IOException, the failure is reported as a timeout exception
 *   (converted through toTimeoutException unless it already is one);
 * - if the failure is retryable (see retryOnFailure), null is returned
 *   the first time, and the retry cause is reported once a retry has
 *   already happened. For failures other than ConnectException the
 *   connect timeout is reset, and a request that cannot be retried
 *   fails immediately with the retry cause;
 * - any other failure is reported as is.
 */
private CompletableFuture<Response> getExceptionalCF(Throwable t) {
    Throwable failure = t;
    if (failure instanceof CompletionException || failure instanceof ExecutionException) {
        Throwable wrapped = failure.getCause();
        if (wrapped != null) {
            failure = wrapped;
        }
    }

    if (cancelled && failure instanceof IOException) {
        if (failure instanceof HttpTimeoutException) {
            return failedFuture(failure);
        }
        return failedFuture(toTimeoutException((IOException) failure));
    }

    if (!retryOnFailure(failure)) {
        return failedFuture(failure);
    }

    final Throwable cause = retryCause(failure);
    if (!(failure instanceof ConnectException)) {
        // we may need to start a new connection, and if so
        // we want to start with a fresh connect timeout again.
        if (connectTimeout != null) {
            connectTimeout.reset();
        }
        if (!canRetryRequest(currentreq)) {
            return failedFuture(cause); // fails with original cause
        }
    } // ConnectException: retry, but don't reset the connectTimeout.

    // allow the retry mechanism to do its work
    retryCause = cause;

    if (expiredOnce) {
        if (debug.on()) {
            debug.log(failure.getClass().getSimpleName()
                    + " (async): already retried once.", failure);
        }
        return failedFuture(cause);
    }

    if (debug.on()) {
        debug.log(failure.getClass().getSimpleName() + " (async): retrying...", failure);
    }
    expiredOnce = true;
    // The connection was abruptly closed.
    // We return null to retry the same request a second time.
    // The request filters have already been applied to the
    // currentreq, so we set previousreq = currentreq to
    // prevent them from being applied again.
    previousreq = currentreq;
    return null;
}
/**
 * Converts the given IOException into a timeout exception.
 *
 * When the current exchange has an implementation whose connection
 * reports being connected, the result is the more specific
 * HttpTimeoutException("request timed out") with {@code ioe} as its
 * cause. Otherwise the result is an HttpConnectTimeoutException whose
 * cause is a new ConnectException; {@code ioe} is not chained in that
 * case.
 */
private HttpTimeoutException toTimeoutException(IOException ioe) {
    Exchange<?> current = getExchange();
    ExchangeImpl<?> impl = (current == null) ? null : current.exchImpl;

    // more specific, "request timed out", when connected
    if (impl != null && impl.connection().connected()) {
        HttpTimeoutException requestTimedOut = new HttpTimeoutException("request timed out");
        requestTimedOut.initCause(ioe);
        return requestTimedOut;
    }

    HttpTimeoutException connectTimedOut =
            new HttpConnectTimeoutException("HTTP connect timed out");
    connectTimedOut.initCause(new ConnectException("HTTP connect timed out"));
    return connectTimedOut;
}
}