src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java
author dfuchs
Thu, 24 Jan 2019 12:32:42 +0000
changeset 53467 97cf88608d76
parent 53387 c9622e15ba29
child 53701 e57bcfd7bf79
permissions -rw-r--r--
8217264: HttpClient: Blocking operations in mapper function do not work as documented Summary: ensures that a new task is spawned when calling getBody() on a mapping BodySubscriber. Reviewed-by: chegar

/*
 * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.IOException;
import java.net.ConnectException;
import java.net.http.HttpConnectTimeoutException;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedList;
import java.security.AccessControlContext;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.PushPromiseHandler;
import java.net.http.HttpTimeoutException;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.ConnectionExpiredException;
import jdk.internal.net.http.common.Utils;
import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
import static jdk.internal.net.http.common.MinimalFuture.failedFuture;

/**
 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
 * - manages filters
 * - retries due to filters.
 * - I/O errors and most other exceptions get returned directly to user
 *
 * Creates a new Exchange for each request/response interaction
 */
class MultiExchange<T> {

    static final Logger debug =
            Utils.getDebugLogger("MultiExchange"::toString, Utils.DEBUG);

    private final HttpRequest userRequest; // the user request
    private final HttpRequestImpl request; // a copy of the user request
    private final ConnectTimeoutTracker connectTimeout; // null if no timeout
    final AccessControlContext acc;
    final HttpClientImpl client;
    final HttpResponse.BodyHandler<T> responseHandler;
    final HttpClientImpl.DelegatingExecutor executor;
    final AtomicInteger attempts = new AtomicInteger();
    HttpRequestImpl currentreq; // used for retries & redirect
    HttpRequestImpl previousreq; // used for retries & redirect
    Exchange<T> exchange; // the current exchange
    Exchange<T> previous;
    volatile Throwable retryCause;
    volatile boolean expiredOnce;
    volatile HttpResponse<T> response = null;

    // Maximum number of times a request will be retried/redirected
    // for any reason

    static final int DEFAULT_MAX_ATTEMPTS = 5;
    static final int max_attempts = Utils.getIntegerNetProperty(
            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
    );

    private final LinkedList<HeaderFilter> filters;
    ResponseTimerEvent responseTimerEvent;
    volatile boolean cancelled;
    final PushGroup<T> pushGroup;

    /**
     * Filter fields. These are attached as required by filters
     * and only used by the filter implementations. This could be
     * generalised into Objects that are passed explicitly to the filters
     * (one per MultiExchange object, and one per Exchange object possibly)
     */
    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
    // RedirectHandler
    volatile int numberOfRedirects = 0;

    // This class is used to keep track of the connection timeout
    // across retries, when a ConnectException causes a retry.
    // In that case - we will retry the connect, but we don't
    // want to double the timeout by starting a new timer with
    // the full connectTimeout again.
    // Instead we use the ConnectTimeoutTracker to return a new
    // duration that takes into account the time spent in the
    // first connect attempt.
    // If however, the connection gets connected, but we later
    // retry the whole operation, then we reset the timer before
    // retrying (since the connection used for the second request
    // will not necessarily be the same: it could be a new
    // unconnected connection) - see getExceptionalCF().
    private static final class ConnectTimeoutTracker {
        final Duration max;
        final AtomicLong startTime = new AtomicLong();
        ConnectTimeoutTracker(Duration connectTimeout) {
            this.max = Objects.requireNonNull(connectTimeout);
        }

        Duration getRemaining() {
            long now = System.nanoTime();
            long previous = startTime.compareAndExchange(0, now);
            if (previous == 0 || max.isZero()) return max;
            Duration remaining = max.minus(Duration.ofNanos(now - previous));
            assert remaining.compareTo(max) <= 0;
            return remaining.isNegative() ? Duration.ZERO : remaining;
        }

        void reset() { startTime.set(0); }
    }

