diff -r 818a23db260c -r 4254bed3c09d src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Wed Jun 20 17:15:16 2018 +0200 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Wed Jun 20 09:05:57 2018 -0700 @@ -309,7 +309,7 @@ synchronized (this) { previous = pendingDelegate.getAndSet(delegateWrapper); subscription = readSubscription; - handleNow = this.errorRef.get() != null || finished; + handleNow = this.errorRef.get() != null || onCompleteReceived; } if (previous != null) { previous.dropSubscription(); @@ -424,12 +424,20 @@ // if onError is invoked concurrently with setDelegate. synchronized (this) { failed = this.errorRef.get(); - completed = finished; + completed = onCompleteReceived; subscribed = subscriberImpl; } + if (failed != null) { + if (debug.on()) + debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s", + subscriberImpl, failed); subscriberImpl.onError(failed); } else if (completed) { + if (debug.on()) + debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted", + subscriberImpl); + finished = true; subscriberImpl.onComplete(); } } @@ -490,7 +498,6 @@ @Override public void onComplete() { assert !finished && !onCompleteReceived; - onCompleteReceived = true; DelegateWrapper subscriberImpl; synchronized(this) { subscriberImpl = subscribed; @@ -505,8 +512,10 @@ onErrorImpl(new SSLHandshakeException( "Remote host terminated the handshake")); } else if (subscriberImpl != null) { - finished = true; + onCompleteReceived = finished = true; subscriberImpl.onComplete(); + } else { + onCompleteReceived = true; } // now if we have any pending subscriber, we should complete // them immediately as the read scheduler will already be stopped. @@ -528,12 +537,17 @@ final class SSLSubscriptionWrapper implements Flow.Subscription { volatile Flow.Subscription delegate; + private volatile boolean cancelled; void setSubscription(Flow.Subscription sub) { long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand? delegate = sub; - if (debug.on()) debug.log("setSubscription: demand=%d", demand); - if (demand > 0) + if (debug.on()) + debug.log("setSubscription: demand=%d, cancelled:%s", demand, cancelled); + + if (cancelled) + delegate.cancel(); + else if (demand > 0) sub.request(demand); } @@ -549,7 +563,9 @@ @Override public void cancel() { - // TODO: no-op or error? + cancelled = true; + if (delegate != null) + delegate.cancel(); } }