src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java
branchhttp-client-branch
changeset 55768 8674257c75ce
parent 55763 634d8e14c172
child 55792 0936888d5a4a
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Mon Nov 06 18:17:09 2017 +0000
@@ -37,6 +37,7 @@
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLHandshakeException;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
 
@@ -60,14 +61,28 @@
         writeSubscription = new SSLSubscriptionWrapper();
         readSubscriber = new SSLSubscriberWrapper();
         this.engine = engine;
-        sslDelegate = new SSLFlowDelegate(engine,
+        sslDelegate = new SSLTubeFlowDelegate(engine,
                                           executor,
                                           readSubscriber,
-                                          tube); // FIXME
+                                          tube);
         tube.subscribe(sslDelegate.upstreamReader());
         sslDelegate.upstreamWriter().onSubscribe(writeSubscription);
     }
 
+    // the other possibility would be to pass a lambda to the
+    // constructor of SSLFlowDelegate (instead of subclassing it).
+    final class SSLTubeFlowDelegate extends SSLFlowDelegate {
+        SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
+                            SSLSubscriberWrapper readSubscriber,
+                            FlowTube tube) {
+            super(engine, executor, readSubscriber, tube);
+        }
+        protected SchedulingAction enterReadScheduling() {
+            readSubscriber.processPendingSubscriber();
+            return SchedulingAction.CONTINUE;
+        }
+    }
+
     public CompletableFuture<String> getALPN() {
         return sslDelegate.alpn();
     }
@@ -97,17 +112,20 @@
     // onComplete/onError are not called before onSubscribed.
     final static class DelegateWrapper implements FlowTube.TubeSubscriber {
         private final FlowTube.TubeSubscriber delegate;
+        private final System.Logger debug;
         volatile boolean subscribedCalled;
         volatile boolean subscribedDone;
         volatile boolean completed;
         volatile Throwable error;
-        DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
+        DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
+                        System.Logger debug) {
             this.delegate = FlowTube.asTubeSubscriber(delegate);
+            this.debug = debug;
         }
 
         @Override
         public void dropSubscription() {
-            if (subscribedCalled) {
+            if (subscribedCalled && !completed) {
                 delegate.dropSubscription();
             }
         }
@@ -140,15 +158,25 @@
                 finished = completed;
             }
             if (x != null) {
+                debug.log(Level.DEBUG,
+                          "Subscriber completed before subscribe: forwarding %s",
+                          (Object)x);
                 delegate.onError(x);
             } else if (finished) {
+                debug.log(Level.DEBUG,
+                          "Subscriber completed before subscribe: calling onComplete()");
                 delegate.onComplete();
             }
         }
 
         @Override
         public void onError(Throwable t) {
-            if (completed) return;
+            if (completed) {
+                debug.log(Level.DEBUG,
+                          "Subscriber already completed: ignoring %s",
+                          (Object)t);
+                return;
+            }
             boolean subscribed;
             synchronized (this) {
                 if (completed) return;
@@ -158,6 +186,10 @@
             }
             if (subscribed) {
                 delegate.onError(t);
+            } else {
+                debug.log(Level.DEBUG,
+                          "Subscriber not yet subscribed: stored %s",
+                          (Object)t);
             }
         }
 
@@ -172,6 +204,9 @@
             }
             if (subscribed) {
                 delegate.onComplete();
+            } else {
+                debug.log(Level.DEBUG,
+                          "Subscriber not yet subscribed: stored completed=true");
             }
         }
 
@@ -184,27 +219,73 @@
 
     // Used to read data from the SSLTube.
     final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
-        private volatile DelegateWrapper delegate;
+        private AtomicReference<DelegateWrapper> pendingDelegate =
+                new AtomicReference<>();
         private volatile DelegateWrapper subscribed;
         private volatile boolean onCompleteReceived;
         private final AtomicReference<Throwable> errorRef
                 = new AtomicReference<>();
 
+        // setDelegate can be called asynchronously when the SSLTube flow
+        // is connected. At this time the permanent subscriber (this class)
+        // may already be subscribed (readSubscription != null) or not.
+        // 1. If it's already subscribed (readSubscription != null), we
+        //    are going to signal the SSLFlowDelegate reader, and make sure
+        //    onSubscribed is called within the reader flow
+        // 2. If it's not yet subscribed (readSubscription == null), then
+        //    we're going to wait for onSubscribe/onConnection to be called.
+        //
         void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
             debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
                       delegate);
             assert delegate != null;
-            DelegateWrapper delegateWrapper = new DelegateWrapper(delegate);
+            DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
+            DelegateWrapper previous;
             Flow.Subscription subscription;
+            boolean handleNow;
             synchronized (this) {
-                this.delegate = delegateWrapper;
+                previous = pendingDelegate.getAndSet(delegateWrapper);
                 subscription = readSubscription;
+                handleNow = this.errorRef.get() != null || finished;
+            }
+            if (previous != null) {
+                previous.dropSubscription();
             }
             if (subscription == null) {
                 debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
                 return;
             }
+            if (handleNow || !sslDelegate.resumeReader()) {
+                processPendingSubscriber();
+            }
+        }
 
