http-client-branch: fixed race condition when resubscribing with SocketTube - part II
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Mon Apr 09 12:05:12 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Mon Apr 09 12:59:06 2018 +0100
@@ -274,13 +274,16 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
WriteSubscription previous = this.subscription;
- this.subscription = new WriteSubscription(subscription);
debug.log(Level.DEBUG, "subscribed for writing");
try {
- if (current == null) {
+ boolean needEvent = current == null;
+ if (needEvent) {
if (previous != null && previous.upstreamSubscription != subscription) {
previous.dropSubscription();
}
+ }
+ this.subscription = new WriteSubscription(subscription);
+ if (needEvent) {
debug.log(Level.DEBUG, "write: registering startSubscription event");
client.registerEvent(startSubscription);
}
@@ -446,6 +449,7 @@
@Override
public void request(long n) {
+ if (cancelled) return;
upstreamSubscription.request(n);
}
@@ -455,10 +459,12 @@
upstreamSubscription.cancel();
}
- synchronized void dropSubscription() {
- cancelled = true;
- debug.log(Level.DEBUG, "write: resetting demand to 0");
- writeDemand.reset();
+ void dropSubscription() {
+ synchronized (InternalWriteSubscriber.this) {
+ cancelled = true;
+ debug.log(Level.DEBUG, "write: resetting demand to 0");
+ writeDemand.reset();
+ }
}
void requestMore() {
@@ -468,7 +474,7 @@
long d;
// don't fiddle with demand after cancel.
// see dropSubscription.
- synchronized (this) {
+ synchronized (InternalWriteSubscriber.this) {
if (cancelled) return;
d = writeDemand.get();
requestMore = writeDemand.increaseIfFulfilled();