src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/MultiExchange.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56043 08e8e41841cf
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/MultiExchange.java	Tue Feb 06 14:10:28 2018 +0000
@@ -0,0 +1,326 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal;
+
+import java.io.IOException;
+import java.lang.System.Logger.Level;
+import java.time.Duration;
+import java.util.List;
+import java.security.AccessControlContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+import jdk.incubator.http.HttpClient;
+import jdk.incubator.http.HttpRequest;
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
+import jdk.incubator.http.HttpTimeoutException;
+import jdk.incubator.http.internal.UntrustedBodyHandler;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.ConnectionExpiredException;
+import jdk.incubator.http.internal.common.Utils;
+import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
+import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
+
+/**
+ * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
+ * - manages filters
+ * - retries due to filters.
+ * - I/O errors and most other exceptions get returned directly to user
+ *
+ * Creates a new Exchange for each request/response interaction
+ */
+class MultiExchange<T> {
+
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    static final System.Logger DEBUG_LOGGER =
+            Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
+
+    private final HttpRequest userRequest; // the user request
+    private final HttpRequestImpl request; // a copy of the user request
+    final AccessControlContext acc;
+    final HttpClientImpl client;
+    final HttpResponse.BodyHandler<T> responseHandler;
+    final Executor executor;
+    final AtomicInteger attempts = new AtomicInteger();
+    HttpRequestImpl currentreq; // used for async only
+    Exchange<T> exchange; // the current exchange
+    Exchange<T> previous;
+    volatile Throwable retryCause;
+    volatile boolean expiredOnce;
+    volatile HttpResponse<T> response = null;
+
+    // Maximum number of times a request will be retried/redirected
+    // for any reason
+
+    static final int DEFAULT_MAX_ATTEMPTS = 5;
+    static final int max_attempts = Utils.getIntegerNetProperty(
+            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
+    );
+
+    private final List<HeaderFilter> filters;
+    TimedEvent timedEvent;
+    volatile boolean cancelled;
+    final PushGroup<T> pushGroup;
+
+    /**
+     * Filter fields. These are attached as required by filters
+     * and only used by the filter implementations. This could be
+     * generalised into Objects that are passed explicitly to the filters
+     * (one per MultiExchange object, and one per Exchange object possibly)
+     */
+    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
+    // RedirectHandler
+    volatile int numberOfRedirects = 0;
+
+    /**
+     * MultiExchange with one final response.
+     */
+    MultiExchange(HttpRequest userRequest,
+                  HttpRequestImpl requestImpl,
+                  HttpClientImpl client,
+                  HttpResponse.BodyHandler<T> responseHandler,
+                  PushPromiseHandler<T> pushPromiseHandler,
+                  AccessControlContext acc) {
+        this.previous = null;
+        this.userRequest = userRequest;
+        this.request = requestImpl;
+        this.currentreq = request;
+        this.client = client;
+        this.filters = client.filterChain();
+        this.acc = acc;
+        this.executor = client.theExecutor();
+        this.responseHandler = responseHandler;
+        if (acc != null) {
+            // Restricts the file publisher with the senders ACC, if any
+            if (responseHandler instanceof UntrustedBodyHandler)
+                ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
+        }
+
+        if (pushPromiseHandler != null) {
+            this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
+        } else {
+            pushGroup = null;
+        }
+
+        this.exchange = new Exchange<>(request, this);
+    }
+
+    private synchronized Exchange<T> getExchange() {
+        return exchange;
+    }
+
+    HttpClientImpl client() {
+        return client;
+    }
+
+    HttpClient.Version version() {
+        HttpClient.Version vers = request.version().orElse(client.version());
+        if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)
+            vers = HttpClient.Version.HTTP_1_1;
+        return vers;
+    }
+
+    private synchronized void setExchange(Exchange<T> exchange) {
+        if (this.exchange != null && exchange != this.exchange) {
+            this.exchange.released();
+        }
+        this.exchange = exchange;
+    }
+
+    private void cancelTimer() {
+        if (timedEvent != null) {
+            client.cancelTimer(timedEvent);
+        }
+    }
+
+    private void requestFilters(HttpRequestImpl r) throws IOException {
+        Log.logTrace("Applying request filters");
+        for (HeaderFilter filter : filters) {
+            Log.logTrace("Applying {0}", filter);
+            filter.request(r, this);
+        }
+        Log.logTrace("All filters applied");
+    }
+
+    private HttpRequestImpl responseFilters(Response response) throws IOException
+    {
+        Log.logTrace("Applying response filters");
+        for (HeaderFilter filter : filters) {
+            Log.logTrace("Applying {0}", filter);
+            HttpRequestImpl newreq = filter.response(response);
+            if (newreq != null) {
+                Log.logTrace("New request: stopping filters");
+                return newreq;
+            }
+        }
+        Log.logTrace("All filters applied");
+        return null;
+    }
+
+//    public void cancel() {
+//        cancelled = true;
+//        getExchange().cancel();
+//    }
+
+    public void cancel(IOException cause) {
+        cancelled = true;
+        getExchange().cancel(cause);
+    }
+
+    public CompletableFuture<HttpResponse<T>> responseAsync() {
+        CompletableFuture<Void> start = new MinimalFuture<>();
+        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
+        start.completeAsync( () -> null, executor); // trigger execution
+        return cf;
+    }
+
+    private CompletableFuture<HttpResponse<T>>
+    responseAsync0(CompletableFuture<Void> start) {
+        return start.thenCompose( v -> responseAsyncImpl())
+                    .thenCompose((Response r) -> {
+                        Exchange<T> exch = getExchange();
+                        return exch.readBodyAsync(responseHandler)
+                            .thenApply((T body) -> {
+                                this.response =
+                                    new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
+                                return this.response;
+                            });
+                    });
+    }
+
+    private CompletableFuture<Response> responseAsyncImpl() {
+        CompletableFuture<Response> cf;
+        if (attempts.incrementAndGet() > max_attempts) {
+            cf = failedFuture(new IOException("Too many retries", retryCause));
+        } else {
+            if (currentreq.timeout().isPresent()) {
+                timedEvent = new TimedEvent(currentreq.timeout().get());
+                client.registerTimer(timedEvent);
+            }
+            try {
+                // 1. apply request filters
+                requestFilters(currentreq);
+            } catch (IOException e) {
+                return failedFuture(e);
+            }
+            Exchange<T> exch = getExchange();
+            // 2. get response
+            cf = exch.responseAsync()
+                     .thenCompose((Response response) -> {
+                        HttpRequestImpl newrequest;
+                        try {
+                            // 3. apply response filters
+                            newrequest = responseFilters(response);
+                        } catch (IOException e) {
+                            return failedFuture(e);
+                        }
+                        // 4. check filter result and repeat or continue
+                        if (newrequest == null) {
+                            if (attempts.get() > 1) {
+                                Log.logError("Succeeded on attempt: " + attempts);
+                            }
+                            return completedFuture(response);
+                        } else {
+                            this.response =
+                                new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
+                            Exchange<T> oldExch = exch;
+                            return exch.ignoreBody().handle((r,t) -> {
+                                currentreq = newrequest;
+                                expiredOnce = false;
+                                setExchange(new Exchange<>(currentreq, this, acc));
+                                return responseAsyncImpl();
+                            }).thenCompose(Function.identity());
+                        } })
+                     .handle((response, ex) -> {
+                        // 5. handle errors and cancel any timer set
+                        cancelTimer();
+                        if (ex == null) {
+                            assert response != null;
+                            return completedFuture(response);
+                        }
+                        // all exceptions thrown are handled here
+                        CompletableFuture<Response> errorCF = getExceptionalCF(ex);
+                        if (errorCF == null) {
+                            return responseAsyncImpl();
+                        } else {
+                            return errorCF;
+                        } })
+                     .thenCompose(Function.identity());
+        }
+        return cf;
+    }
+
+    /**
+     * Takes a Throwable and returns a suitable CompletableFuture that is
+     * completed exceptionally, or null.
+     */
+    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
+        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
+            if (t.getCause() != null) {
+                t = t.getCause();
+            }
+        }
+        if (cancelled && t instanceof IOException) {
+            t = new HttpTimeoutException("request timed out");
+        } else if (t instanceof ConnectionExpiredException) {
+            // allow the retry mechanism to do its work
+            // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
+            if (t.getCause() != null) retryCause = t.getCause();
+            if (!expiredOnce) {
+                DEBUG_LOGGER.log(Level.DEBUG,
+                    "MultiExchange: ConnectionExpiredException (async): retrying...",
+                    t);
+                expiredOnce = true;
+                return null;
+            } else {
+                DEBUG_LOGGER.log(Level.DEBUG,
+                    "MultiExchange: ConnectionExpiredException (async): already retried once.",
+                    t);
+                if (t.getCause() != null) t = t.getCause();
+            }
+        }
+        return failedFuture(t);
+    }
+
+    class TimedEvent extends TimeoutEvent {
+        TimedEvent(Duration duration) {
+            super(duration);
+        }
+        @Override
+        public void handle() {
+            DEBUG_LOGGER.log(Level.DEBUG,
+                    "Cancelling MultiExchange due to timeout for request %s",
+                     request);
+            cancel(new HttpTimeoutException("request timed out"));
+        }
+    }
+}