src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java
changeset 48083 b1c1b4ef4be2
child 55973 4d9b002587db
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Wed Dec 06 11:11:59 2017 -0800
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.common;
+
+import java.lang.System.Logger.Level;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
+
+/**
+ * An implementation of FlowTube that wraps another FlowTube in an
+ * SSL flow.
+ * <p>
+ * The following diagram shows a typical usage of the SSLTube, where
+ * the SSLTube wraps a SocketTube on the right hand side, and is connected
+ * to an HttpConnection on the left hand side.
+ *
+ * <preformatted>{@code
+ *                  +----------  SSLTube -------------------------+
+ *                  |                                             |
+ *                  |                    +---SSLFlowDelegate---+  |
+ *  HttpConnection  |                    |                     |  |   SocketTube
+ *    read sink  <- SSLSubscriberW.   <- Reader <- upstreamR.() <---- read source
+ *  (a subscriber)  |                    |    \         /      |  |  (a publisher)
+ *                  |                    |     SSLEngine       |  |
+ *  HttpConnection  |                    |    /         \      |  |   SocketTube
+ *  write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
+ *  (a publisher)   |                    |                     |  |  (a subscriber)
+ *                  |                    +---------------------+  |
+ *                  |                                             |
+ *                  +---------------------------------------------+
+ * }</preformatted>
+ */
+public class SSLTube implements FlowTube {
+
+    static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
+    final System.Logger debug =
+            Utils.getDebugLogger(this::dbgString, DEBUG);
+
+    private final FlowTube tube;
+    private final SSLSubscriberWrapper readSubscriber;
+    private final SSLSubscriptionWrapper writeSubscription;
+    private final SSLFlowDelegate sslDelegate;
+    private final SSLEngine engine;
+    private volatile boolean finished;
+
+    public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
+        Objects.requireNonNull(engine);
+        Objects.requireNonNull(executor);
+        this.tube = Objects.requireNonNull(tube);
+        writeSubscription = new SSLSubscriptionWrapper();
+        readSubscriber = new SSLSubscriberWrapper();
+        this.engine = engine;
+        sslDelegate = new SSLTubeFlowDelegate(engine,
+                                              executor,
+                                              readSubscriber,
+                                              tube);
+    }
+
+    final class SSLTubeFlowDelegate extends SSLFlowDelegate {
+        SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
+                            SSLSubscriberWrapper readSubscriber,
+                            FlowTube tube) {
+            super(engine, executor, readSubscriber, tube);
+        }
+        protected SchedulingAction enterReadScheduling() {
+            readSubscriber.processPendingSubscriber();
+            return SchedulingAction.CONTINUE;
+        }
+        void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
+                     Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
+            assert downWriter == tube;
+            assert downReader == readSubscriber;
+
+            // Connect the read sink first. That's the left-hand side
+            // downstream subscriber from the HttpConnection (or more
+            // accurately, the SSLSubscriberWrapper that will wrap it
+            // when SSLTube::connectFlows is called.
+            reader.subscribe(downReader);
+
+            // Connect the right hand side tube (the socket tube).
+            //
+            // The SSLFlowDelegate.writer publishes ByteBuffer to
+            // the SocketTube for writing on the socket, and the
+            // SSLFlowDelegate::upstreamReader subscribes to the
+            // SocketTube to receive ByteBuffers read from the socket.
+            //
+            // Basically this method is equivalent to:
+            //     // connect the read source:
+            //     //   subscribe the SSLFlowDelegate upstream reader
+            //     //   to the socket tube publisher.
+            //     tube.subscribe(upstreamReader());
+            //     // connect the write sink:
+            //     //   subscribe the socket tube write subscriber
+            //     //   with the SSLFlowDelegate downstream writer.
+            //     writer.subscribe(tube);
+            tube.connectFlows(FlowTube.asTubePublisher(writer),
+                              FlowTube.asTubeSubscriber(upstreamReader()));
+
+            // Finally connect the write source. That's the left
+            // hand side publisher which will push ByteBuffer for
+            // writing and encryption to the SSLFlowDelegate.
+            // The writeSubscription is in fact the SSLSubscriptionWrapper
+            // that will wrap the subscription provided by the
+            // HttpConnection publisher when SSLTube::connectFlows
+            // is called.
+            upstreamWriter().onSubscribe(writeSubscription);
+        }
+    }
+
+    public CompletableFuture<String> getALPN() {
+        return sslDelegate.alpn();
+    }
+
+    @Override
+    public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
+        readSubscriber.dropSubscription();
+        readSubscriber.setDelegate(s);
+        s.onSubscribe(readSubscription);
+    }
+
+    /**
+     * Tells whether, or not, this FlowTube has finished receiving data.
+     *
+     * @return true when one of this FlowTube Subscriber's OnError or onComplete
+     * methods have been invoked
+     */
+    @Override
+    public boolean isFinished() {
+        return finished;
+    }
+
+    private volatile Flow.Subscription readSubscription;
+
+    // The DelegateWrapper wraps a subscribed {@code Flow.Subscriber} and
+    // tracks the subscriber's state. In particular it makes sure that
+    // onComplete/onError are not called before onSubscribed.
+    final static class DelegateWrapper implements FlowTube.TubeSubscriber {
+        private final FlowTube.TubeSubscriber delegate;
+        private final System.Logger debug;
+        volatile boolean subscribedCalled;
+        volatile boolean subscribedDone;
+        volatile boolean completed;
+        volatile Throwable error;
+        DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
+                        System.Logger debug) {
+            this.delegate = FlowTube.asTubeSubscriber(delegate);
+            this.debug = debug;
+        }
+
+        @Override
+        public void dropSubscription() {
+            if (subscribedCalled && !completed) {
+                delegate.dropSubscription();
+            }
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            assert subscribedCalled;
+            delegate.onNext(item);
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            onSubscribe(delegate::onSubscribe, subscription);
+        }
+
+        private void onSubscribe(Consumer<Flow.Subscription> method,
+                                 Flow.Subscription subscription) {
+            subscribedCalled = true;
+            method.accept(subscription);
+            Throwable x;
+            boolean finished;
+            synchronized (this) {
+                subscribedDone = true;
+                x = error;
+                finished = completed;
+            }
+            if (x != null) {
+                debug.log(Level.DEBUG,
+                          "Subscriber completed before subscribe: forwarding %s",
+                          (Object)x);
+                delegate.onError(x);
+            } else if (finished) {
+                debug.log(Level.DEBUG,
+                          "Subscriber completed before subscribe: calling onComplete()");
+                delegate.onComplete();
+            }
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            if (completed) {
+                debug.log(Level.DEBUG,
+                          "Subscriber already completed: ignoring %s",
+                          (Object)t);
+                return;
+            }
+            boolean subscribed;
+            synchronized (this) {
+                if (completed) return;
+                error = t;
+                completed = true;
+                subscribed = subscribedDone;
+            }
+            if (subscribed) {
+                delegate.onError(t);
+            } else {
+                debug.log(Level.DEBUG,
+                          "Subscriber not yet subscribed: stored %s",
+                          (Object)t);
+            }
+        }
+
+        @Override
+        public void onComplete() {
+            if (completed) return;
+            boolean subscribed;
+            synchronized (this) {
+                if (completed) return;
+                completed = true;
+                subscribed = subscribedDone;
+            }
+            if (subscribed) {
+                debug.log(Level.DEBUG, "DelegateWrapper: completing subscriber");
+                delegate.onComplete();
+            } else {
+                debug.log(Level.DEBUG,
+                          "Subscriber not yet subscribed: stored completed=true");
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "DelegateWrapper:" + delegate.toString();
+        }
+
+    }
+
+    // Used to read data from the SSLTube.
+    final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
+        private AtomicReference<DelegateWrapper> pendingDelegate =
+                new AtomicReference<>();
+        private volatile DelegateWrapper subscribed;
+        private volatile boolean onCompleteReceived;
+        private final AtomicReference<Throwable> errorRef
+                = new AtomicReference<>();
+
+        // setDelegate can be called asynchronously when the SSLTube flow
+        // is connected. At this time the permanent subscriber (this class)
+        // may already be subscribed (readSubscription != null) or not.
+        // 1. If it's already subscribed (readSubscription != null), we
+        //    are going to signal the SSLFlowDelegate reader, and make sure
+        //    onSubscribed is called within the reader flow
+        // 2. If it's not yet subscribed (readSubscription == null), then
+        //    we're going to wait for onSubscribe to be called.
+        //
+        void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
+            debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
+                      delegate);
+            assert delegate != null;
+            DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
+            DelegateWrapper previous;
+            Flow.Subscription subscription;
+            boolean handleNow;
+            synchronized (this) {
+                previous = pendingDelegate.getAndSet(delegateWrapper);
+                subscription = readSubscription;
+                handleNow = this.errorRef.get() != null || finished;
+            }
+            if (previous != null) {
+                previous.dropSubscription();
+            }
+            if (subscription == null) {
+                debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
+                return;
+            }
+            if (handleNow || !sslDelegate.resumeReader()) {
+                processPendingSubscriber();
+            }
+        }
+
+        // Can be called outside of the flow if an error has already been
+        // raise. Otherwise, must be called within the SSLFlowDelegate
+        // downstream reader flow.
+        // If there is a subscription, and if there is a pending delegate,
+        // calls dropSubscription() on the previous delegate (if any),
+        // then subscribe the pending delegate.
+        void processPendingSubscriber() {
+            Flow.Subscription subscription;
+            DelegateWrapper delegateWrapper, previous;
+            synchronized (this) {
+                delegateWrapper = pendingDelegate.get();
+                if (delegateWrapper == null) return;
+                subscription = readSubscription;
+                previous = subscribed;
+            }
+            if (subscription == null) {
+                debug.log(Level.DEBUG,
+                         "SSLSubscriberWrapper (reader) %s",
+                         "processPendingSubscriber: no subscription yet");
+                return;
+            }
+            delegateWrapper = pendingDelegate.getAndSet(null);
+            if (delegateWrapper == null) return;
+            if (previous != null) {
+                previous.dropSubscription();
+            }
+            onNewSubscription(delegateWrapper, subscription);
+        }
+
+        @Override
+        public void dropSubscription() {
+            DelegateWrapper subscriberImpl = subscribed;
+            if (subscriberImpl != null) {
+                subscriberImpl.dropSubscription();
+            }
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribe(%s)",
+                      subscription);
+            onSubscribeImpl(subscription);
+        }
+
+        // called in the reader flow, from onSubscribe.
+        private void onSubscribeImpl(Flow.Subscription subscription) {
+            assert subscription != null;
+            DelegateWrapper subscriberImpl, pending;
+            synchronized (this) {
+                readSubscription = subscription;
+                subscriberImpl = subscribed;
+                pending = pendingDelegate.get();
+            }
+
+            if (subscriberImpl == null && pending == null) {
+                debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "no delegate yet");
+                return;
+            }
+
+            if (pending == null) {
+                // There is no pending delegate, but we have a previously
+                // subscribed delegate. This is obviously a re-subscribe.
+                // We are in the downstream reader flow, so we should call
+                // onSubscribe directly.
+                debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "resubscribing");
+                onNewSubscription(subscriberImpl, subscription);
+            } else {
+                // We have some pending subscriber: subscribe it now that we have
+                // a subscription. If we already had a previous delegate then
+                // it will get a dropSubscription().
+                debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "subscribing pending");
+                processPendingSubscriber();
+            }
+        }
+
+        private void onNewSubscription(DelegateWrapper subscriberImpl,
+                                       Flow.Subscription subscription) {
+            assert subscriberImpl != null;
+            assert subscription != null;
+
+            Throwable failed;
+            boolean completed;
+            // reset any demand that may have been made by the previous
+            // subscriber
+            sslDelegate.resetReaderDemand();
+            // send the subscription to the subscriber.
+            subscriberImpl.onSubscribe(subscription);
+
+            // The following twisted logic is just here that we don't invoke
+            // onError before onSubscribe. It also prevents race conditions
+            // if onError is invoked concurrently with setDelegate.
+            synchronized (this) {
+                failed = this.errorRef.get();
+                completed = finished;
+                subscribed = subscriberImpl;
+            }
+            if (failed != null) {
+                subscriberImpl.onError(failed);
+            } else if (completed) {
+                subscriberImpl.onComplete();
+            }
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            subscribed.onNext(item);
+        }
+
+        public void onErrorImpl(Throwable throwable) {
+            // The following twisted logic is just here that we don't invoke
+            // onError before onSubscribe. It also prevents race conditions
+            // if onError is invoked concurrently with setDelegate.
+            // See setDelegate.
+
+            errorRef.compareAndSet(null, throwable);
+            Throwable failed = errorRef.get();
+            finished = true;
+            debug.log(Level.DEBUG, "%s: onErrorImpl: %s", this, throwable);
+            DelegateWrapper subscriberImpl;
+            synchronized (this) {
+                subscriberImpl = subscribed;
+            }
+            if (subscriberImpl != null) {
+                subscriberImpl.onError(failed);
+            } else {
+                debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
+            }
+            // now if we have any pending subscriber, we should forward
+            // the error to them immediately as the read scheduler will
+            // already be stopped.
+            processPendingSubscriber();
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            assert !finished && !onCompleteReceived;
+            onErrorImpl(throwable);
+        }
+
+        private boolean handshaking() {
+            HandshakeStatus hs = engine.getHandshakeStatus();
+            return !(hs == NOT_HANDSHAKING || hs == FINISHED);
+        }
+
+        private boolean handshakeFailed() {
+            // sslDelegate can be null if we reach here
+            // during the initial handshake, as that happens
+            // within the SSLFlowDelegate constructor.
+            // In that case we will want to raise an exception.
+            return handshaking()
+                    && (sslDelegate == null
+                    || !sslDelegate.closeNotifyReceived());
+        }
+
+        @Override
+        public void onComplete() {
+            assert !finished && !onCompleteReceived;
+            onCompleteReceived = true;
+            DelegateWrapper subscriberImpl;
+            synchronized(this) {
+                subscriberImpl = subscribed;
+            }
+
+            if (handshakeFailed()) {
+                debug.log(Level.DEBUG,
+                        "handshake: %s, inbound done: %s outbound done: %s",
+                        engine.getHandshakeStatus(),
+                        engine.isInboundDone(),
+                        engine.isOutboundDone());
+                onErrorImpl(new SSLHandshakeException(
+                        "Remote host terminated the handshake"));
+            } else if (subscriberImpl != null) {
+                finished = true;
+                subscriberImpl.onComplete();
+            }
+            // now if we have any pending subscriber, we should complete
+            // them immediately as the read scheduler will already be stopped.
+            processPendingSubscriber();
+        }
+    }
+
+    @Override
+    public void connectFlows(TubePublisher writePub,
+                             TubeSubscriber readSub) {
+        debug.log(Level.DEBUG, "connecting flows");
+        readSubscriber.setDelegate(readSub);
+        writePub.subscribe(this);
+    }
+
+    /** Outstanding write demand from the SSL Flow Delegate. */
+    private final Demand writeDemand = new Demand();
+
+    final class SSLSubscriptionWrapper implements Flow.Subscription {
+
+        volatile Flow.Subscription delegate;
+
+        void setSubscription(Flow.Subscription sub) {
+            long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
+            delegate = sub;
+            debug.log(Level.DEBUG, "setSubscription: demand=%d", demand);
+            if (demand > 0)
+                sub.request(demand);
+        }
+
+        @Override
+        public void request(long n) {
+            writeDemand.increase(n);
+            debug.log(Level.DEBUG, "request: n=%d", n);
+            Flow.Subscription sub = delegate;
+            if (sub != null && n > 0) {
+                sub.request(n);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            // TODO:  no-op or error?
+        }
+    }
+
+    /* Subscriber - writing side */
+    @Override
+    public void onSubscribe(Flow.Subscription subscription) {
+        Objects.requireNonNull(subscription);
+        Flow.Subscription x = writeSubscription.delegate;
+        if (x != null)
+            x.cancel();
+
+        writeSubscription.setSubscription(subscription);
+    }
+
+    @Override
+    public void onNext(List<ByteBuffer> item) {
+        Objects.requireNonNull(item);
+        boolean decremented = writeDemand.tryDecrement();
+        assert decremented : "Unexpected writeDemand: ";
+        debug.log(Level.DEBUG,
+                "sending %d  buffers to SSL flow delegate", item.size());
+        sslDelegate.upstreamWriter().onNext(item);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        Objects.requireNonNull(throwable);
+        sslDelegate.upstreamWriter().onError(throwable);
+    }
+
+    @Override
+    public void onComplete() {
+        sslDelegate.upstreamWriter().onComplete();
+    }
+
+    @Override
+    public String toString() {
+        return dbgString();
+    }
+
+    final String dbgString() {
+        return "SSLTube(" + tube + ")";
+    }
+
+}