src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
author prappo
Thu, 09 Nov 2017 15:24:39 +0300
branchhttp-client-branch
changeset 55792 0936888d5a4a
parent 55764 34d7cc00f87a
child 55852 32f6aefec11e
permissions -rw-r--r--
http-client-branch: (cleanup) unused imports, unused methods, removed 1 class; typos;

/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.lang.System.Logger.Level;
import java.time.Duration;
import java.util.List;
import java.security.AccessControlContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.ConnectionExpiredException;
import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;

/**
 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
 * - manages filters
 * - retries due to filters.
 * - I/O errors and most other exceptions get returned directly to user
 *
 * Creates a new Exchange for each request/response interaction
 */
class MultiExchange<U,T> {

    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    static final System.Logger DEBUG_LOGGER =
            Utils.getDebugLogger("MultiExchange"::toString, DEBUG);

    private final HttpRequest userRequest; // the user request
    private final HttpRequestImpl request; // a copy of the user request
    final AccessControlContext acc;
    final HttpClientImpl client;
    final HttpResponse.BodyHandler<T> responseHandler;
    final Executor executor;
    final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
    final AtomicInteger attempts = new AtomicInteger();
    HttpRequestImpl currentreq; // used for async only
    Exchange<T> exchange; // the current exchange
    Exchange<T> previous;
    volatile Throwable retryCause;
    volatile boolean expiredOnce;

    // Maximum number of times a request will be retried/redirected
    // for any reason

    static final int DEFAULT_MAX_ATTEMPTS = 5;
    static final int max_attempts = Utils.getIntegerNetProperty(
            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
    );

    private final List<HeaderFilter> filters;
    TimedEvent timedEvent;
    volatile boolean cancelled;
    final PushGroup<U,T> pushGroup;

    /**
     * Filter fields. These are attached as required by filters
     * and only used by the filter implementations. This could be
     * generalised into Objects that are passed explicitly to the filters
     * (one per MultiExchange object, and one per Exchange object possibly)
     */
    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
    // RedirectHandler
    volatile int numberOfRedirects = 0;

    /**
     * MultiExchange with one final response.
     */
    MultiExchange(HttpRequest userRequest,
                  HttpRequestImpl requestImpl,
                  HttpClientImpl client,
                  HttpResponse.BodyHandler<T> responseHandler,
                  AccessControlContext acc) {
        this.previous = null;
        this.userRequest = userRequest;
        this.request = requestImpl;
        this.currentreq = request;
        this.client = client;
        this.filters = client.filterChain();
        this.acc = acc;
        this.executor = client.theExecutor();
        this.responseHandler = responseHandler;
        if (acc != null) {
            // Restricts the file publisher with the senders ACC, if any
            if (responseHandler instanceof UntrustedBodyHandler)
                ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
        }
        this.exchange = new Exchange<>(request, this);
        this.multiResponseSubscriber = null;
        this.pushGroup = null;
    }

    /**
     * MultiExchange with multiple responses (HTTP/2 server pushes).
     */
    MultiExchange(HttpRequest userRequest,
                  HttpRequestImpl requestImpl,
                  HttpClientImpl client,
                  HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
                  AccessControlContext acc) {
        this.previous = null;
        this.userRequest = userRequest;
        this.request = requestImpl;
        this.currentreq = request;
        this.client = client;
        this.filters = client.filterChain();
        this.acc = acc;
        this.executor = client.theExecutor();
        this.multiResponseSubscriber = multiResponseSubscriber;
        this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
        this.exchange = new Exchange<>(request, this);
        this.responseHandler = pushGroup.mainResponseHandler();
    }

//    CompletableFuture<Void> multiCompletionCF() {
//        return pushGroup.groupResult();
//    }

    private synchronized Exchange<T> getExchange() {
        return exchange;
    }

    HttpClientImpl client() {
        return client;
    }

//    HttpClient.Redirect followRedirects() {
//        return client.followRedirects();
//    }

    HttpClient.Version version() {
        return request.version().orElse(client.version());
    }

    private synchronized void setExchange(Exchange<T> exchange) {
        if (this.exchange != null && exchange != this.exchange) {
            this.exchange.released();
        }
        this.exchange = exchange;
    }

    private void cancelTimer() {
        if (timedEvent != null) {
            client.cancelTimer(timedEvent);
        }
    }

    private void requestFilters(HttpRequestImpl r) throws IOException {
        Log.logTrace("Applying request filters");
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            filter.request(r, this);
        }
        Log.logTrace("All filters applied");
    }

    private HttpRequestImpl responseFilters(Response response) throws IOException
    {
        Log.logTrace("Applying response filters");
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            HttpRequestImpl newreq = filter.response(response);
            if (newreq != null) {
                Log.logTrace("New request: stopping filters");
                return newreq;
            }
        }
        Log.logTrace("All filters applied");
        return null;
    }