    /**
     * MultiExchange with one final response.
     */
    MultiExchange(HttpRequest userRequest,
                  HttpRequestImpl requestImpl,
                  HttpClientImpl client,
                  HttpResponse.BodyHandler<T> responseHandler,
                  PushPromiseHandler<T> pushPromiseHandler,
                  AccessControlContext acc) {
        this.previous = null;
        this.userRequest = userRequest;
        this.request = requestImpl;
        this.currentreq = request;
        this.previousreq = null;
        this.client = client;
        this.filters = client.filterChain();
        this.acc = acc;
        this.executor = client.theExecutor();
        this.responseHandler = responseHandler;

        if (pushPromiseHandler != null) {
            Executor executor = acc == null
                    ? this.executor.delegate()
                    : new PrivilegedExecutor(this.executor.delegate(), acc);
            this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor);
        } else {
            pushGroup = null;
        }
        this.connectTimeout = client.connectTimeout()
                .map(ConnectTimeoutTracker::new).orElse(null);
        this.exchange = new Exchange<>(request, this);
    }

    synchronized Exchange<T> getExchange() {
        return exchange;
    }

    HttpClientImpl client() {
        return client;
    }

    HttpClient.Version version() {
        HttpClient.Version vers = request.version().orElse(client.version());
        if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)
            vers = HttpClient.Version.HTTP_1_1;
        return vers;
    }

    private synchronized void setExchange(Exchange<T> exchange) {
        if (this.exchange != null && exchange != this.exchange) {
            this.exchange.released();
        }
        this.exchange = exchange;
    }

    public Optional<Duration> remainingConnectTimeout() {
        return Optional.ofNullable(connectTimeout)
                .map(ConnectTimeoutTracker::getRemaining);
    }

    private void cancelTimer() {
        if (responseTimerEvent != null) {
            client.cancelTimer(responseTimerEvent);
        }
    }

    private void requestFilters(HttpRequestImpl r) throws IOException {
        Log.logTrace("Applying request filters");
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            filter.request(r, this);
        }
        Log.logTrace("All filters applied");
    }

    private HttpRequestImpl responseFilters(Response response) throws IOException
    {
        Log.logTrace("Applying response filters");
        Iterator<HeaderFilter> reverseItr = filters.descendingIterator();
        while (reverseItr.hasNext()) {
            HeaderFilter filter = reverseItr.next();
            Log.logTrace("Applying {0}", filter);
            HttpRequestImpl newreq = filter.response(response);
            if (newreq != null) {
                Log.logTrace("New request: stopping filters");
                return newreq;
            }
        }
        Log.logTrace("All filters applied");
        return null;
    }

    public void cancel(IOException cause) {
        cancelled = true;
        getExchange().cancel(cause);
    }

    public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
        start.completeAsync( () -> null, executor); // trigger execution
        return cf;
    }

    // return true if the response is a type where a response body is never possible
    // and therefore doesn't have to include header information which indicates no
    // body is present. This is distinct from responses that also do not contain
    // response bodies (possibly ever) but which are required to have content length
    // info in the header (eg 205). Those cases do not have to be handled specially

    private static boolean bodyNotPermitted(Response r) {
        return r.statusCode == 204;
    }

    private boolean bodyIsPresent(Response r) {
        HttpHeaders headers = r.headers();
        if (headers.firstValue("Content-length").isPresent())
            return true;
        if (headers.firstValue("Transfer-encoding").isPresent())
            return true;
        return false;
    }

    // Call the user's body handler to get an empty body object

    private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T> exch) {
        BodySubscriber<T> bs = responseHandler.apply(new ResponseInfoImpl(r.statusCode(),
                r.headers(), r.version()));
        bs.onSubscribe(new NullSubscription());
        bs.onComplete();
        CompletionStage<T> cs = ResponseSubscribers.getBodyAsync(executor, bs);
        MinimalFuture<HttpResponse<T>> result = new MinimalFuture<>();
        cs.whenComplete((nullBody, exception) -> {
            if (exception != null)
                result.completeExceptionally(exception);
            else {
                this.response =
                        new HttpResponseImpl<>(r.request(), r, this.response, nullBody, exch);
                result.complete(this.response);
            }
        });
        // ensure that the connection is closed or returned to the pool.
        return result.whenComplete(exch::nullBody);
    }

    private CompletableFuture<HttpResponse<T>>
    responseAsync0(CompletableFuture<Void> start) {
        return start.thenCompose( v -> responseAsyncImpl())
                    .thenCompose((Response r) -> {
                        Exchange<T> exch = getExchange();
                        if (bodyNotPermitted(r)) {
                            if (bodyIsPresent(r)) {
                                IOException ioe = new IOException(
                                    "unexpected content length header with 204 response");
                                exch.cancel(ioe);
                                return MinimalFuture.failedFuture(ioe);
                            } else
                                return handleNoBody(r, exch);
                        }
                        return exch.readBodyAsync(responseHandler)
                            .thenApply((T body) -> {
                                this.response =
                                    new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
                                return this.response;
                            });
                    });
    }

    static class NullSubscription implements Flow.Subscription {
        @Override
        public void request(long n) {
        }

        @Override
        public void cancel() {
        }
    }

    private CompletableFuture<Response> responseAsyncImpl() {
        CompletableFuture<Response> cf;
        if (attempts.incrementAndGet() > max_attempts) {
            cf = failedFuture(new IOException("Too many retries", retryCause));
        } else {
            if (currentreq.timeout().isPresent()) {
                responseTimerEvent = ResponseTimerEvent.of(this);
                client.registerTimer(responseTimerEvent);
            }
            try {
                // 1. apply request filters
                // if currentreq == previousreq the filters have already
                // been applied once. Applying them a second time might
                // cause some headers values to be added twice: for
                // instance, the same cookie might be added again.
                if (currentreq != previousreq) {
                    requestFilters(currentreq);
                }
            } catch (IOException e) {
                return failedFuture(e);
            }
            Exchange<T> exch = getExchange();
            // 2. get response
            cf = exch.responseAsync()
                     .thenCompose((Response response) -> {
                        HttpRequestImpl newrequest;
                        try {
                            // 3. apply response filters
                            newrequest = responseFilters(response);
                        } catch (IOException e) {
                            return failedFuture(e);
                        }
                        // 4. check filter result and repeat or continue
                        if (newrequest == null) {
                            if (attempts.get() > 1) {
                                Log.logError("Succeeded on attempt: " + attempts);
                            }
                            return completedFuture(response);
                        } else {
                            this.response =
                                new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
                            Exchange<T> oldExch = exch;
                            return exch.ignoreBody().handle((r,t) -> {
                                previousreq = currentreq;
                                currentreq = newrequest;
                                expiredOnce = false;
                                setExchange(new Exchange<>(currentreq, this, acc));
                                return responseAsyncImpl();
                            }).thenCompose(Function.identity());
                        } })
                     .handle((response, ex) -> {
                        // 5. handle errors and cancel any timer set
                        cancelTimer();
                        if (ex == null) {
                            assert response != null;
                            return completedFuture(response);
                        }
                        // all exceptions thrown are handled here
                        CompletableFuture<Response> errorCF = getExceptionalCF(ex);
                        if (errorCF == null) {
                            return responseAsyncImpl();
                        } else {
                            return errorCF;
                        } })
                     .thenCompose(Function.identity());
        }
        return cf;
    }

    private static boolean retryPostValue() {
        String s = Utils.getNetProperty("jdk.httpclient.enableAllMethodRetry");
        if (s == null)
            return false;
        return s.isEmpty() ? true : Boolean.parseBoolean(s);
    }

    private static boolean disableRetryConnect() {
        String s = Utils.getNetProperty("jdk.httpclient.disableRetryConnect");
        if (s == null)
            return false;
        return s.isEmpty() ? true : Boolean.parseBoolean(s);
    }

    /** True if ALL ( even non-idempotent ) requests can be automatic retried. */
    private static final boolean RETRY_ALWAYS = retryPostValue();
    /** True if ConnectException should cause a retry. Enabled by default */
    private static final boolean RETRY_CONNECT = !disableRetryConnect();

    /** Returns true is given request has an idempotent method. */
    private static boolean isIdempotentRequest(HttpRequest request) {
        String method = request.method();
        switch (method) {
            case "GET" :
            case "HEAD" :
                return true;
            default :
                return false;
        }
    }

    /** Returns true if the given request can be automatically retried. */
    private static boolean canRetryRequest(HttpRequest request) {
        if (RETRY_ALWAYS)
            return true;
        if (isIdempotentRequest(request))
            return true;
        return false;
    }

    private boolean retryOnFailure(Throwable t) {
        return t instanceof ConnectionExpiredException
                || (RETRY_CONNECT && (t instanceof ConnectException));
    }

    private Throwable retryCause(Throwable t) {
        Throwable cause = t instanceof ConnectionExpiredException ? t.getCause() : t;
        return cause == null ? t : cause;
    }

    /**
     * Takes a Throwable and returns a suitable CompletableFuture that is
     * completed exceptionally, or null.
     */
    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
            if (t.getCause() != null) {
                t = t.getCause();
            }
        }
        if (cancelled && t instanceof IOException) {
            if (!(t instanceof HttpTimeoutException)) {
                t = toTimeoutException((IOException)t);
            }
        } else if (retryOnFailure(t)) {
            Throwable cause = retryCause(t);

            if (!(t instanceof ConnectException)) {
                // we may need to start a new connection, and if so
                // we want to start with a fresh connect timeout again.
                if (connectTimeout != null) connectTimeout.reset();
                if (!canRetryRequest(currentreq)) {
                    return failedFuture(cause); // fails with original cause
                }
            } // ConnectException: retry, but don't reset the connectTimeout.

            // allow the retry mechanism to do its work
            retryCause = cause;
            if (!expiredOnce) {
                if (debug.on())
                    debug.log(t.getClass().getSimpleName() + " (async): retrying...", t);
                expiredOnce = true;
                // The connection was abruptly closed.
                // We return null to retry the same request a second time.
                // The request filters have already been applied to the
                // currentreq, so we set previousreq = currentreq to
                // prevent them from being applied again.
                previousreq = currentreq;
                return null;
            } else {
                if (debug.on()) {
                    debug.log(t.getClass().getSimpleName()
                            + " (async): already retried once.", t);
                }
                t = cause;
            }
        }
        return failedFuture(t);
    }

    private HttpTimeoutException toTimeoutException(IOException ioe) {
        HttpTimeoutException t = null;

        // more specific, "request timed out", when connected
        Exchange<?> exchange = getExchange();
        if (exchange != null) {
            ExchangeImpl<?> exchangeImpl = exchange.exchImpl;
            if (exchangeImpl != null) {
                if (exchangeImpl.connection().connected()) {
                    t = new HttpTimeoutException("request timed out");
                    t.initCause(ioe);
                }
            }
        }
        if (t == null) {
            t = new HttpConnectTimeoutException("HTTP connect timed out");
            t.initCause(new ConnectException("HTTP connect timed out"));
        }
        return t;
    }
}