src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java
branchhttp-client-branch
changeset 56730 b08918259eed
parent 56507 2294c51eae30
child 56756 ba60eaef37d7
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Mon Jun 11 12:16:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Mon Jun 11 13:07:18 2018 +0100
@@ -309,7 +309,7 @@
             synchronized (this) {
                 previous = pendingDelegate.getAndSet(delegateWrapper);
                 subscription = readSubscription;
-                handleNow = this.errorRef.get() != null || finished;
+                handleNow = this.errorRef.get() != null || finished || readSubscriber.onCompleteReceived;
             }
             if (previous != null) {
                 previous.dropSubscription();
@@ -424,12 +424,19 @@
             // if onError is invoked concurrently with setDelegate.
             synchronized (this) {
                 failed = this.errorRef.get();
-                completed = finished;
+                completed = finished || onCompleteReceived;
                 subscribed = subscriberImpl;
             }
+
             if (failed != null) {
+                if (debug.on())
+                    debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s",
+                              subscriberImpl, failed);
                 subscriberImpl.onError(failed);
             } else if (completed) {
+                if (debug.on())
+                    debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted",
+                              subscriberImpl);
                 subscriberImpl.onComplete();
             }
         }
@@ -528,12 +535,17 @@
     final class SSLSubscriptionWrapper implements Flow.Subscription {
 
         volatile Flow.Subscription delegate;
+        private volatile boolean cancelled;
 
         void setSubscription(Flow.Subscription sub) {
             long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
             delegate = sub;
-            if (debug.on()) debug.log("setSubscription: demand=%d", demand);
-            if (demand > 0)
+            if (debug.on())
+                debug.log("setSubscription: demand=%d, cancelled:%s", demand, cancelled);
+
+            if (cancelled)
+                delegate.cancel();
+            else if (demand > 0)
                 sub.request(demand);
         }
 
@@ -549,7 +561,9 @@
 
         @Override
         public void cancel() {
-            // TODO:  no-op or error?
+            cancelled = true;
+            if (delegate != null)
+                delegate.cancel();
         }
     }