//    public void cancel() {
//        cancelled = true;
//        getExchange().cancel();
//    }

    public void cancel(IOException cause) {
        cancelled = true;
        getExchange().cancel(cause);
    }

    public CompletableFuture<HttpResponse<T>> responseAsync() {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
        start.completeAsync( () -> null, executor); // trigger execution
        return cf;
    }

    private CompletableFuture<HttpResponse<T>>
    responseAsync0(CompletableFuture<Void> start) {
        return start.thenCompose( v -> responseAsyncImpl())
                    .thenCompose((Response r) -> {
                        Exchange<T> exch = getExchange();
                        return exch.readBodyAsync(responseHandler)
                                   .thenApply((T body) ->
                                           new HttpResponseImpl<>(userRequest,
                                                                  r,
                                                                  body,
                                                                  exch));
                    });
    }

    CompletableFuture<U> multiResponseAsync() {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
        CompletableFuture<HttpResponse<T>> mainResponse =
                cf.thenApply(b -> {
                        multiResponseSubscriber.onResponse(b);
                        pushGroup.noMorePushes(true);
                        return b; });
        pushGroup.setMainResponse(mainResponse);
        CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
                                                                      pushGroup.pushesCF());
        start.completeAsync( () -> null, executor); // trigger execution
        return res;
    }

    private CompletableFuture<Response> responseAsyncImpl() {
        CompletableFuture<Response> cf;
        if (attempts.incrementAndGet() > max_attempts) {
            cf = failedFuture(new IOException("Too many retries", retryCause));
        } else {
            if (currentreq.timeout().isPresent()) {
                timedEvent = new TimedEvent(currentreq.timeout().get());
                client.registerTimer(timedEvent);
            }
            try {
                // 1. apply request filters
                requestFilters(currentreq);
            } catch (IOException e) {
                return failedFuture(e);
            }
            Exchange<T> exch = getExchange();
            // 2. get response
            cf = exch.responseAsync()
                     .thenCompose((Response response) -> {
                        HttpRequestImpl newrequest;
                        try {
                            // 3. apply response filters
                            newrequest = responseFilters(response);
                        } catch (IOException e) {
                            return failedFuture(e);
                        }
                        // 4. check filter result and repeat or continue
                        if (newrequest == null) {
                            if (attempts.get() > 1) {
                                Log.logError("Succeeded on attempt: " + attempts);
                            }
                            return completedFuture(response);
                        } else {
                            currentreq = newrequest;
                            expiredOnce = false;
                            setExchange(new Exchange<>(currentreq, this, acc));
                            //reads body off previous, and then waits for next response
                            return responseAsyncImpl();
                        } })
                     .handle((response, ex) -> {
                        // 5. handle errors and cancel any timer set
                        cancelTimer();
                        if (ex == null) {
                            assert response != null;
                            return completedFuture(response);
                        }
                        // all exceptions thrown are handled here
                        CompletableFuture<Response> errorCF = getExceptionalCF(ex);
                        if (errorCF == null) {
                            return responseAsyncImpl();
                        } else {
                            return errorCF;
                        } })
                     .thenCompose(UnaryOperator.identity());
        }
        return cf;
    }

    /**
     * Takes a Throwable and returns a suitable CompletableFuture that is
     * completed exceptionally, or null.
     */
    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
            if (t.getCause() != null) {
                t = t.getCause();
            }
        }
        if (cancelled && t instanceof IOException) {
            t = new HttpTimeoutException("request timed out");
        } else if (t instanceof ConnectionExpiredException) {
            // allow the retry mechanism to do its work
            // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
            if (t.getCause() != null) retryCause = t.getCause();
            if (!expiredOnce) {
                DEBUG_LOGGER.log(Level.DEBUG,
                    "MultiExchange: ConnectionExpiredException (async): retrying...",
                    t);
                expiredOnce = true;
                return null;
            } else {
                DEBUG_LOGGER.log(Level.DEBUG,
                    "MultiExchange: ConnectionExpiredException (async): already retried once.",
                    t);
                if (t.getCause() != null) t = t.getCause();
            }
        }
        return failedFuture(t);
    }

    class TimedEvent extends TimeoutEvent {
        TimedEvent(Duration duration) {
            super(duration);
        }
        @Override
        public void handle() {
            DEBUG_LOGGER.log(Level.DEBUG,
                    "Cancelling MultiExchange due to timeout for request %s",
                     request);
            cancel(new HttpTimeoutException("request timed out"));
        }
    }
}