+        // Can be called outside of the flow if an error has already been
+        // raise. Otherwise, must be called within the SSLFlowDelegate
+        // downstream reader flow.
+        // If there is a subscription, and if there is a pending delegate,
+        // calls dropSubscription() on the previous delegate (if any),
+        // then subscribe the pending delegate.
+        void processPendingSubscriber() {
+            Flow.Subscription subscription;
+            DelegateWrapper delegateWrapper, previous;
+            synchronized (this) {
+                delegateWrapper = pendingDelegate.get();
+                if (delegateWrapper == null) return;
+                subscription = readSubscription;
+                previous = subscribed;
+            }
+            if (subscription == null) {
+                debug.log(Level.DEBUG,
+                         "SSLSubscriberWrapper (reader) %s",
+                         "processPendingSubscriber: no subscription yet");
+                return;
+            }
+            delegateWrapper = pendingDelegate.getAndSet(null);
+            if (delegateWrapper == null) return;
+            if (previous != null) {
+                previous.dropSubscription();
+            }
             onNewSubscription(delegateWrapper,
                               delegateWrapper::onSubscribe,
                               subscription);
@@ -212,7 +293,7 @@
 
         @Override
         public void dropSubscription() {
-            DelegateWrapper subscriberImpl = delegate;
+            DelegateWrapper subscriberImpl = subscribed;
             if (subscriberImpl != null) {
                 subscriberImpl.dropSubscription();
             }
@@ -223,20 +304,7 @@
             debug.log(Level.DEBUG,
                       "SSLSubscriberWrapper (reader) onConnection(%s)",
                       subscription);
-            assert subscription != null;
-            DelegateWrapper subscriberImpl;
-            synchronized (this) {
-                subscriberImpl = delegate;
-                readSubscription = subscription;
-            }
-            if (subscriberImpl == null) {
-                debug.log(Level.DEBUG,
-                      "SSLSubscriberWrapper (reader) onConnection: no delegate yet");
-                return;
-            }
-            onNewSubscription(subscriberImpl,
-                              subscriberImpl::onConnection,
-                              subscription);
+            onSubscribeImpl(subscription);
         }
 
         @Override
@@ -244,21 +312,46 @@
             debug.log(Level.DEBUG,
                       "SSLSubscriberWrapper (reader) onSubscribe(%s)",
                       subscription);
-            readSubscription = subscription;
+            onSubscribeImpl(subscription);
+        }
+
+        // called in the reader flow, from either onSubscribe or onConnection.
+        private void onSubscribeImpl(Flow.Subscription subscription) {
             assert subscription != null;
-            DelegateWrapper subscriberImpl;
+            DelegateWrapper subscriberImpl, pending;
             synchronized (this) {
-                subscriberImpl = delegate;
                 readSubscription = subscription;
+                subscriberImpl = subscribed;
+                pending = pendingDelegate.get();
             }
-            if (subscriberImpl == null) {
+
+            if (subscriberImpl == null && pending == null) {
                 debug.log(Level.DEBUG,
-                      "SSLSubscriberWrapper (reader) onSubscribe: no delegate yet");
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "no delegate yet");
                 return;
             }
-            onNewSubscription(subscriberImpl,
-                              subscriberImpl::onSubscribe,
-                              subscription);
+
+            if (pending == null) {
+                // There is no pending delegate, but we have a previously
+                // subscribed delegate. This is obviously a re-subscribe.
+                // We are in the downstream reader flow, so we should call
+                // onConnection directly.
+                debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "resusbcribing");
+                onNewSubscription(subscriberImpl,
+                                  subscriberImpl::onConnection,
+                                  subscription);
+            } else {
+                // We have some pending subscriber: subscribe it now that we have
+                // a subscription. If we already had a previous delegate then
+                // it will get a dropSubscription().
+                debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "subscribing pending");
+                processPendingSubscriber();
+            }
         }
 
         private void onNewSubscription(DelegateWrapper subscriberImpl,
@@ -275,11 +368,6 @@
             sslDelegate.resetReaderDemand();
             // send the subscription to the subscriber.
             method.accept(subscription);
-            // reschedule after calling onSubscribe (this should not be
-            // strictly needed as the first call to subscription.request()
-            // coming after resetting the demand should trigger it).
-            // However, it should not do any harm.
-            sslDelegate.resumeReader();
 
             // The following twisted logic is just here that we don't invoke
             // onError before onSubscribe. It also prevents race conditions
@@ -287,9 +375,7 @@
             synchronized (this) {
                 failed = this.errorRef.get();
                 completed = finished;
-                if (delegate == subscriberImpl) {
-                    subscribed = subscriberImpl;
-                }
+                subscribed = subscriberImpl;
             }
             if (failed != null) {
                 subscriberImpl.onError(failed);
@@ -300,7 +386,7 @@
 
         @Override
         public void onNext(List<ByteBuffer> item) {
-            delegate.onNext(item);
+            subscribed.onNext(item);
         }
 
         public void onErrorImpl(Throwable throwable) {
@@ -322,6 +408,10 @@
             } else {
                 debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
             }
+            // now if we have any pending subscriber, we should forward
+            // the error to them immediately as the read scheduler will
+            // already be stopped.
+            processPendingSubscriber();
         }
 
         @Override
@@ -351,6 +441,9 @@
                 finished = true;
                 subscriberImpl.onComplete();
             }
+            // now if we have any pending subscriber, we should complete
+            // them immediately as the read scheduler will already be stopped.
+            processPendingSubscriber();
         }
     }