src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
author smarks
Mon, 04 Dec 2017 11:50:04 -0800
changeset 48059 6ee80cd217e0
parent 47216 71c04702a3d5
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
permissions -rw-r--r--
8177290: add copy factory methods for unmodifiable List, Set, Map 8184690: add Collectors for collecting into unmodifiable List, Set, and Map Reviewed-by: alanb, briangoetz, dholmes, jrose, rriggs, scolebourne

/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
import java.net.CookieManager;
import java.net.ProxySelector;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.websocket.BuilderImpl;

/**
 * Client implementation. Contains all configuration information and also
 * the selector manager thread which allows async events to be registered
 * and delivered when they occur. See AsyncEvent.
 */
class HttpClientImpl extends HttpClient {

    // Define the default factory as a static inner class
    // that embeds all the necessary logic to avoid
    // the risk of using a lambda that might keep a reference on the
    // HttpClient instance from which it was created (helps with
    // heapdump analysis).
    private static final class DefaultThreadFactory implements ThreadFactory {
        private DefaultThreadFactory() {}
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(null, r, "HttpClient_worker", 0, true);
            t.setDaemon(true);
            return t;
        }
        static final ThreadFactory INSTANCE = new DefaultThreadFactory();
    }

    private final CookieManager cookieManager;
    private final Redirect followRedirects;
    private final ProxySelector proxySelector;
    private final Authenticator authenticator;
    private final Version version;
    private final ConnectionPool connections;
    private final Executor executor;
    // Security parameters
    private final SSLContext sslContext;
    private final SSLParameters sslParams;
    private final SelectorManager selmgr;
    private final FilterFactory filters;
    private final Http2ClientImpl client2;

    /** A Set of, deadline first, ordered timeout events. */
    private final TreeSet<TimeoutEvent> timeouts;

    public static HttpClientImpl create(HttpClientBuilderImpl builder) {
        HttpClientImpl impl = new HttpClientImpl(builder);
        impl.start();
        return impl;
    }

    private HttpClientImpl(HttpClientBuilderImpl builder) {
        if (builder.sslContext == null) {
            try {
                sslContext = SSLContext.getDefault();
            } catch (NoSuchAlgorithmException ex) {
                throw new InternalError(ex);
            }
        } else {
            sslContext = builder.sslContext;
        }
        Executor ex = builder.executor;
        if (ex == null) {
            ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE);
        } else {
            ex = builder.executor;
        }
        client2 = new Http2ClientImpl(this);
        executor = ex;
        cookieManager = builder.cookieManager;
        followRedirects = builder.followRedirects == null ?
                Redirect.NEVER : builder.followRedirects;
        this.proxySelector = builder.proxy;
        authenticator = builder.authenticator;
        if (builder.version == null) {
            version = HttpClient.Version.HTTP_2;
        } else {
            version = builder.version;
        }
        if (builder.sslParams == null) {
            sslParams = getDefaultParams(sslContext);
        } else {
            sslParams = builder.sslParams;
        }
        connections = new ConnectionPool();
        connections.start();
        timeouts = new TreeSet<>();
        try {
            selmgr = new SelectorManager(this);
        } catch (IOException e) {
            // unlikely
            throw new InternalError(e);
        }
        selmgr.setDaemon(true);
        filters = new FilterFactory();
        initFilters();
    }

    private void start() {
        selmgr.start();
    }

    private static SSLParameters getDefaultParams(SSLContext ctx) {
        SSLParameters params = ctx.getSupportedSSLParameters();
        params.setProtocols(new String[]{"TLSv1.2"});
        return params;
    }

    /**
     * Wait for activity on given exchange (assuming blocking = false).
     * It's a no-op if blocking = true. In particular, the following occurs
     * in the SelectorManager thread.
     *
     *  1) mark the connection non-blocking
     *  2) add to selector
     *  3) If selector fires for this exchange then
     *  4)   - mark connection as blocking
     *  5)   - call AsyncEvent.handle()
     *
     * If exchange needs to block again, then call registerEvent() again
     */
    void registerEvent(AsyncEvent exchange) throws IOException {
        selmgr.register(exchange);
    }

    /**
     * Only used from RawChannel to disconnect the channel from
     * the selector
     */
    void cancelRegistration(SocketChannel s) {
        selmgr.cancel(s);
    }


    Http2ClientImpl client2() {
        return client2;
    }

    /*
    @Override
    public ByteBuffer getBuffer() {
        return pool.getBuffer();
    }

    // SSL buffers are larger. Manage separately

    int size = 16 * 1024;

    ByteBuffer getSSLBuffer() {
        return ByteBuffer.allocate(size);
    }

    /**
     * Return a new buffer that's a bit bigger than the given one
     *
     * @param buf
     * @return
     *
    ByteBuffer reallocSSLBuffer(ByteBuffer buf) {
        size = buf.capacity() * 12 / 10; // 20% bigger
        return ByteBuffer.allocate(size);
    }

    synchronized void returnSSLBuffer(ByteBuffer buf) {
        if (buf.capacity() >= size)
           sslBuffers.add(0, buf);
    }

    @Override
    public void returnBuffer(ByteBuffer buffer) {
        pool.returnBuffer(buffer);
    }
    */

    @Override
    public <T> HttpResponse<T>
    send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
        throws IOException, InterruptedException
    {
        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
        return mex.response();
    }

    @Override
    public <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
    {
        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
        return mex.responseAsync()
                  .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
    }

    @Override
    public <U, T> CompletableFuture<U>
    sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) {
        MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler);
        return mex.multiResponseAsync();
    }

    // new impl. Should get rid of above
    /*
    static class BufferPool implements BufferHandler {

        final LinkedList<ByteBuffer> freelist = new LinkedList<>();

        @Override
        public synchronized ByteBuffer getBuffer() {
            ByteBuffer buf;

            while (!freelist.isEmpty()) {
                buf = freelist.removeFirst();
                buf.clear();
                return buf;
            }
            return ByteBuffer.allocate(BUFSIZE);
        }

        @Override
        public synchronized void returnBuffer(ByteBuffer buffer) {
            assert buffer.capacity() > 0;
            freelist.add(buffer);
        }
    }

    static BufferPool pool = new BufferPool();

    static BufferHandler pool() {
        return pool;
    }
*/
    // Main loop for this client's selector
    private final static class SelectorManager extends Thread {

        private static final long NODEADLINE = 3000L;
        private final Selector selector;
        private volatile boolean closed;
        private final List<AsyncEvent> readyList;
        private final List<AsyncEvent> registrations;

        // Uses a weak reference to the HttpClient owning this
        // selector: a strong reference prevents its garbage
        // collection while the thread is running.
        // We want the thread to exit gracefully when the
        // HttpClient that owns it gets GC'ed.
        WeakReference<HttpClientImpl> ownerRef;

        SelectorManager(HttpClientImpl ref) throws IOException {
            super(null, null, "SelectorManager", 0, false);
            ownerRef = new WeakReference<>(ref);
            readyList = new ArrayList<>();
            registrations = new ArrayList<>();
            selector = Selector.open();
        }

        // This returns immediately. So caller not allowed to send/receive
        // on connection.

        synchronized void register(AsyncEvent e) throws IOException {
            registrations.add(e);
            selector.wakeup();
        }

        synchronized void cancel(SocketChannel e) {
            SelectionKey key = e.keyFor(selector);
            if (key != null) {
                key.cancel();
            }
            selector.wakeup();
        }

        void wakeupSelector() {
            selector.wakeup();
        }

        synchronized void shutdown() {
            closed = true;
            try {
                selector.close();
            } catch (IOException ignored) { }
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    HttpClientImpl client;
                    synchronized (this) {
                        for (AsyncEvent exchange : registrations) {
                            SelectableChannel c = exchange.channel();
                            try {
                                c.configureBlocking(false);
                                SelectionKey key = c.keyFor(selector);
                                SelectorAttachment sa;
                                if (key == null || !key.isValid()) {
                                    if (key != null) {
                                        // key is canceled.
                                        // invoke selectNow() to purge it
                                        // before registering the new event.
                                        selector.selectNow();
                                    }
                                    sa = new SelectorAttachment(c, selector);
                                } else {
                                    sa = (SelectorAttachment) key.attachment();
                                }
                                sa.register(exchange);
                            } catch (IOException e) {
                                Log.logError("HttpClientImpl: " + e);
                                c.close();
                                // let the exchange deal with it
                                handleEvent(exchange);
                            }
                        }
                        registrations.clear();
                    }

                    // Check whether client is still alive, and if not,
                    // gracefully stop this thread
                    if ((client = ownerRef.get()) == null) {
                        Log.logTrace("HttpClient no longer referenced. Exiting...");
                        return;
                    }
                    long millis = client.purgeTimeoutsAndReturnNextDeadline();
                    client = null; // don't hold onto the client ref

                    //debugPrint(selector);
                    // Don't wait for ever as it might prevent the thread to
                    // stop gracefully. millis will be 0 if no deadline was found.
                    int n = selector.select(millis == 0 ? NODEADLINE : millis);
                    if (n == 0) {
                        // Check whether client is still alive, and if not,
                        // gracefully stop this thread
                        if ((client = ownerRef.get()) == null) {
                            Log.logTrace("HttpClient no longer referenced. Exiting...");
                            return;
                        }
                        client.purgeTimeoutsAndReturnNextDeadline();
                        client = null; // don't hold onto the client ref
                        continue;
                    }
                    Set<SelectionKey> keys = selector.selectedKeys();

                    for (SelectionKey key : keys) {
                        SelectorAttachment sa = (SelectorAttachment) key.attachment();
                        int eventsOccurred = key.readyOps();
                        sa.events(eventsOccurred).forEach(readyList::add);
                        sa.resetInterestOps(eventsOccurred);
                    }
                    selector.selectNow(); // complete cancellation
                    selector.selectedKeys().clear();

                    for (AsyncEvent exchange : readyList) {
                        if (exchange.blocking()) {
                            exchange.channel().configureBlocking(true);
                        }
                        handleEvent(exchange); // will be delegated to executor
                    }
                    readyList.clear();
                }
            } catch (Throwable e) {
                if (!closed) {
                    // This terminates thread. So, better just print stack trace
                    String err = Utils.stackTrace(e);
                    Log.logError("HttpClientImpl: fatal error: " + err);
                }
            } finally {
                shutdown();
            }
        }

        void debugPrint(Selector selector) {
            System.err.println("Selector: debugprint start");
            Set<SelectionKey> keys = selector.keys();
            for (SelectionKey key : keys) {
                SelectableChannel c = key.channel();
                int ops = key.interestOps();
                System.err.printf("selector chan:%s ops:%d\n", c, ops);
            }
            System.err.println("Selector: debugprint end");
        }

        void handleEvent(AsyncEvent e) {
            if (closed) {
                e.abort();
            } else {
                e.handle();
            }
        }
    }

    /**
     * Tracks multiple user level registrations associated with one NIO
     * registration (SelectionKey). In this implementation, registrations
     * are one-off and when an event is posted the registration is cancelled
     * until explicitly registered again.
     *
     * <p> No external synchronization required as this class is only used
     * by the SelectorManager thread. One of these objects required per
     * connection.
     */
    private static class SelectorAttachment {
        private final SelectableChannel chan;
        private final Selector selector;
        private final ArrayList<AsyncEvent> pending;
        private int interestOps;

        SelectorAttachment(SelectableChannel chan, Selector selector) {
            this.pending = new ArrayList<>();
            this.chan = chan;
            this.selector = selector;
        }

        void register(AsyncEvent e) throws ClosedChannelException {
            int newOps = e.interestOps();
            boolean reRegister = (interestOps & newOps) != newOps;
            interestOps |= newOps;
            pending.add(e);
            if (reRegister) {
                // first time registration happens here also
                chan.register(selector, interestOps, this);
            }
        }

        /**
         * Returns a Stream<AsyncEvents> containing only events that are
         * registered with the given {@code interestOps}.
         */
        Stream<AsyncEvent> events(int interestOps) {
            return pending.stream()
                    .filter(ev -> (ev.interestOps() & interestOps) != 0);
        }

        /**
         * Removes any events with the given {@code interestOps}, and if no
         * events remaining, cancels the associated SelectionKey.
         */
        void resetInterestOps(int interestOps) {
            int newOps = 0;

            Iterator<AsyncEvent> itr = pending.iterator();
            while (itr.hasNext()) {
                AsyncEvent event = itr.next();
                int evops = event.interestOps();
                if (event.repeating()) {
                    newOps |= evops;
                    continue;
                }
                if ((evops & interestOps) != 0) {
                    itr.remove();
                } else {
                    newOps |= evops;
                }
            }

            this.interestOps = newOps;
            SelectionKey key = chan.keyFor(selector);
            if (newOps == 0) {
                key.cancel();
            } else {
                key.interestOps(newOps);
            }
        }
    }

    @Override
    public SSLContext sslContext() {
        Utils.checkNetPermission("getSSLContext");
        return sslContext;
    }

    @Override
    public Optional<SSLParameters> sslParameters() {
        return Optional.ofNullable(sslParams);
    }

    @Override
    public Optional<Authenticator> authenticator() {
        return Optional.ofNullable(authenticator);
    }

    @Override
    public Executor executor() {
        return executor;
    }

    ConnectionPool connectionPool() {
        return connections;
    }

    @Override
    public Redirect followRedirects() {
        return followRedirects;
    }


    @Override
    public Optional<CookieManager> cookieManager() {
        return Optional.ofNullable(cookieManager);
    }

    @Override
    public Optional<ProxySelector> proxy() {
        return Optional.ofNullable(this.proxySelector);
    }

    @Override
    public WebSocket.Builder newWebSocketBuilder(URI uri,
                                                 WebSocket.Listener listener) {
        return new BuilderImpl(this, uri, listener);
    }

    @Override
    public Version version() {
        return version;
    }

    //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();

    boolean getHttp2Allowed() {
        return version.equals(Version.HTTP_2);
    }

    private void initFilters() {
        addFilter(AuthenticationFilter.class);
        addFilter(RedirectFilter.class);
        if (this.cookieManager != null) {
            addFilter(CookieFilter.class);
        }
    }

    private void addFilter(Class<? extends HeaderFilter> f) {
        filters.addFilter(f);
    }

    final List<HeaderFilter> filterChain() {
        return filters.getFilterChain();
    }

    // Timer controls.
    // Timers are implemented through timed Selector.select() calls.

    synchronized void registerTimer(TimeoutEvent event) {
        Log.logTrace("Registering timer {0}", event);
        timeouts.add(event);
        selmgr.wakeupSelector();
    }

    synchronized void cancelTimer(TimeoutEvent event) {
        Log.logTrace("Canceling timer {0}", event);
        timeouts.remove(event);
    }

    /**
     * Purges ( handles ) timer events that have passed their deadline, and
     * returns the amount of time, in milliseconds, until the next earliest
     * event. A return value of 0 means that there are no events.
     */
    private long purgeTimeoutsAndReturnNextDeadline() {
        long diff = 0L;
        List<TimeoutEvent> toHandle = null;
        int remaining = 0;
        // enter critical section to retrieve the timeout event to handle
        synchronized(this) {
            if (timeouts.isEmpty()) return 0L;

            Instant now = Instant.now();
            Iterator<TimeoutEvent> itr = timeouts.iterator();
            while (itr.hasNext()) {
                TimeoutEvent event = itr.next();
                diff = now.until(event.deadline(), ChronoUnit.MILLIS);
                if (diff <= 0) {
                    itr.remove();
                    toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
                    toHandle.add(event);
                } else {
                    break;
                }
            }
            remaining = timeouts.size();
        }

        // can be useful for debugging
        if (toHandle != null && Log.trace()) {
            Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
                    + (toHandle == null ? 0 : toHandle.size()) + " events, "
                    + "remaining " + remaining
                    + ", next deadline: " + (diff < 0 ? 0L : diff));
        }

        // handle timeout events out of critical section
        if (toHandle != null) {
            Throwable failed = null;
            for (TimeoutEvent event : toHandle) {
                try {
                   Log.logTrace("Firing timer {0}", event);
                   event.handle();
                } catch (Error | RuntimeException e) {
                    // Not expected. Handle remaining events then throw...
                    // If e is an OOME or SOE it might simply trigger a new
                    // error from here - but in this case there's not much we
                    // could do anyway. Just let it flow...
                    if (failed == null) failed = e;
                    else failed.addSuppressed(e);
                    Log.logTrace("Failed to handle event {0}: {1}", event, e);
                }
            }
            if (failed instanceof Error) throw (Error) failed;
            if (failed instanceof RuntimeException) throw (RuntimeException) failed;
        }

        // return time to wait until next event. 0L if there's no more events.
        return diff < 0 ? 0L : diff;
    }

    // used for the connection window
    int getReceiveBufferSize() {
        return Utils.getIntegerNetProperty(
                "jdk.httpclient.connectionWindowSize", 256 * 1024
        );
    }
}