src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/MultiExchange.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
child 56090 5c7fb702948a
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/MultiExchange.java	Tue Feb 06 19:37:56 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,326 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal;
-
-import java.io.IOException;
-import java.lang.System.Logger.Level;
-import java.time.Duration;
-import java.util.List;
-import java.security.AccessControlContext;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-
-import jdk.incubator.http.HttpClient;
-import jdk.incubator.http.HttpRequest;
-import jdk.incubator.http.HttpResponse;
-import jdk.incubator.http.HttpResponse.PushPromiseHandler;
-import jdk.incubator.http.HttpTimeoutException;
-import jdk.incubator.http.internal.UntrustedBodyHandler;
-import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.ConnectionExpiredException;
-import jdk.incubator.http.internal.common.Utils;
-import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
-import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
-
-/**
- * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
- * - manages filters
- * - retries due to filters.
- * - I/O errors and most other exceptions get returned directly to user
- *
- * Creates a new Exchange for each request/response interaction
- */
-class MultiExchange<T> {
-
-    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
-    static final System.Logger DEBUG_LOGGER =
-            Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
-
-    private final HttpRequest userRequest; // the user request
-    private final HttpRequestImpl request; // a copy of the user request
-    final AccessControlContext acc;
-    final HttpClientImpl client;
-    final HttpResponse.BodyHandler<T> responseHandler;
-    final Executor executor;
-    final AtomicInteger attempts = new AtomicInteger();
-    HttpRequestImpl currentreq; // used for async only
-    Exchange<T> exchange; // the current exchange
-    Exchange<T> previous;
-    volatile Throwable retryCause;
-    volatile boolean expiredOnce;
-    volatile HttpResponse<T> response = null;
-
-    // Maximum number of times a request will be retried/redirected
-    // for any reason
-
-    static final int DEFAULT_MAX_ATTEMPTS = 5;
-    static final int max_attempts = Utils.getIntegerNetProperty(
-            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
-    );
-
-    private final List<HeaderFilter> filters;
-    TimedEvent timedEvent;
-    volatile boolean cancelled;
-    final PushGroup<T> pushGroup;
-
-    /**
-     * Filter fields. These are attached as required by filters
-     * and only used by the filter implementations. This could be
-     * generalised into Objects that are passed explicitly to the filters
-     * (one per MultiExchange object, and one per Exchange object possibly)
-     */
-    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
-    // RedirectHandler
-    volatile int numberOfRedirects = 0;
-
-    /**
-     * MultiExchange with one final response.
-     */
-    MultiExchange(HttpRequest userRequest,
-                  HttpRequestImpl requestImpl,
-                  HttpClientImpl client,
-                  HttpResponse.BodyHandler<T> responseHandler,
-                  PushPromiseHandler<T> pushPromiseHandler,
-                  AccessControlContext acc) {
-        this.previous = null;
-        this.userRequest = userRequest;
-        this.request = requestImpl;
-        this.currentreq = request;
-        this.client = client;
-        this.filters = client.filterChain();
-        this.acc = acc;
-        this.executor = client.theExecutor();
-        this.responseHandler = responseHandler;
-        if (acc != null) {
-            // Restricts the file publisher with the senders ACC, if any
-            if (responseHandler instanceof UntrustedBodyHandler)
-                ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
-        }
-
-        if (pushPromiseHandler != null) {
-            this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
-        } else {
-            pushGroup = null;
-        }
-
-        this.exchange = new Exchange<>(request, this);
-    }
-
-    private synchronized Exchange<T> getExchange() {
-        return exchange;
-    }
-
-    HttpClientImpl client() {
-        return client;
-    }
-
-    HttpClient.Version version() {
-        HttpClient.Version vers = request.version().orElse(client.version());
-        if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)
-            vers = HttpClient.Version.HTTP_1_1;
-        return vers;
-    }
-
-    private synchronized void setExchange(Exchange<T> exchange) {
-        if (this.exchange != null && exchange != this.exchange) {
-            this.exchange.released();
-        }
-        this.exchange = exchange;
-    }
-
-    private void cancelTimer() {
-        if (timedEvent != null) {
-            client.cancelTimer(timedEvent);
-        }
-    }
-
-    private void requestFilters(HttpRequestImpl r) throws IOException {
-        Log.logTrace("Applying request filters");
-        for (HeaderFilter filter : filters) {
-            Log.logTrace("Applying {0}", filter);
-            filter.request(r, this);
-        }
-        Log.logTrace("All filters applied");
-    }
-
-    private HttpRequestImpl responseFilters(Response response) throws IOException
-    {
-        Log.logTrace("Applying response filters");
-        for (HeaderFilter filter : filters) {
-            Log.logTrace("Applying {0}", filter);
-            HttpRequestImpl newreq = filter.response(response);
-            if (newreq != null) {
-                Log.logTrace("New request: stopping filters");
-                return newreq;
-            }
-        }
-        Log.logTrace("All filters applied");
-        return null;
-    }
-
-//    public void cancel() {
-//        cancelled = true;
-//        getExchange().cancel();
-//    }
-
-    public void cancel(IOException cause) {
-        cancelled = true;
-        getExchange().cancel(cause);
-    }
-
-    public CompletableFuture<HttpResponse<T>> responseAsync() {
-        CompletableFuture<Void> start = new MinimalFuture<>();
-        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
-        start.completeAsync( () -> null, executor); // trigger execution
-        return cf;
-    }
-
-    private CompletableFuture<HttpResponse<T>>
-    responseAsync0(CompletableFuture<Void> start) {
-        return start.thenCompose( v -> responseAsyncImpl())
-                    .thenCompose((Response r) -> {
-                        Exchange<T> exch = getExchange();
-                        return exch.readBodyAsync(responseHandler)
-                            .thenApply((T body) -> {
-                                this.response =
-                                    new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
-                                return this.response;
-                            });
-                    });
-    }
-
-    private CompletableFuture<Response> responseAsyncImpl() {
-        CompletableFuture<Response> cf;
-        if (attempts.incrementAndGet() > max_attempts) {
-            cf = failedFuture(new IOException("Too many retries", retryCause));
-        } else {
-            if (currentreq.timeout().isPresent()) {
-                timedEvent = new TimedEvent(currentreq.timeout().get());
-                client.registerTimer(timedEvent);
-            }
-            try {
-                // 1. apply request filters
-                requestFilters(currentreq);
-            } catch (IOException e) {
-                return failedFuture(e);
-            }
-            Exchange<T> exch = getExchange();
-            // 2. get response
-            cf = exch.responseAsync()
-                     .thenCompose((Response response) -> {
-                        HttpRequestImpl newrequest;
-                        try {
-                            // 3. apply response filters
-                            newrequest = responseFilters(response);
-                        } catch (IOException e) {
-                            return failedFuture(e);
-                        }
-                        // 4. check filter result and repeat or continue
-                        if (newrequest == null) {
-                            if (attempts.get() > 1) {
-                                Log.logError("Succeeded on attempt: " + attempts);
-                            }
-                            return completedFuture(response);
-                        } else {
-                            this.response =
-                                new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
-                            Exchange<T> oldExch = exch;
-                            return exch.ignoreBody().handle((r,t) -> {
-                                currentreq = newrequest;
-                                expiredOnce = false;
-                                setExchange(new Exchange<>(currentreq, this, acc));
-                                return responseAsyncImpl();
-                            }).thenCompose(Function.identity());
-                        } })
-                     .handle((response, ex) -> {
-                        // 5. handle errors and cancel any timer set
-                        cancelTimer();
-                        if (ex == null) {
-                            assert response != null;
-                            return completedFuture(response);
-                        }
-                        // all exceptions thrown are handled here
-                        CompletableFuture<Response> errorCF = getExceptionalCF(ex);
-                        if (errorCF == null) {
-                            return responseAsyncImpl();
-                        } else {
-                            return errorCF;
-                        } })
-                     .thenCompose(Function.identity());
-        }
-        return cf;
-    }
-
-    /**
-     * Takes a Throwable and returns a suitable CompletableFuture that is
-     * completed exceptionally, or null.
-     */
-    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
-        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
-            if (t.getCause() != null) {
-                t = t.getCause();
-            }
-        }
-        if (cancelled && t instanceof IOException) {
-            t = new HttpTimeoutException("request timed out");
-        } else if (t instanceof ConnectionExpiredException) {
-            // allow the retry mechanism to do its work
-            // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
-            if (t.getCause() != null) retryCause = t.getCause();
-            if (!expiredOnce) {
-                DEBUG_LOGGER.log(Level.DEBUG,
-                    "MultiExchange: ConnectionExpiredException (async): retrying...",
-                    t);
-                expiredOnce = true;
-                return null;
-            } else {
-                DEBUG_LOGGER.log(Level.DEBUG,
-                    "MultiExchange: ConnectionExpiredException (async): already retried once.",
-                    t);
-                if (t.getCause() != null) t = t.getCause();
-            }
-        }
-        return failedFuture(t);
-    }
-
-    class TimedEvent extends TimeoutEvent {
-        TimedEvent(Duration duration) {
-            super(duration);
-        }
-        @Override
-        public void handle() {
-            DEBUG_LOGGER.log(Level.DEBUG,
-                    "Cancelling MultiExchange due to timeout for request %s",
-                     request);
-            cancel(new HttpTimeoutException("request timed out"));
-        }
-    }
-}