http-client-branch: SSLTube should call onSubscribe in the flow http-client-branch
authordfuchs
Mon, 06 Nov 2017 18:17:09 +0000
branchhttp-client-branch
changeset 55768 8674257c75ce
parent 55766 2136ad3694e4
child 55770 a7fbe5d26f3c
http-client-branch: SSLTube should call onSubscribe in the flow
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SequentialScheduler.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java	Mon Nov 06 18:17:09 2017 +0000
@@ -44,7 +44,6 @@
 import jdk.incubator.http.internal.common.Demand;
 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
 import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
 import jdk.incubator.http.internal.common.ConnectionExpiredException;
 import jdk.incubator.http.internal.common.Utils;
 
@@ -146,7 +145,7 @@
     private final ConcurrentLinkedDeque<ByteBuffer> queue
             = new ConcurrentLinkedDeque<>();
     private final SequentialScheduler scheduler =
-            new SequentialScheduler(new SynchronizedRestartableTask(this::flush));
+            SequentialScheduler.synchronizedScheduler(this::flush);
     private final Executor executor;
     private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
     private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Mon Nov 06 18:17:09 2017 +0000
@@ -42,7 +42,7 @@
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.FlowTube;
 import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
+import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
 
@@ -83,9 +83,9 @@
     private final Http1Publisher writePublisher = new Http1Publisher();
 
     /** Completed when the header have been published, or there is an error */
-    private volatile CompletableFuture<ExchangeImpl<T>> headersSentCF  = new CompletableFuture<>();
+    private volatile CompletableFuture<ExchangeImpl<T>> headersSentCF  = new MinimalFuture<>();
      /** Completed when the body has been published, or there is an error */
-    private volatile CompletableFuture<ExchangeImpl<T>> bodySentCF = new CompletableFuture<>();
+    private volatile CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
 
     /** The subscriber to the request's body published. Maybe null. */
     private volatile Http1BodySubscriber bodySubscriber;
