--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Mon Jun 11 12:16:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Mon Jun 11 13:07:18 2018 +0100
@@ -309,7 +309,7 @@
synchronized (this) {
previous = pendingDelegate.getAndSet(delegateWrapper);
subscription = readSubscription;
- handleNow = this.errorRef.get() != null || finished;
+ handleNow = this.errorRef.get() != null || finished || readSubscriber.onCompleteReceived;
}
if (previous != null) {
previous.dropSubscription();
@@ -424,12 +424,19 @@
// if onError is invoked concurrently with setDelegate.
synchronized (this) {
failed = this.errorRef.get();
- completed = finished;
+ completed = finished || onCompleteReceived;
subscribed = subscriberImpl;
}
+
if (failed != null) {
+ if (debug.on())
+ debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s",
+ subscriberImpl, failed);
subscriberImpl.onError(failed);
} else if (completed) {
+ if (debug.on())
+ debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted",
+ subscriberImpl);
subscriberImpl.onComplete();
}
}
@@ -528,12 +535,17 @@
final class SSLSubscriptionWrapper implements Flow.Subscription {
volatile Flow.Subscription delegate;
+ private volatile boolean cancelled;
void setSubscription(Flow.Subscription sub) {
long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
delegate = sub;
- if (debug.on()) debug.log("setSubscription: demand=%d", demand);
- if (demand > 0)
+ if (debug.on())
+ debug.log("setSubscription: demand=%d, cancelled:%s", demand, cancelled);
+
+ if (cancelled)
+ delegate.cancel();
+ else if (demand > 0)
sub.request(demand);
}
@@ -549,7 +561,9 @@
@Override
public void cancel() {
- // TODO: no-op or error?
+ cancelled = true;
+ if (delegate != null)
+ delegate.cancel();
}
}