src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
author smarks
Mon, 04 Dec 2017 11:50:04 -0800
changeset 48059 6ee80cd217e0
parent 47216 71c04702a3d5
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
permissions -rw-r--r--
8177290: add copy factory methods for unmodifiable List, Set, Map 8184690: add Collectors for collecting into unmodifiable List, Set, and Map Reviewed-by: alanb, briangoetz, dholmes, jrose, rriggs, scolebourne

/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;

import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.internal.common.Pair.pair;

/**
 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
 * - manages filters
 * - retries due to filters.
 * - I/O errors and most other exceptions get returned directly to user
 *
 * Creates a new Exchange for each request/response interaction
 */
class MultiExchange<U,T> {

    private final HttpRequest userRequest; // the user request
    private final HttpRequestImpl request; // a copy of the user request
    final AccessControlContext acc;
    final HttpClientImpl client;
    final HttpResponse.BodyHandler<T> responseHandler;
    final ExecutorWrapper execWrapper;
    final Executor executor;
    final HttpResponse.MultiProcessor<U,T> multiResponseHandler;
    HttpRequestImpl currentreq; // used for async only
    Exchange<T> exchange; // the current exchange
    Exchange<T> previous;
    int attempts;
    // Maximum number of times a request will be retried/redirected
    // for any reason

    static final int DEFAULT_MAX_ATTEMPTS = 5;
    static final int max_attempts = Utils.getIntegerNetProperty(
            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
    );

    private final List<HeaderFilter> filters;
    TimedEvent timedEvent;
    volatile boolean cancelled;
    final PushGroup<U,T> pushGroup;

    /**
     * Filter fields. These are attached as required by filters
     * and only used by the filter implementations. This could be
     * generalised into Objects that are passed explicitly to the filters
     * (one per MultiExchange object, and one per Exchange object possibly)
     */
    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
    // RedirectHandler
    volatile int numberOfRedirects = 0;

    /**
     * MultiExchange with one final response.
     */
    MultiExchange(HttpRequest req,
                  HttpClientImpl client,
                  HttpResponse.BodyHandler<T> responseHandler) {
        this.previous = null;
        this.userRequest = req;
        this.request = new HttpRequestImpl(req);
        this.currentreq = request;
        this.attempts = 0;
        this.client = client;
        this.filters = client.filterChain();
        if (System.getSecurityManager() != null) {
            this.acc = AccessController.getContext();
        } else {
            this.acc = null;
        }
        this.execWrapper = new ExecutorWrapper(client.executor(), acc);
        this.executor = execWrapper.executor();
        this.responseHandler = responseHandler;
        this.exchange = new Exchange<>(request, this);
        this.multiResponseHandler = null;
        this.pushGroup = null;
    }

    /**
     * MultiExchange with multiple responses (HTTP/2 server pushes).
     */
    MultiExchange(HttpRequest req,
                  HttpClientImpl client,
                  HttpResponse.MultiProcessor<U, T> multiResponseHandler) {
        this.previous = null;
        this.userRequest = req;
        this.request = new HttpRequestImpl(req);
        this.currentreq = request;
        this.attempts = 0;
        this.client = client;
        this.filters = client.filterChain();
        if (System.getSecurityManager() != null) {
            this.acc = AccessController.getContext();
        } else {
            this.acc = null;
        }
        this.execWrapper = new ExecutorWrapper(client.executor(), acc);
        this.executor = execWrapper.executor();
        this.multiResponseHandler = multiResponseHandler;
        this.pushGroup = new PushGroup<>(multiResponseHandler, request);
        this.exchange = new Exchange<>(request, this);
        this.responseHandler = pushGroup.mainResponseHandler();
    }

    public HttpResponseImpl<T> response() throws IOException, InterruptedException {
        HttpRequestImpl r = request;
        if (r.duration() != null) {
            timedEvent = new TimedEvent(r.duration());
            client.registerTimer(timedEvent);
        }
        while (attempts < max_attempts) {
            try {
                attempts++;
                Exchange<T> currExchange = getExchange();
                requestFilters(r);
                Response response = currExchange.response();
                HttpRequestImpl newreq = responseFilters(response);
                if (newreq == null) {
                    if (attempts > 1) {
                        Log.logError("Succeeded on attempt: " + attempts);
                    }
                    T body = currExchange.readBody(responseHandler);
                    cancelTimer();
                    return new HttpResponseImpl<>(userRequest, response, body, currExchange);
                }
                //response.body(HttpResponse.ignoreBody());
                setExchange(new Exchange<>(newreq, this, acc));
                r = newreq;
            } catch (IOException e) {
                if (cancelled) {
                    throw new HttpTimeoutException("Request timed out");
                }
                throw e;
            }
        }
        cancelTimer();
        throw new IOException("Retry limit exceeded");
    }

