src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Sun Dec 03 20:56:29 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Mon Dec 04 13:00:05 2017 +0000
@@ -76,6 +76,7 @@
private final CompletableFuture<Void> cf;
private final SequentialScheduler pushScheduler;
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ final AtomicLong upstreamWindow = new AtomicLong(0);
/**
* Wraps the given downstream subscriber. For each call to {@link
@@ -309,8 +310,6 @@
}
}
- AtomicLong upstreamWindow = new AtomicLong(0);
-
void upstreamWindowUpdate() {
long downstreamQueueSize = outputQ.size();
long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
@@ -348,6 +347,12 @@
upstreamSubscription.request(n);
}
+ protected void requestMore() {
+ if (upstreamWindow.get() == 0) {
+ upstreamRequest(1);
+ }
+ }
+
public long upstreamWindow() {
return upstreamWindow.get();
}