src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java
author smarks
Mon, 04 Dec 2017 11:50:04 -0800
changeset 48059 6ee80cd217e0
parent 47216 71c04702a3d5
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
permissions -rw-r--r--
8177290: add copy factory methods for unmodifiable List, Set, Map 8184690: add Collectors for collecting into unmodifiable List, Set, and Map Reviewed-by: alanb, briangoetz, dholmes, jrose, rriggs, scolebourne

/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.SocketPermission;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLPermission;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.common.Log;

/**
 * One request/response exchange (handles 100/101 intermediate response also).
 * depth field used to track number of times a new request is being sent
 * for a given API request. If limit exceeded exception is thrown.
 *
 * Security check is performed here:
 * - uses AccessControlContext captured at API level
 * - checks for appropriate URLPermission for request
 * - if permission allowed, grants equivalent SocketPermission to call
 * - in case of direct HTTP proxy, checks additionally for access to proxy
 *    (CONNECT proxying uses its own Exchange, so check done there)
 *
 */
final class Exchange<T> {

    final HttpRequestImpl request;
    final HttpClientImpl client;
    volatile ExchangeImpl<T> exchImpl;
    // used to record possible cancellation raised before the exchImpl
    // has been established.
    private volatile IOException failed;
    final List<SocketPermission> permissions = new LinkedList<>();
    final AccessControlContext acc;
    final MultiExchange<?,T> multi;
    final Executor parentExecutor;
    final HttpRequest.BodyProcessor requestProcessor;
    boolean upgrading; // to HTTP/2
    final PushGroup<?,T> pushGroup;

    Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) {
        this.request = request;
        this.upgrading = false;
        this.client = multi.client();
        this.multi = multi;
        this.acc = multi.acc;
        this.parentExecutor = multi.executor;
        this.requestProcessor = request.requestProcessor;
        this.pushGroup = multi.pushGroup;
    }

    /* If different AccessControlContext to be used  */
    Exchange(HttpRequestImpl request,
             MultiExchange<?,T> multi,
             AccessControlContext acc)
    {
        this.request = request;
        this.acc = acc;
        this.upgrading = false;
        this.client = multi.client();
        this.multi = multi;
        this.parentExecutor = multi.executor;
        this.requestProcessor = request.requestProcessor;
        this.pushGroup = multi.pushGroup;
    }

    PushGroup<?,T> getPushGroup() {
        return pushGroup;
    }

    Executor executor() {
        return parentExecutor;
    }

    public HttpRequestImpl request() {
        return request;
    }

    HttpClientImpl client() {
        return client;
    }

    public Response response() throws IOException, InterruptedException {
        return responseImpl(null);
    }

    public T readBody(HttpResponse.BodyHandler<T> responseHandler) throws IOException {
        // The connection will not be returned to the pool in the case of WebSocket
        return exchImpl.readBody(responseHandler, !request.isWebSocket());
    }

    public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
        // The connection will not be returned to the pool in the case of WebSocket
        return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor);
    }

    public void cancel() {
        // cancel can be called concurrently before or at the same time
        // that the exchange impl is being established.
        // In that case we won't be able to propagate the cancellation
        // right away
        if (exchImpl != null) {
            exchImpl.cancel();
        } else {
            // no impl - can't cancel impl yet.
            // call cancel(IOException) instead which takes care
            // of race conditions between impl/cancel.
            cancel(new IOException("Request cancelled"));
        }
    }

    public void cancel(IOException cause) {
        // If the impl is non null, propagate the exception right away.
        // Otherwise record it so that it can be propagated once the
        // exchange impl has been established.
        ExchangeImpl<?> impl = exchImpl;
        if (impl != null) {
            // propagate the exception to the impl
            impl.cancel(cause);
        } else {
            try {
                // no impl yet. record the exception
                failed = cause;
                // now call checkCancelled to recheck the impl.
                // if the failed state is set and the impl is not null, reset
                // the failed state and propagate the exception to the impl.
                checkCancelled(false);
            } catch (IOException x) {
                // should not happen - we passed 'false' above
                throw new UncheckedIOException(x);
            }
        }
    }

    // This method will raise an exception if one was reported and if
    // it is possible to do so. If the exception can be raised, then
    // the failed state will be reset. Otherwise, the failed state
    // will persist until the exception can be raised and the failed state
    // can be cleared.
    // Takes care of possible race conditions.
    private void checkCancelled(boolean throwIfNoImpl) throws IOException {
        ExchangeImpl<?> impl = null;
        IOException cause = null;
        if (failed != null) {
            synchronized(this) {
                cause = failed;
                impl = exchImpl;
                if (throwIfNoImpl || impl != null) {
                    // The exception will be raised by one of the two methods
                    // below: reset the failed state.
                    failed = null;
                }
            }
        }
        if (cause == null) return;
        if (impl != null) {
            // The exception is raised by propagating it to the impl.
            impl.cancel(cause);
        } else if (throwIfNoImpl) {
            // The exception is raised by throwing it immediately
            throw cause;
        } else {
            Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
                         + "\n\tCan''t cancel yet with {2}",
                         request.uri(),
                         request.duration() == null ? -1 :
                         // calling duration.toMillis() can throw an exception.
                         // this is just debugging, we don't care if it overflows.
                         (request.duration().getSeconds() * 1000
                          + request.duration().getNano() / 1000000),
                         cause);
        }
    }

    public void h2Upgrade() {
        upgrading = true;
        request.setH2Upgrade(client.client2());
    }

    static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0];

    Response responseImpl(HttpConnection connection)
        throws IOException, InterruptedException
    {
        SecurityException e = securityCheck(acc);
        if (e != null) {
            throw e;
        }

        if (permissions.size() > 0) {
            try {
                return AccessController.doPrivileged(
                        (PrivilegedExceptionAction<Response>)() ->
                             responseImpl0(connection),
                        null,
                        permissions.toArray(SOCKET_ARRAY));
            } catch (Throwable ee) {
                if (ee instanceof PrivilegedActionException) {
                    ee = ee.getCause();
                }
                if (ee instanceof IOException) {
                    throw (IOException) ee;
                } else {
                    throw new RuntimeException(ee); // TODO: fix
                }
            }
        } else {
            return responseImpl0(connection);
        }
    }

    // get/set the exchange impl, solving race condition issues with
    // potential concurrent calls to cancel() or cancel(IOException)
    private void establishExchange(HttpConnection connection)
        throws IOException, InterruptedException
    {
        // check if we have been cancelled first.
        checkCancelled(true);
        // not yet cancelled: create/get a new impl
        exchImpl = ExchangeImpl.get(this, connection);
        // recheck for cancelled, in case of race conditions
        checkCancelled(true);
        // now we're good to go. because exchImpl is no longer null
        // cancel() will be able to propagate directly to the impl
        // after this point.
    }

    private Response responseImpl0(HttpConnection connection)
        throws IOException, InterruptedException
    {
        establishExchange(connection);
        if (request.expectContinue()) {
            Log.logTrace("Sending Expect: 100-Continue");
            request.addSystemHeader("Expect", "100-Continue");
            exchImpl.sendHeadersOnly();

            Log.logTrace("Waiting for 407-Expectation-Failed or 100-Continue");
            Response resp = exchImpl.getResponse();
            HttpResponseImpl.logResponse(resp);
            int rcode = resp.statusCode();
            if (rcode != 100) {
                Log.logTrace("Expectation failed: Received {0}",
                             rcode);
                if (upgrading && rcode == 101) {
                    throw new IOException(
                        "Unable to handle 101 while waiting for 100-Continue");
                }
                return resp;
            }

            Log.logTrace("Received 100-Continue: sending body");
            exchImpl.sendBody();

            Log.logTrace("Body sent: waiting for response");
            resp = exchImpl.getResponse();
            HttpResponseImpl.logResponse(resp);

            return checkForUpgrade(resp, exchImpl);
        } else {
            exchImpl.sendHeadersOnly();
            exchImpl.sendBody();
            Response resp = exchImpl.getResponse();
            HttpResponseImpl.logResponse(resp);
            return checkForUpgrade(resp, exchImpl);
        }
    }

    // Completed HttpResponse will be null if response succeeded
    // will be a non null responseAsync if expect continue returns an error

    public CompletableFuture<Response> responseAsync() {
        return responseAsyncImpl(null);
    }

    CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
        SecurityException e = securityCheck(acc);
        if (e != null) {
            return MinimalFuture.failedFuture(e);
        }
        if (permissions.size() > 0) {
            return AccessController.doPrivileged(
                    (PrivilegedAction<CompletableFuture<Response>>)() ->
                        responseAsyncImpl0(connection),
                    null,
                    permissions.toArray(SOCKET_ARRAY));
        } else {
            return responseAsyncImpl0(connection);
        }
    }

    CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
        try {
            establishExchange(connection);
        } catch (IOException | InterruptedException e) {
            return MinimalFuture.failedFuture(e);
        }
        if (request.expectContinue()) {
            request.addSystemHeader("Expect", "100-Continue");
            Log.logTrace("Sending Expect: 100-Continue");
            return exchImpl
                    .sendHeadersAsync()
                    .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
                    .thenCompose((Response r1) -> {
                        HttpResponseImpl.logResponse(r1);
                        int rcode = r1.statusCode();
                        if (rcode == 100) {
                            Log.logTrace("Received 100-Continue: sending body");
                            CompletableFuture<Response> cf =
                                    exchImpl.sendBodyAsync()
                                            .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
                            cf = wrapForUpgrade(cf);
                            cf = wrapForLog(cf);
                            return cf;
                        } else {
                            Log.logTrace("Expectation failed: Received {0}",
                                         rcode);
                            if (upgrading && rcode == 101) {
                                IOException failed = new IOException(
                                        "Unable to handle 101 while waiting for 100");
                                return MinimalFuture.failedFuture(failed);
                            }
                            return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
                                  .thenApply(v ->  r1);
                        }
                    });
        } else {
            CompletableFuture<Response> cf = exchImpl
                    .sendHeadersAsync()
                    .thenCompose(ExchangeImpl::sendBodyAsync)
                    .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
            cf = wrapForUpgrade(cf);
            cf = wrapForLog(cf);
            return cf;
        }
    }

    private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
        if (upgrading) {
            return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
        }
        return cf;
    }

    private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
        if (Log.requests()) {
            return cf.thenApply(response -> {
                HttpResponseImpl.logResponse(response);
                return response;
            });
        }
        return cf;
    }

    HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) {
        return HttpResponse.BodyProcessor.discard((T)null);
    }

    // if this response was received in reply to an upgrade
    // then create the Http2Connection from the HttpConnection
    // initialize it and wait for the real response on a newly created Stream

    private CompletableFuture<Response>
    checkForUpgradeAsync(Response resp,
                         ExchangeImpl<T> ex) {

        int rcode = resp.statusCode();
        if (upgrading && (rcode == 101)) {
            Http1Exchange<T> e = (Http1Exchange<T>)ex;
            // check for 101 switching protocols
            // 101 responses are not supposed to contain a body.
            //    => should we fail if there is one?
            return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
                .thenCompose((T v) -> // v is null
                     Http2Connection.createAsync(e.connection(),
                                                 client.client2(),
                                                 this, e.getBuffer())
                        .thenCompose((Http2Connection c) -> {
                            c.putConnection();
                            Stream<T> s = c.getStream(1);
                            exchImpl = s;
                            return s.getResponseAsync(null);
                        })
                );
        }
        return MinimalFuture.completedFuture(resp);
    }

    private Response checkForUpgrade(Response resp,
                                             ExchangeImpl<T> ex)
        throws IOException, InterruptedException
    {
        int rcode = resp.statusCode();
        if (upgrading && (rcode == 101)) {
            Http1Exchange<T> e = (Http1Exchange<T>) ex;

            // 101 responses are not supposed to contain a body.
            //    => should we fail if there is one?
            //    => readBody called here by analogy with
            //       checkForUpgradeAsync above
            e.readBody(this::ignoreBody, false);

            // must get connection from Http1Exchange
            Http2Connection h2con = new Http2Connection(e.connection(),
                                                        client.client2(),
                                                        this, e.getBuffer());
            h2con.putConnection();
            Stream<T> s = h2con.getStream(1);
            exchImpl = s;
            Response xx = s.getResponse();
            HttpResponseImpl.logResponse(xx);
            return xx;
        }
        return resp;
    }

    private URI getURIForSecurityCheck() {
        URI u;
        String method = request.method();
        InetSocketAddress authority = request.authority();
        URI uri = request.uri();

        // CONNECT should be restricted at API level
        if (method.equalsIgnoreCase("CONNECT")) {
            try {
                u = new URI("socket",
                             null,
                             authority.getHostString(),
                             authority.getPort(),
                             null,
                             null,
                             null);
            } catch (URISyntaxException e) {
                throw new InternalError(e); // shouldn't happen
            }
        } else {
            u = uri;
        }
        return u;
    }

    /**
     * Do the security check and return any exception.
     * Return null if no check needed or passes.
     *
     * Also adds any generated permissions to the "permissions" list.
     */
    private SecurityException securityCheck(AccessControlContext acc) {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null) {
            return null;
        }

        String method = request.method();
        HttpHeaders userHeaders = request.getUserHeaders();
        URI u = getURIForSecurityCheck();
        URLPermission p = Utils.getPermission(u, method, userHeaders.map());

        try {
            assert acc != null;
            sm.checkPermission(p, acc);
            permissions.add(getSocketPermissionFor(u));
        } catch (SecurityException e) {
            return e;
        }
        ProxySelector ps = client.proxy().orElse(null);
        if (ps != null) {
            InetSocketAddress proxy = (InetSocketAddress)
                    ps.select(u).get(0).address(); // TODO: check this
            // may need additional check
            if (!method.equals("CONNECT")) {
                // a direct http proxy. Need to check access to proxy
                try {
                    u = new URI("socket", null, proxy.getHostString(),
                        proxy.getPort(), null, null, null);
                } catch (URISyntaxException e) {
                    throw new InternalError(e); // shouldn't happen
                }
                p = new URLPermission(u.toString(), "CONNECT");
                try {
                    sm.checkPermission(p, acc);
                } catch (SecurityException e) {
                    permissions.clear();
                    return e;
                }
                String sockperm = proxy.getHostString() +
                        ":" + Integer.toString(proxy.getPort());

                permissions.add(new SocketPermission(sockperm, "connect,resolve"));
            }
        }
        return null;
    }

    HttpClient.Redirect followRedirects() {
        return client.followRedirects();
    }

    HttpClient.Version version() {
        return multi.version();
    }

    private static SocketPermission getSocketPermissionFor(URI url) {
        if (System.getSecurityManager() == null) {
            return null;
        }

        StringBuilder sb = new StringBuilder();
        String host = url.getHost();
        sb.append(host);
        int port = url.getPort();
        if (port == -1) {
            String scheme = url.getScheme();
            if ("http".equals(scheme)) {
                sb.append(":80");
            } else { // scheme must be https
                sb.append(":443");
            }
        } else {
            sb.append(':')
              .append(Integer.toString(port));
        }
        String target = sb.toString();
        return new SocketPermission(target, "connect");
    }

    AccessControlContext getAccessControlContext() {
        return acc;
    }
}