    CompletableFuture<Void> multiCompletionCF() {
        return pushGroup.groupResult();
    }

    private synchronized Exchange<T> getExchange() {
        return exchange;
    }

    HttpClientImpl client() {
        return client;
    }

    HttpClient.Redirect followRedirects() {
        return client.followRedirects();
    }

    HttpClient.Version version() {
        return request.version().orElse(client.version());
    }

    private synchronized void setExchange(Exchange<T> exchange) {
        this.exchange = exchange;
    }

    private void cancelTimer() {
        if (timedEvent != null) {
            client.cancelTimer(timedEvent);
        }
    }

    private void requestFilters(HttpRequestImpl r) throws IOException {
        Log.logTrace("Applying request filters");
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            filter.request(r, this);
        }
        Log.logTrace("All filters applied");
    }

    private HttpRequestImpl responseFilters(Response response) throws IOException
    {
        Log.logTrace("Applying response filters");
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            HttpRequestImpl newreq = filter.response(response);
            if (newreq != null) {
                Log.logTrace("New request: stopping filters");
                return newreq;
            }
        }
        Log.logTrace("All filters applied");
        return null;
    }

    public void cancel() {
        cancelled = true;
        getExchange().cancel();
    }

    public void cancel(IOException cause) {
        cancelled = true;
        getExchange().cancel(cause);
    }

    public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
        start.completeAsync( () -> null, executor); // trigger execution
        return cf;
    }

    private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
        return start.thenCompose( v -> responseAsyncImpl())
            .thenCompose((Response r) -> {
                Exchange<T> exch = getExchange();
                return exch.readBodyAsync(responseHandler)
                        .thenApply((T body) ->  new HttpResponseImpl<>(userRequest, r, body, exch));
            });
    }

    CompletableFuture<U> multiResponseAsync() {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
        CompletableFuture<HttpResponse<T>> mainResponse =
                cf.thenApply((HttpResponseImpl<T> b) -> {
                      multiResponseHandler.onResponse(b);
                      return (HttpResponse<T>)b;
                   });

        pushGroup.setMainResponse(mainResponse);
        // set up house-keeping related to multi-response
        mainResponse.thenAccept((r) -> {
            // All push promises received by now.
            pushGroup.noMorePushes(true);
        });
        CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
        start.completeAsync( () -> null, executor); // trigger execution
        return res;
    }

    private CompletableFuture<Response> responseAsyncImpl() {
        CompletableFuture<Response> cf;
        if (++attempts > max_attempts) {
            cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
        } else {
            if (currentreq.duration() != null) {
                timedEvent = new TimedEvent(currentreq.duration());
                client.registerTimer(timedEvent);
            }
            try {
                // 1. Apply request filters
                requestFilters(currentreq);
            } catch (IOException e) {
                return MinimalFuture.failedFuture(e);
            }
            Exchange<T> exch = getExchange();
            // 2. get response
            cf = exch.responseAsync()
                .thenCompose((Response response) -> {
                    HttpRequestImpl newrequest = null;
                    try {
                        // 3. Apply response filters
                        newrequest = responseFilters(response);
                    } catch (IOException e) {
                        return MinimalFuture.failedFuture(e);
                    }
                    // 4. Check filter result and repeat or continue
                    if (newrequest == null) {
                        if (attempts > 1) {
                            Log.logError("Succeeded on attempt: " + attempts);
                        }
                        return MinimalFuture.completedFuture(response);
                    } else {
                        currentreq = newrequest;
                        setExchange(new Exchange<>(currentreq, this, acc));
                        //reads body off previous, and then waits for next response
                        return responseAsyncImpl();
                    }
                })
            // 5. Handle errors and cancel any timer set
            .handle((response, ex) -> {
                cancelTimer();
                if (ex == null) {
                    assert response != null;
                    return MinimalFuture.completedFuture(response);
                }
                // all exceptions thrown are handled here
                CompletableFuture<Response> error = getExceptionalCF(ex);
                if (error == null) {
                    return responseAsyncImpl();
                } else {
                    return error;
                }
            })
            .thenCompose(UnaryOperator.identity());
        }
        return cf;
    }

    /**
     * Take a Throwable and return a suitable CompletableFuture that is
     * completed exceptionally.
     */
    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
            if (t.getCause() != null) {
                t = t.getCause();
            }
        }
        if (cancelled && t instanceof IOException) {
            t = new HttpTimeoutException("request timed out");
        }
        return MinimalFuture.failedFuture(t);
    }

    class TimedEvent extends TimeoutEvent {
        TimedEvent(Duration duration) {
            super(duration);
        }
        @Override
        public void handle() {
            cancel(new HttpTimeoutException("request timed out"));
        }
    }
}