@@ -238,13 +238,13 @@
                 operations.add(connectCF);
             }
         } else {
-            connectCF = new CompletableFuture<>();
+            connectCF = new MinimalFuture<>();
             connectCF.complete(null);
         }
 
         return connectCF
                 .thenCompose(unused -> {
-                    CompletableFuture<Void> cf = new CompletableFuture<>();
+                    CompletableFuture<Void> cf = new MinimalFuture<>();
                     try {
                         connectFlows(connection);
 
@@ -519,8 +519,8 @@
         volatile boolean cancelled;
         final Http1WriteSubscription subscription = new Http1WriteSubscription();
         final Demand demand = new Demand();
-        final SequentialScheduler writeScheduler = new SequentialScheduler(
-                new SynchronizedRestartableTask(new WriteTask()));
+        final SequentialScheduler writeScheduler =
+                SequentialScheduler.synchronizedScheduler(new WriteTask());
 
         @Override
         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Mon Nov 06 18:17:09 2017 +0000
@@ -49,7 +49,6 @@
 import javax.net.ssl.SSLEngine;
 import jdk.incubator.http.internal.common.*;
 import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.Encoder;
@@ -154,8 +153,8 @@
             }
 
             // Preface is sent. Checks for pending data and flush it.
-            // We rely on this method being called from within the readlock,
-            // so we know that no other thread could execute this method
+            // We rely on this method being called from within the Http2TubeSubscriber
+            // scheduler, so we know that no other thread could execute this method
             // concurrently while we're here.
             // This ensures that later incoming buffers will not
             // be processed before we have flushed the pending queue.
@@ -275,7 +274,9 @@
         sendConnectionPreface();
     }
 
-    // async style but completes immediately
+    // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
+    // agreement from the server. Async style but completes immediately, because
+    // the connection is already connected.
     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
                                                           Http2ClientImpl client2,
                                                           Exchange<?> exchange,
@@ -297,7 +298,7 @@
         return connection.connectAsync()
                   .thenCompose(unused -> checkSSLConfig(connection))
                   .thenCompose(notused-> {
-                      CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
+                      CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
                       try {
                           Http2Connection hc = new Http2Connection(request, h2client, connection);
                           cf.complete(hc);
@@ -490,10 +491,8 @@
         return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
     }
 
-    private final Object readlock = new Object();
-
     long count;
-    public final void asyncReceive(ByteBufferReference buffer) {
+    final void asyncReceive(ByteBufferReference buffer) {
         // We don't need to read anything and
         // we don't want to send anything back to the server
         // until the connection preface has been sent.
@@ -505,48 +504,43 @@
         // sending a GOAWAY frame with 'invalid_preface'.
         //
         // Note: asyncReceive is only called from the Http2TubeSubscriber
-        //       sequential scheduler. Only asyncReceive uses the readLock.
-        //       Therefore synchronizing on the readlock here should be
-        //       safe.
-        //
-        synchronized (readlock) {
-            try {
-                Supplier<ByteBuffer> bs = initial;
-                // ensure that we always handle the initial buffer first,
-                // if any.
-                if (bs != null) {
-                    initial = null;
-                    ByteBuffer b = bs.get();
-                    if (b.hasRemaining()) {
-                        long c = ++count;
-                        debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
-                            + c +"): " + b.remaining());
-                        framesController.processReceivedData(framesDecoder,
-                                ByteBufferReference.of(b));
-                    }
+        //       sequential scheduler.
+        try {
+            Supplier<ByteBuffer> bs = initial;
+            // ensure that we always handle the initial buffer first,
+            // if any.
+            if (bs != null) {
+                initial = null;
+                ByteBuffer b = bs.get();
+                if (b.hasRemaining()) {
+                    long c = ++count;
+                    debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
+                        + c +"): " + b.remaining());
+                    framesController.processReceivedData(framesDecoder,
+                            ByteBufferReference.of(b));
                 }
-                ByteBuffer b = buffer.get();
-                // the readlock ensures that the order of incoming buffers
-                // is preserved.
-                if (b == EMPTY_TRIGGER) {
-                    debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
-                    boolean prefaceSent = framesController.prefaceSent;
-                    assert prefaceSent;
-                    // call framesController.processReceivedData to potentially
-                    // trigger the processing of all the data buffered there.
-                    framesController.processReceivedData(framesDecoder, buffer);
-                    debug.log(Level.DEBUG, "H2 processed buffered data");
-                } else {
-                    long c = ++count;
-                    debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
-                    framesController.processReceivedData(framesDecoder, buffer);
-                    debug.log(Level.DEBUG, "H2 processed(%d)", c);
-                }
-            } catch (Throwable e) {
-                String msg = Utils.stackTrace(e);
-                Log.logTrace(msg);
-                shutdown(e);
             }
+            ByteBuffer b = buffer.get();
+            // the Http2TubeSubscriber scheduler ensures that the order of incoming
+            // buffers is preserved.
+            if (b == EMPTY_TRIGGER) {
+                debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
+                boolean prefaceSent = framesController.prefaceSent;
+                assert prefaceSent;
+                // call framesController.processReceivedData to potentially
+                // trigger the processing of all the data buffered there.
+                framesController.processReceivedData(framesDecoder, buffer);
+                debug.log(Level.DEBUG, "H2 processed buffered data");
+            } else {
+                long c = ++count;
+                debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
+                framesController.processReceivedData(framesDecoder, buffer);
+                debug.log(Level.DEBUG, "H2 processed(%d)", c);
+            }
+        } catch (Throwable e) {
+            String msg = Utils.stackTrace(e);
+            Log.logTrace(msg);
+            shutdown(e);
         }
     }
 
@@ -555,7 +549,7 @@
     }
 
     void shutdown(Throwable t) {
-        debug.log(Level.DEBUG, () -> "Shutting down h2c: " + t);
+        debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t);
         if (closed == true) return;
         synchronized (this) {
             if (closed == true) return;
@@ -1060,8 +1054,8 @@
         volatile Throwable error;
         final ConcurrentLinkedQueue<ByteBuffer> queue
                 = new ConcurrentLinkedQueue<>();
-        final SequentialScheduler scheduler = new SequentialScheduler(
-                        new SynchronizedRestartableTask(this::processQueue));
+        final SequentialScheduler scheduler =
+                SequentialScheduler.synchronizedScheduler(this::processQueue);
 
         final void processQueue() {
             try {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Mon Nov 06 18:17:09 2017 +0000
@@ -44,7 +44,6 @@
 
 import jdk.incubator.http.internal.common.*;
 import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
 import static java.util.stream.Collectors.toList;
@@ -99,7 +98,7 @@
 
     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
     final SequentialScheduler sched =
-            new SequentialScheduler(new SynchronizedRestartableTask(this::schedule));
+            SequentialScheduler.synchronizedScheduler(this::schedule);
     final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
 
     /**
@@ -124,7 +123,6 @@
     volatile RequestSubscriber requestSubscriber;
     volatile int responseCode;
     volatile Response response;
-    volatile CompletableFuture<Response> responseCF;
     volatile Throwable failed; // The exception with which this stream was canceled.
     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
     volatile CompletableFuture<T> responseBodyCF;
@@ -563,7 +561,7 @@
         }
         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
         connection.sendFrame(f);
-        CompletableFuture<ExchangeImpl<T>> cf = new CompletableFuture<ExchangeImpl<T>>();
+        CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
         cf.complete(this);  // #### good enough for now
         return cf;
     }
@@ -616,8 +614,8 @@
         RequestSubscriber(long contentLen) {
             this.contentLength = contentLen;
             this.remainingContentLength = contentLen;
-            this.sendScheduler = new SequentialScheduler(
-                    new SynchronizedRestartableTask(this::trySend));
+            this.sendScheduler =
+                    SequentialScheduler.synchronizedScheduler(this::trySend);
         }
 
         @Override
@@ -900,6 +898,7 @@
 
     // This method sends a RST_STREAM frame
     void cancelImpl(Throwable e) {
+        debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
         if (Log.trace()) {
             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
         }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Mon Nov 06 18:17:09 2017 +0000
@@ -44,6 +44,7 @@
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
+import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
 
 /**
  * Implements SSL using two SubscriberWrappers.
@@ -112,7 +113,7 @@
         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
         this.cf = CompletableFuture.allOf(reader.completion(), writer.completion())
                                    .thenRun(this::normalStop);
-        this.alpnCF = new CompletableFuture<>();
+        this.alpnCF = new MinimalFuture<>();
         //Monitor.add(this::monitor);
     }
 
@@ -152,6 +153,11 @@
         return sb.toString();
     }
 
+    protected SchedulingAction enterReadScheduling() {
+        return SchedulingAction.CONTINUE;
+    }
+
+
     /**
      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
      * Any decrypted buffers returned to be passed downstream.
@@ -172,21 +178,26 @@
         static final int TARGET_BUFSIZE = 16 * 1024;
         volatile ByteBuffer readBuf;
         volatile boolean completing = false;
-        final Object readLock = new Object();
+        final Object readBufferLock = new Object();
         final System.Logger debugr =
             Utils.getDebugLogger(this::dbgString, DEBUG);
 
-        class ReaderDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
+        class ReaderDownstreamPusher implements Runnable {
             @Override public void run() { processData(); }
         }
 
         Reader() {
             super();
-            scheduler = new SequentialScheduler(new ReaderDownstreamPusher());
+            scheduler = SequentialScheduler.synchronizedScheduler(
+                                                new ReaderDownstreamPusher());
             this.readBuf = ByteBuffer.allocate(1024);
             readBuf.limit(0); // keep in read mode
         }
 
+        protected SchedulingAction enterScheduling() {
+            return enterReadScheduling();
+        }
+
         public final String dbgString() {
             return "SSL Reader(" + tubeName + ")";
         }
@@ -230,7 +241,7 @@
 
         // readBuf is kept ready for reading outside of this method
         private void addToReadBuf(List<ByteBuffer> buffers) {
-            synchronized (readLock) {
+            synchronized (readBufferLock) {
                 for (ByteBuffer buf : buffers) {
                     readBuf.compact();
                     while (readBuf.remaining() < buf.remaining())
@@ -263,7 +274,7 @@
                     boolean handshaking = false;
                     try {
                         EngineResult result;
-                        synchronized (readLock) {
+                        synchronized (readBufferLock) {
                             result = unwrapBuffer(readBuf);
                             debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
                         }
@@ -314,6 +325,7 @@
             }
         }
     }
+
     /**
      * Returns a CompletableFuture which completes after all activity
      * in the delegate is terminated (whether normally or exceptionally).
@@ -324,16 +336,6 @@
         return cf;
     }
 
-    private String xxx(List<ByteBuffer> i) {
-        StringBuilder sb = new StringBuilder();
-        sb.append("xxx size=" + i.size());
-        int x = 0;
-        for (ByteBuffer b : i)
-            x += b.remaining();
-        sb.append(" total " + x);
-        return sb.toString();
-    }
-
     public interface Monitorable {
         public String getInfo();
     }
@@ -706,8 +708,8 @@
         return writer;
     }
 
-    public void resumeReader() {
-        reader.schedule();
+    public boolean resumeReader() {
+        return reader.signalScheduling();
     }
 
     public void resetReaderDemand() {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Mon Nov 06 18:17:09 2017 +0000
@@ -37,6 +37,7 @@
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLHandshakeException;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
 
@@ -60,14 +61,28 @@
         writeSubscription = new SSLSubscriptionWrapper();
         readSubscriber = new SSLSubscriberWrapper();
         this.engine = engine;
-        sslDelegate = new SSLFlowDelegate(engine,
+        sslDelegate = new SSLTubeFlowDelegate(engine,
                                           executor,
                                           readSubscriber,
-                                          tube); // FIXME
+                                          tube);
         tube.subscribe(sslDelegate.upstreamReader());
         sslDelegate.upstreamWriter().onSubscribe(writeSubscription);
     }
 
+    // the other possibility would be to pass a lambda to the
+    // constructor of SSLFlowDelegate (instead of subclassing it).
+    final class SSLTubeFlowDelegate extends SSLFlowDelegate {
+        SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
+                            SSLSubscriberWrapper readSubscriber,
+                            FlowTube tube) {
+            super(engine, executor, readSubscriber, tube);
+        }
+        protected SchedulingAction enterReadScheduling() {
+            readSubscriber.processPendingSubscriber();
+            return SchedulingAction.CONTINUE;
+        }
+    }
+
     public CompletableFuture<String> getALPN() {
         return sslDelegate.alpn();
     }
@@ -97,17 +112,20 @@
     // onComplete/onError are not called before onSubscribed.
     final static class DelegateWrapper implements FlowTube.TubeSubscriber {
         private final FlowTube.TubeSubscriber delegate;
+        private final System.Logger debug;
         volatile boolean subscribedCalled;
         volatile boolean subscribedDone;
         volatile boolean completed;
         volatile Throwable error;
-        DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
+        DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
+                        System.Logger debug) {
             this.delegate = FlowTube.asTubeSubscriber(delegate);
+            this.debug = debug;
         }
 
         @Override
         public void dropSubscription() {
-            if (subscribedCalled) {
+            if (subscribedCalled && !completed) {
                 delegate.dropSubscription();
             }
         }
@@ -140,15 +158,25 @@
                 finished = completed;
             }
             if (x != null) {
+                debug.log(Level.DEBUG,
+                          "Subscriber completed before subscribe: forwarding %s",
+                          (Object)x);
                 delegate.onError(x);
             } else if (finished) {
+                debug.log(Level.DEBUG,
+                          "Subscriber completed before subscribe: calling onComplete()");
                 delegate.onComplete();
             }
         }
 
         @Override
         public void onError(Throwable t) {
-            if (completed) return;
+            if (completed) {
+                debug.log(Level.DEBUG,
+                          "Subscriber already completed: ignoring %s",
+                          (Object)t);
+                return;
+            }
             boolean subscribed;
             synchronized (this) {
                 if (completed) return;
@@ -158,6 +186,10 @@
             }
             if (subscribed) {
                 delegate.onError(t);
+            } else {
+                debug.log(Level.DEBUG,
+                          "Subscriber not yet subscribed: stored %s",
+                          (Object)t);
             }
         }
 
@@ -172,6 +204,9 @@
             }
             if (subscribed) {
                 delegate.onComplete();
+            } else {
+                debug.log(Level.DEBUG,
+                          "Subscriber not yet subscribed: stored completed=true");
             }
         }
 
@@ -184,27 +219,73 @@
 
     // Used to read data from the SSLTube.
     final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
-        private volatile DelegateWrapper delegate;
+        private AtomicReference<DelegateWrapper> pendingDelegate =
+                new AtomicReference<>();
         private volatile DelegateWrapper subscribed;
         private volatile boolean onCompleteReceived;
         private final AtomicReference<Throwable> errorRef
                 = new AtomicReference<>();
 
+        // setDelegate can be called asynchronously when the SSLTube flow
+        // is connected. At this time the permanent subscriber (this class)
+        // may already be subscribed (readSubscription != null) or not.
+        // 1. If it's already subscribed (readSubscription != null), we
+        //    are going to signal the SSLFlowDelegate reader, and make sure
+        //    onSubscribed is called within the reader flow
+        // 2. If it's not yet subscribed (readSubscription == null), then
+        //    we're going to wait for onSubscribe/onConnection to be called.
+        //
         void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
             debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
                       delegate);
             assert delegate != null;
-            DelegateWrapper delegateWrapper = new DelegateWrapper(delegate);
+            DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
+            DelegateWrapper previous;
             Flow.Subscription subscription;
+            boolean handleNow;
             synchronized (this) {
-                this.delegate = delegateWrapper;
+                previous = pendingDelegate.getAndSet(delegateWrapper);
                 subscription = readSubscription;
+                handleNow = this.errorRef.get() != null || finished;
+            }
+            if (previous != null) {
+                previous.dropSubscription();
             }
             if (subscription == null) {
                 debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
                 return;
             }
+            if (handleNow || !sslDelegate.resumeReader()) {
+                processPendingSubscriber();
+            }
+        }
 
+        // Can be called outside of the flow if an error has already been
+        // raise. Otherwise, must be called within the SSLFlowDelegate
+        // downstream reader flow.
+        // If there is a subscription, and if there is a pending delegate,
+        // calls dropSubscription() on the previous delegate (if any),
+        // then subscribe the pending delegate.
+        void processPendingSubscriber() {
+            Flow.Subscription subscription;
+            DelegateWrapper delegateWrapper, previous;
+            synchronized (this) {
+                delegateWrapper = pendingDelegate.get();
+                if (delegateWrapper == null) return;
+                subscription = readSubscription;
+                previous = subscribed;
+            }
+            if (subscription == null) {
+                debug.log(Level.DEBUG,
+                         "SSLSubscriberWrapper (reader) %s",
+                         "processPendingSubscriber: no subscription yet");
+                return;
+            }
+            delegateWrapper = pendingDelegate.getAndSet(null);
+            if (delegateWrapper == null) return;
+            if (previous != null) {
+                previous.dropSubscription();
+            }
             onNewSubscription(delegateWrapper,
                               delegateWrapper::onSubscribe,
                               subscription);
@@ -212,7 +293,7 @@
 
         @Override
         public void dropSubscription() {
-            DelegateWrapper subscriberImpl = delegate;
+            DelegateWrapper subscriberImpl = subscribed;
             if (subscriberImpl != null) {
                 subscriberImpl.dropSubscription();
             }
@@ -223,20 +304,7 @@
             debug.log(Level.DEBUG,
                       "SSLSubscriberWrapper (reader) onConnection(%s)",
                       subscription);
-            assert subscription != null;
-            DelegateWrapper subscriberImpl;
-            synchronized (this) {
-                subscriberImpl = delegate;
-                readSubscription = subscription;
-            }
-            if (subscriberImpl == null) {
-                debug.log(Level.DEBUG,
-                      "SSLSubscriberWrapper (reader) onConnection: no delegate yet");
-                return;
-            }
-            onNewSubscription(subscriberImpl,
-                              subscriberImpl::onConnection,
-                              subscription);
+            onSubscribeImpl(subscription);
         }
 
         @Override
@@ -244,21 +312,46 @@
             debug.log(Level.DEBUG,
                       "SSLSubscriberWrapper (reader) onSubscribe(%s)",
                       subscription);
-            readSubscription = subscription;
+            onSubscribeImpl(subscription);
+        }
+
+        // called in the reader flow, from either onSubscribe or onConnection.
+        private void onSubscribeImpl(Flow.Subscription subscription) {
             assert subscription != null;
-            DelegateWrapper subscriberImpl;
+            DelegateWrapper subscriberImpl, pending;
             synchronized (this) {
-                subscriberImpl = delegate;
                 readSubscription = subscription;
+                subscriberImpl = subscribed;
+                pending = pendingDelegate.get();
             }
-            if (subscriberImpl == null) {
+
+            if (subscriberImpl == null && pending == null) {
                 debug.log(Level.DEBUG,
-                      "SSLSubscriberWrapper (reader) onSubscribe: no delegate yet");
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "no delegate yet");
                 return;
             }
-            onNewSubscription(subscriberImpl,
-                              subscriberImpl::onSubscribe,
-                              subscription);
+
+            if (pending == null) {
+                // There is no pending delegate, but we have a previously
+                // subscribed delegate. This is obviously a re-subscribe.
+                // We are in the downstream reader flow, so we should call
+                // onConnection directly.
+                debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "resusbcribing");
+                onNewSubscription(subscriberImpl,
+                                  subscriberImpl::onConnection,
+                                  subscription);
+            } else {
+                // We have some pending subscriber: subscribe it now that we have
+                // a subscription. If we already had a previous delegate then
+                // it will get a dropSubscription().
+                debug.log(Level.DEBUG,
+                      "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+                      "subscribing pending");
+                processPendingSubscriber();
+            }
         }
 
         private void onNewSubscription(DelegateWrapper subscriberImpl,
@@ -275,11 +368,6 @@
             sslDelegate.resetReaderDemand();
             // send the subscription to the subscriber.
             method.accept(subscription);
-            // reschedule after calling onSubscribe (this should not be
-            // strictly needed as the first call to subscription.request()
-            // coming after resetting the demand should trigger it).
-            // However, it should not do any harm.
-            sslDelegate.resumeReader();
 
             // The following twisted logic is just here that we don't invoke
             // onError before onSubscribe. It also prevents race conditions
@@ -287,9 +375,7 @@
             synchronized (this) {
                 failed = this.errorRef.get();
                 completed = finished;
-                if (delegate == subscriberImpl) {
-                    subscribed = subscriberImpl;
-                }
+                subscribed = subscriberImpl;
             }
             if (failed != null) {
                 subscriberImpl.onError(failed);
@@ -300,7 +386,7 @@
 
         @Override
         public void onNext(List<ByteBuffer> item) {
-            delegate.onNext(item);
+            subscribed.onNext(item);
         }
 
         public void onErrorImpl(Throwable throwable) {
@@ -322,6 +408,10 @@
             } else {
                 debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
             }
+            // now if we have any pending subscriber, we should forward
+            // the error to them immediately as the read scheduler will
+            // already be stopped.
+            processPendingSubscriber();
         }
 
         @Override
@@ -351,6 +441,9 @@
                 finished = true;
                 subscriberImpl.onComplete();
             }
+            // now if we have any pending subscriber, we should complete
+            // them immediately as the read scheduler will already be stopped.
+            processPendingSubscriber();
         }
     }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SequentialScheduler.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SequentialScheduler.java	Mon Nov 06 18:17:09 2017 +0000
@@ -343,4 +343,22 @@
     public void stop() {
         state.set(STOP);
     }
+
+    /**
+     * Returns a new {@code SequentialScheduler} that executes the provided
+     * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
+     *
+     * @apiNote
+     * This is equivalent to calling
+     * {@code new SequentialScheduler(new SynchronizedRestartableTask(mainloop));}
+     * The main loop must not do any blocking operation.
+     *
+     * @param mainloop The main loop of the new sequential scheduler.
+     * @return a new {@code SequentialScheduler} that executes the provided
+     * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
+     */
+    public static SequentialScheduler synchronizedScheduler(Runnable mainloop) {
+        return new SequentialScheduler(new SynchronizedRestartableTask(mainloop));
+    }
+
 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java	Mon Nov 06 18:17:09 2017 +0000
@@ -63,14 +63,16 @@
     final System.Logger logger =
             Utils.getDebugLogger(this::dbgString, DEBUG);
 
+    public enum SchedulingAction { CONTINUE, RETURN, RESCHEDULE };
+
     volatile Flow.Subscription upstreamSubscription;
     final SubscriptionBase downstreamSubscription;
     volatile boolean upstreamCompleted;
     volatile boolean downstreamCompleted;
     volatile boolean completionAcknowledged;
     private volatile Subscriber<? super List<ByteBuffer>> downstreamSubscriber;
-    // Input Q and lo and hi pri output Qs.
-    private final ConcurrentLinkedQueue<List<ByteBuffer>> inputQ;
+    // processed byte to send to the downstream subscriber.
+    private final ConcurrentLinkedQueue<List<ByteBuffer>> outputQ;
     private final CompletableFuture<Void> cf;
     private final SequentialScheduler pushScheduler;
     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
@@ -88,9 +90,10 @@
      */
     public SubscriberWrapper()
     {
-        this.inputQ = new ConcurrentLinkedQueue<>();
-        this.cf = new CompletableFuture<>();
-        this.pushScheduler = new SequentialScheduler(new DownstreamPusher());
+        this.outputQ = new ConcurrentLinkedQueue<>();
+        this.cf = new MinimalFuture<>();
+        this.pushScheduler =
+                SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
         this.downstreamSubscription = new SubscriptionBase(pushScheduler,
                                                            this::downstreamCompletion);
     }
@@ -160,6 +163,23 @@
     }
 
     /**
+     * Override this if anything needs to be done before checking for error
+     * and processing the input queue.
+     * @return
+     */
+    protected SchedulingAction enterScheduling() {
+        return SchedulingAction.CONTINUE;
+    }
+
+    protected boolean signalScheduling() {
+        if (downstreamCompleted || pushScheduler.isStopped()) {
+            return false;
+        }
+        pushScheduler.runOrSchedule();
+        return true;
+    }
+
+    /**
      * Delivers buffers of data downstream. After incoming()
      * has been called complete == true signifying completion of the upstream
      * subscription, data may continue to be delivered, up to when outgoing() is
@@ -186,8 +206,8 @@
         } else {
             logger.log(Level.DEBUG, () -> "Adding "
                                    + Utils.remaining(buffers)
-                                   + " to inputQ queue");
-            inputQ.add(buffers);
+                                   + " to outputQ queue");
+            outputQ.add(buffers);
         }
         logger.log(Level.DEBUG, () -> "pushScheduler "
                    + (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
@@ -214,7 +234,7 @@
     /**
      * Invoked whenever it 'may' be possible to push buffers downstream.
      */
-    class DownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
+    class DownstreamPusher implements Runnable {
         @Override
         public void run() {
             try {
@@ -229,7 +249,15 @@
                 logger.log(Level.DEBUG, "DownstreamPusher: downstream is already completed");
                 return;
             }
-
+            switch (enterScheduling()) {
+                case CONTINUE: break;
+                case RESCHEDULE: pushScheduler.runOrSchedule(); return;
+                case RETURN: return;
+                default:
+                    errorRef.compareAndSet(null,
+                            new InternalError("unknown scheduling command"));
+                    break;
+            }
             // If there was an error, send it downstream.
             Throwable error = errorRef.get();
             if (error != null) {
@@ -240,13 +268,13 @@
                 logger.log(Level.DEBUG,
                         () -> "DownstreamPusher: forwarding error downstream: " + error);
                 pushScheduler.stop();
-                inputQ.clear();
+                outputQ.clear();
                 downstreamSubscriber.onError(error);
                 return;
             }
 
             // OK - no error, let's proceed
-            if (!inputQ.isEmpty()) {
+            if (!outputQ.isEmpty()) {
                 logger.log(Level.DEBUG,
                     "DownstreamPusher: queue not empty, downstreamSubscription: %s",
                      downstreamSubscription);
@@ -257,8 +285,8 @@
             }
 
             final boolean dbgOn = logger.isLoggable(Level.DEBUG);
-            while (!inputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
-                List<ByteBuffer> b = inputQ.poll();
+            while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
+                List<ByteBuffer> b = outputQ.poll();
                 if (dbgOn) logger.log(Level.DEBUG,
                                             "DownstreamPusher: Pushing "
                                             + Utils.remaining(b)
@@ -273,7 +301,7 @@
     AtomicLong upstreamWindow = new AtomicLong(0);
 
     void upstreamWindowUpdate() {
-        long downstreamQueueSize = inputQ.size();
+        long downstreamQueueSize = outputQ.size();
         long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
         if (n > 0)
             upstreamRequest(n);
@@ -365,7 +393,7 @@
         if (downstreamCompleted || !upstreamCompleted) {
             return;
         }
-        if (!inputQ.isEmpty()) {
+        if (!outputQ.isEmpty()) {
             return;
         }
         if (errorRef.get() != null) {
@@ -398,8 +426,8 @@
           .append(" upstreamWindow: ").append(upstreamWindow.toString())
           .append(" downstreamCompleted: ").append(Boolean.toString(downstreamCompleted))
           .append(" completionAcknowledged: ").append(Boolean.toString(completionAcknowledged))
-          .append(" inputQ size: ").append(Integer.toString(inputQ.size()))
-          //.append(" inputQ: ").append(inputQ.toString())
+          .append(" outputQ size: ").append(Integer.toString(outputQ.size()))
+          //.append(" outputQ: ").append(outputQ.toString())
           .append(" cf: ").append(cf.toString())
           .append(" downstreamSubscription: ").append(downstreamSubscription.toString());
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Mon Nov 06 18:17:09 2017 +0000
@@ -31,7 +31,6 @@
 import java.net.Proxy;
 import java.net.ProxySelector;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLPermission;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -48,6 +47,7 @@
 import jdk.incubator.http.HttpClient;
 import jdk.incubator.http.WebSocket;
 import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Pair;
 import jdk.incubator.http.internal.common.SequentialScheduler;
 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
@@ -113,8 +113,8 @@
      */
     private final Object lock = new Object();
 
-    private final CompletableFuture<?> closeReceived = new CompletableFuture<>();
-    private final CompletableFuture<?> closeSent = new CompletableFuture<>();
+    private final CompletableFuture<?> closeReceived = new MinimalFuture<>();
+    private final CompletableFuture<?> closeSent = new MinimalFuture<>();
 
     /** Returns the security permission required for the given details. */
     static URLPermission permissionForServer(URI uri,
@@ -412,7 +412,7 @@
     }
 
     private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
-        CompletableFuture<WebSocket> cf = new CompletableFuture<>();
+        CompletableFuture<WebSocket> cf = new MinimalFuture<>();
         boolean added = queue.add(pair(m, cf));
         if (!added) {
             // The queue is supposed to be unbounded