src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
changeset 47216 71c04702a3d5
parent 45531 fb3dbffad37b
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java	Tue Sep 12 19:03:39 2017 +0200
@@ -0,0 +1,673 @@
+/*
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.Authenticator;
+import java.net.CookieManager;
+import java.net.ProxySelector;
+import java.net.URI;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.stream.Stream;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.Utils;
+import jdk.incubator.http.internal.websocket.BuilderImpl;
+
+/**
+ * Client implementation. Contains all configuration information and also
+ * the selector manager thread which allows async events to be registered
+ * and delivered when they occur. See AsyncEvent.
+ */
+class HttpClientImpl extends HttpClient {
+
+    // Define the default factory as a static inner class
+    // that embeds all the necessary logic to avoid
+    // the risk of using a lambda that might keep a reference on the
+    // HttpClient instance from which it was created (helps with
+    // heapdump analysis).
+    private static final class DefaultThreadFactory implements ThreadFactory {
+        private DefaultThreadFactory() {}
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(null, r, "HttpClient_worker", 0, true);
+            t.setDaemon(true);
+            return t;
+        }
+        static final ThreadFactory INSTANCE = new DefaultThreadFactory();
+    }
+
+    private final CookieManager cookieManager;
+    private final Redirect followRedirects;
+    private final ProxySelector proxySelector;
+    private final Authenticator authenticator;
+    private final Version version;
+    private final ConnectionPool connections;
+    private final Executor executor;
+    // Security parameters
+    private final SSLContext sslContext;
+    private final SSLParameters sslParams;
+    private final SelectorManager selmgr;
+    private final FilterFactory filters;
+    private final Http2ClientImpl client2;
+
+    /** A Set of, deadline first, ordered timeout events. */
+    private final TreeSet<TimeoutEvent> timeouts;
+
+    public static HttpClientImpl create(HttpClientBuilderImpl builder) {
+        HttpClientImpl impl = new HttpClientImpl(builder);
+        impl.start();
+        return impl;
+    }
+
+    private HttpClientImpl(HttpClientBuilderImpl builder) {
+        if (builder.sslContext == null) {
+            try {
+                sslContext = SSLContext.getDefault();
+            } catch (NoSuchAlgorithmException ex) {
+                throw new InternalError(ex);
+            }
+        } else {
+            sslContext = builder.sslContext;
+        }
+        Executor ex = builder.executor;
+        if (ex == null) {
+            ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE);
+        } else {
+            ex = builder.executor;
+        }
+        client2 = new Http2ClientImpl(this);
+        executor = ex;
+        cookieManager = builder.cookieManager;
+        followRedirects = builder.followRedirects == null ?
+                Redirect.NEVER : builder.followRedirects;
+        this.proxySelector = builder.proxy;
+        authenticator = builder.authenticator;
+        if (builder.version == null) {
+            version = HttpClient.Version.HTTP_2;
+        } else {
+            version = builder.version;
+        }
+        if (builder.sslParams == null) {
+            sslParams = getDefaultParams(sslContext);
+        } else {
+            sslParams = builder.sslParams;
+        }
+        connections = new ConnectionPool();
+        connections.start();
+        timeouts = new TreeSet<>();
+        try {
+            selmgr = new SelectorManager(this);
+        } catch (IOException e) {
+            // unlikely
+            throw new InternalError(e);
+        }
+        selmgr.setDaemon(true);
+        filters = new FilterFactory();
+        initFilters();
+    }
+
+    private void start() {
+        selmgr.start();
+    }
+
+    private static SSLParameters getDefaultParams(SSLContext ctx) {
+        SSLParameters params = ctx.getSupportedSSLParameters();
+        params.setProtocols(new String[]{"TLSv1.2"});
+        return params;
+    }
+
+    /**
+     * Wait for activity on given exchange (assuming blocking = false).
+     * It's a no-op if blocking = true. In particular, the following occurs
+     * in the SelectorManager thread.
+     *
+     *  1) mark the connection non-blocking
+     *  2) add to selector
+     *  3) If selector fires for this exchange then
+     *  4)   - mark connection as blocking
+     *  5)   - call AsyncEvent.handle()
+     *
+     * If exchange needs to block again, then call registerEvent() again
+     */
+    void registerEvent(AsyncEvent exchange) throws IOException {
+        selmgr.register(exchange);
+    }
+
+    /**
+     * Only used from RawChannel to disconnect the channel from
+     * the selector
+     */
+    void cancelRegistration(SocketChannel s) {
+        selmgr.cancel(s);
+    }
+
+
+    Http2ClientImpl client2() {
+        return client2;
+    }
+
+    /*
+    @Override
+    public ByteBuffer getBuffer() {
+        return pool.getBuffer();
+    }
+
+    // SSL buffers are larger. Manage separately
+
+    int size = 16 * 1024;
+
+    ByteBuffer getSSLBuffer() {
+        return ByteBuffer.allocate(size);
+    }
+
+    /**
+     * Return a new buffer that's a bit bigger than the given one
+     *
+     * @param buf
+     * @return
+     *
+    ByteBuffer reallocSSLBuffer(ByteBuffer buf) {
+        size = buf.capacity() * 12 / 10; // 20% bigger
+        return ByteBuffer.allocate(size);
+    }
+
+    synchronized void returnSSLBuffer(ByteBuffer buf) {
+        if (buf.capacity() >= size)
+           sslBuffers.add(0, buf);
+    }
+
+    @Override
+    public void returnBuffer(ByteBuffer buffer) {
+        pool.returnBuffer(buffer);
+    }
+    */
+
+    @Override
+    public <T> HttpResponse<T>
+    send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
+        throws IOException, InterruptedException
+    {
+        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
+        return mex.response();
+    }
+
+    @Override
+    public <T> CompletableFuture<HttpResponse<T>>
+    sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
+    {
+        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
+        return mex.responseAsync()
+                  .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
+    }
+
+    @Override
+    public <U, T> CompletableFuture<U>
+    sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) {
+        MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler);
+        return mex.multiResponseAsync();
+    }
+
+    // new impl. Should get rid of above
+    /*
+    static class BufferPool implements BufferHandler {
+
+        final LinkedList<ByteBuffer> freelist = new LinkedList<>();
+
+        @Override
+        public synchronized ByteBuffer getBuffer() {
+            ByteBuffer buf;
+
+            while (!freelist.isEmpty()) {
+                buf = freelist.removeFirst();
+                buf.clear();
+                return buf;
+            }
+            return ByteBuffer.allocate(BUFSIZE);
+        }
+
+        @Override
+        public synchronized void returnBuffer(ByteBuffer buffer) {
+            assert buffer.capacity() > 0;
+            freelist.add(buffer);
+        }
+    }
+
+    static BufferPool pool = new BufferPool();
+
+    static BufferHandler pool() {
+        return pool;
+    }
+*/
+    // Main loop for this client's selector
+    private final static class SelectorManager extends Thread {
+
+        private static final long NODEADLINE = 3000L;
+        private final Selector selector;
+        private volatile boolean closed;
+        private final List<AsyncEvent> readyList;
+        private final List<AsyncEvent> registrations;
+
+        // Uses a weak reference to the HttpClient owning this
+        // selector: a strong reference prevents its garbage
+        // collection while the thread is running.
+        // We want the thread to exit gracefully when the
+        // HttpClient that owns it gets GC'ed.
+        WeakReference<HttpClientImpl> ownerRef;
+
+        SelectorManager(HttpClientImpl ref) throws IOException {
+            super(null, null, "SelectorManager", 0, false);
+            ownerRef = new WeakReference<>(ref);
+            readyList = new ArrayList<>();
+            registrations = new ArrayList<>();
+            selector = Selector.open();
+        }
+
+        // This returns immediately. So caller not allowed to send/receive
+        // on connection.
+
+        synchronized void register(AsyncEvent e) throws IOException {
+            registrations.add(e);
+            selector.wakeup();
+        }
+
+        synchronized void cancel(SocketChannel e) {
+            SelectionKey key = e.keyFor(selector);
+            if (key != null) {
+                key.cancel();
+            }
+            selector.wakeup();
+        }
+
+        void wakeupSelector() {
+            selector.wakeup();
+        }
+
+        synchronized void shutdown() {
+            closed = true;
+            try {
+                selector.close();
+            } catch (IOException ignored) { }
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    HttpClientImpl client;
+                    synchronized (this) {
+                        for (AsyncEvent exchange : registrations) {
+                            SelectableChannel c = exchange.channel();
+                            try {
+                                c.configureBlocking(false);
+                                SelectionKey key = c.keyFor(selector);
+                                SelectorAttachment sa;
+                                if (key == null || !key.isValid()) {
+                                    if (key != null) {
+                                        // key is canceled.
+                                        // invoke selectNow() to purge it
+                                        // before registering the new event.
+                                        selector.selectNow();
+                                    }
+                                    sa = new SelectorAttachment(c, selector);
+                                } else {
+                                    sa = (SelectorAttachment) key.attachment();
+                                }
+                                sa.register(exchange);
+                            } catch (IOException e) {
+                                Log.logError("HttpClientImpl: " + e);
+                                c.close();
+                                // let the exchange deal with it
+                                handleEvent(exchange);
+                            }
+                        }
+                        registrations.clear();
+                    }
+
+                    // Check whether client is still alive, and if not,
+                    // gracefully stop this thread
+                    if ((client = ownerRef.get()) == null) {
+                        Log.logTrace("HttpClient no longer referenced. Exiting...");
+                        return;
+                    }
+                    long millis = client.purgeTimeoutsAndReturnNextDeadline();
+                    client = null; // don't hold onto the client ref
+
+                    //debugPrint(selector);
+                    // Don't wait for ever as it might prevent the thread to
+                    // stop gracefully. millis will be 0 if no deadline was found.
+                    int n = selector.select(millis == 0 ? NODEADLINE : millis);
+                    if (n == 0) {
+                        // Check whether client is still alive, and if not,
+                        // gracefully stop this thread
+                        if ((client = ownerRef.get()) == null) {
+                            Log.logTrace("HttpClient no longer referenced. Exiting...");
+                            return;
+                        }
+                        client.purgeTimeoutsAndReturnNextDeadline();
+                        client = null; // don't hold onto the client ref
+                        continue;
+                    }
+                    Set<SelectionKey> keys = selector.selectedKeys();
+
+                    for (SelectionKey key : keys) {
+                        SelectorAttachment sa = (SelectorAttachment) key.attachment();
+                        int eventsOccurred = key.readyOps();
+                        sa.events(eventsOccurred).forEach(readyList::add);
+                        sa.resetInterestOps(eventsOccurred);
+                    }
+                    selector.selectNow(); // complete cancellation
+                    selector.selectedKeys().clear();
+
+                    for (AsyncEvent exchange : readyList) {
+                        if (exchange.blocking()) {
+                            exchange.channel().configureBlocking(true);
+                        }
+                        handleEvent(exchange); // will be delegated to executor
+                    }
+                    readyList.clear();
+                }
+            } catch (Throwable e) {
+                if (!closed) {
+                    // This terminates thread. So, better just print stack trace
+                    String err = Utils.stackTrace(e);
+                    Log.logError("HttpClientImpl: fatal error: " + err);
+                }
+            } finally {
+                shutdown();
+            }
+        }
+
+        void debugPrint(Selector selector) {
+            System.err.println("Selector: debugprint start");
+            Set<SelectionKey> keys = selector.keys();
+            for (SelectionKey key : keys) {
+                SelectableChannel c = key.channel();
+                int ops = key.interestOps();
+                System.err.printf("selector chan:%s ops:%d\n", c, ops);
+            }
+            System.err.println("Selector: debugprint end");
+        }
+
+        void handleEvent(AsyncEvent e) {
+            if (closed) {
+                e.abort();
+            } else {
+                e.handle();
+            }
+        }
+    }
+
+    /**
+     * Tracks multiple user level registrations associated with one NIO
+     * registration (SelectionKey). In this implementation, registrations
+     * are one-off and when an event is posted the registration is cancelled
+     * until explicitly registered again.
+     *
+     * <p> No external synchronization required as this class is only used
+     * by the SelectorManager thread. One of these objects required per
+     * connection.
+     */
+    private static class SelectorAttachment {
+        private final SelectableChannel chan;
+        private final Selector selector;
+        private final ArrayList<AsyncEvent> pending;
+        private int interestOps;
+
+        SelectorAttachment(SelectableChannel chan, Selector selector) {
+            this.pending = new ArrayList<>();
+            this.chan = chan;
+            this.selector = selector;
+        }
+
+        void register(AsyncEvent e) throws ClosedChannelException {
+            int newOps = e.interestOps();
+            boolean reRegister = (interestOps & newOps) != newOps;
+            interestOps |= newOps;
+            pending.add(e);
+            if (reRegister) {
+                // first time registration happens here also
+                chan.register(selector, interestOps, this);
+            }
+        }
+
+        /**
+         * Returns a Stream<AsyncEvents> containing only events that are
+         * registered with the given {@code interestOps}.
+         */
+        Stream<AsyncEvent> events(int interestOps) {
+            return pending.stream()
+                    .filter(ev -> (ev.interestOps() & interestOps) != 0);
+        }
+
+        /**
+         * Removes any events with the given {@code interestOps}, and if no
+         * events remaining, cancels the associated SelectionKey.
+         */
+        void resetInterestOps(int interestOps) {
+            int newOps = 0;
+
+            Iterator<AsyncEvent> itr = pending.iterator();
+            while (itr.hasNext()) {
+                AsyncEvent event = itr.next();
+                int evops = event.interestOps();
+                if (event.repeating()) {
+                    newOps |= evops;
+                    continue;
+                }
+                if ((evops & interestOps) != 0) {
+                    itr.remove();
+                } else {
+                    newOps |= evops;
+                }
+            }
+
+            this.interestOps = newOps;
+            SelectionKey key = chan.keyFor(selector);
+            if (newOps == 0) {
+                key.cancel();
+            } else {
+                key.interestOps(newOps);
+            }
+        }
+    }
+
+    @Override
+    public SSLContext sslContext() {
+        Utils.checkNetPermission("getSSLContext");
+        return sslContext;
+    }
+
+    @Override
+    public Optional<SSLParameters> sslParameters() {
+        return Optional.ofNullable(sslParams);
+    }
+
+    @Override
+    public Optional<Authenticator> authenticator() {
+        return Optional.ofNullable(authenticator);
+    }
+
+    @Override
+    public Executor executor() {
+        return executor;
+    }
+
+    ConnectionPool connectionPool() {
+        return connections;
+    }
+
+    @Override
+    public Redirect followRedirects() {
+        return followRedirects;
+    }
+
+
+    @Override
+    public Optional<CookieManager> cookieManager() {
+        return Optional.ofNullable(cookieManager);
+    }
+
+    @Override
+    public Optional<ProxySelector> proxy() {
+        return Optional.ofNullable(this.proxySelector);
+    }
+
+    @Override
+    public WebSocket.Builder newWebSocketBuilder(URI uri,
+                                                 WebSocket.Listener listener) {
+        return new BuilderImpl(this, uri, listener);
+    }
+
+    @Override
+    public Version version() {
+        return version;
+    }
+
+    //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
+
+    boolean getHttp2Allowed() {
+        return version.equals(Version.HTTP_2);
+    }
+
+    private void initFilters() {
+        addFilter(AuthenticationFilter.class);
+        addFilter(RedirectFilter.class);
+        if (this.cookieManager != null) {
+            addFilter(CookieFilter.class);
+        }
+    }
+
+    private void addFilter(Class<? extends HeaderFilter> f) {
+        filters.addFilter(f);
+    }
+
+    final List<HeaderFilter> filterChain() {
+        return filters.getFilterChain();
+    }
+
+    // Timer controls.
+    // Timers are implemented through timed Selector.select() calls.
+
+    synchronized void registerTimer(TimeoutEvent event) {
+        Log.logTrace("Registering timer {0}", event);
+        timeouts.add(event);
+        selmgr.wakeupSelector();
+    }
+
+    synchronized void cancelTimer(TimeoutEvent event) {
+        Log.logTrace("Canceling timer {0}", event);
+        timeouts.remove(event);
+    }
+
+    /**
+     * Purges ( handles ) timer events that have passed their deadline, and
+     * returns the amount of time, in milliseconds, until the next earliest
+     * event. A return value of 0 means that there are no events.
+     */
+    private long purgeTimeoutsAndReturnNextDeadline() {
+        long diff = 0L;
+        List<TimeoutEvent> toHandle = null;
+        int remaining = 0;
+        // enter critical section to retrieve the timeout event to handle
+        synchronized(this) {
+            if (timeouts.isEmpty()) return 0L;
+
+            Instant now = Instant.now();
+            Iterator<TimeoutEvent> itr = timeouts.iterator();
+            while (itr.hasNext()) {
+                TimeoutEvent event = itr.next();
+                diff = now.until(event.deadline(), ChronoUnit.MILLIS);
+                if (diff <= 0) {
+                    itr.remove();
+                    toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
+                    toHandle.add(event);
+                } else {
+                    break;
+                }
+            }
+            remaining = timeouts.size();
+        }
+
+        // can be useful for debugging
+        if (toHandle != null && Log.trace()) {
+            Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
+                    + (toHandle == null ? 0 : toHandle.size()) + " events, "
+                    + "remaining " + remaining
+                    + ", next deadline: " + (diff < 0 ? 0L : diff));
+        }
+
+        // handle timeout events out of critical section
+        if (toHandle != null) {
+            Throwable failed = null;
+            for (TimeoutEvent event : toHandle) {
+                try {
+                   Log.logTrace("Firing timer {0}", event);
+                   event.handle();
+                } catch (Error | RuntimeException e) {
+                    // Not expected. Handle remaining events then throw...
+                    // If e is an OOME or SOE it might simply trigger a new
+                    // error from here - but in this case there's not much we
+                    // could do anyway. Just let it flow...
+                    if (failed == null) failed = e;
+                    else failed.addSuppressed(e);
+                    Log.logTrace("Failed to handle event {0}: {1}", event, e);
+                }
+            }
+            if (failed instanceof Error) throw (Error) failed;
+            if (failed instanceof RuntimeException) throw (RuntimeException) failed;
+        }
+
+        // return time to wait until next event. 0L if there's no more events.
+        return diff < 0 ? 0L : diff;
+    }
+
+    // used for the connection window
+    int getReceiveBufferSize() {
+        return Utils.getIntegerNetProperty(
+                "jdk.httpclient.connectionWindowSize", 256 * 1024
+        );
+    }
+}