src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Wed Oct 16 12:36:44 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Wed Oct 16 14:50:53 2019 +0100
@@ -26,9 +26,7 @@
package jdk.internal.net.http.common;
import java.io.Closeable;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -318,11 +316,33 @@
downstreamSubscriber.onNext(b);
datasent = true;
}
- if (datasent) upstreamWindowUpdate();
+
+ // If we have sent some decrypted data downstream,
+ // or if:
+ // - there's nothing more available to send downstream
+ // - and we still have some demand from downstream
+ // - and upstream is not completed yet
+ // - and our demand from upstream has reached 0,
+ // then check whether we should request more data from
+ // upstream
+ if (datasent || outputQ.isEmpty()
+ && !downstreamSubscription.demand.isFulfilled()
+ && !upstreamCompleted
+ && upstreamWindow.get() == 0) {
+ upstreamWindowUpdate();
+ }
checkCompletion();
}
}
+ final int outputQueueSize() {
+ return outputQ.size();
+ }
+
+ final boolean hasNoOutputData() {
+ return outputQ.isEmpty();
+ }
+
void upstreamWindowUpdate() {
long downstreamQueueSize = outputQ.size();
long upstreamWindowSize = upstreamWindow.get();
@@ -341,7 +361,7 @@
throw new IllegalStateException("Single shot publisher");
}
this.upstreamSubscription = subscription;
- upstreamRequest(upstreamWindowUpdate(0, 0));
+ upstreamRequest(initialUpstreamDemand());
if (debug.on())
debug.log("calling downstreamSubscriber::onSubscribe on %s",
downstreamSubscriber);
@@ -356,7 +376,6 @@
if (prev <= 0)
throw new IllegalStateException("invalid onNext call");
incomingCaller(item, false);
- upstreamWindowUpdate();
}
private void upstreamRequest(long n) {
@@ -365,6 +384,16 @@
upstreamSubscription.request(n);
}
+ /**
+ * Initial demand that should be requested
+ * from upstream when we get the upstream subscription
+ * from {@link #onSubscribe(Flow.Subscription)}.
+ * @return The initial demand to request from upstream.
+ */
+ protected long initialUpstreamDemand() {
+ return 1;
+ }
+
protected void requestMore() {
if (upstreamWindow.get() == 0) {
upstreamRequest(1);