src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
changeset 58649 6b6bf0de534b
parent 53350 a47b8125b7cc
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Wed Oct 16 12:36:44 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Wed Oct 16 14:50:53 2019 +0100
@@ -26,9 +26,7 @@
 package jdk.internal.net.http.common;
 
 import java.io.Closeable;
-import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -318,11 +316,33 @@
                 downstreamSubscriber.onNext(b);
                 datasent = true;
             }
-            if (datasent) upstreamWindowUpdate();
+
+            // If we have sent some decrypted data downstream,
+            // or if:
+            //    - there's nothing more available to send downstream
+            //    - and we still have some demand from downstream
+            //    - and upstream is not completed yet
+            //    - and our demand from upstream has reached 0,
+            // then check whether we should request more data from
+            // upstream
+            if (datasent || outputQ.isEmpty()
+                    && !downstreamSubscription.demand.isFulfilled()
+                    && !upstreamCompleted
+                    && upstreamWindow.get() == 0) {
+                upstreamWindowUpdate();
+            }
             checkCompletion();
         }
     }
 
+    final int outputQueueSize() {
+        return outputQ.size();
+    }
+
+    final boolean hasNoOutputData() {
+        return outputQ.isEmpty();
+    }
+
     void upstreamWindowUpdate() {
         long downstreamQueueSize = outputQ.size();
         long upstreamWindowSize = upstreamWindow.get();
@@ -341,7 +361,7 @@
             throw new IllegalStateException("Single shot publisher");
         }
         this.upstreamSubscription = subscription;
-        upstreamRequest(upstreamWindowUpdate(0, 0));
+        upstreamRequest(initialUpstreamDemand());
         if (debug.on())
             debug.log("calling downstreamSubscriber::onSubscribe on %s",
                       downstreamSubscriber);
@@ -356,7 +376,6 @@
         if (prev <= 0)
             throw new IllegalStateException("invalid onNext call");
         incomingCaller(item, false);
-        upstreamWindowUpdate();
     }
 
     private void upstreamRequest(long n) {
@@ -365,6 +384,16 @@
         upstreamSubscription.request(n);
     }
 
+    /**
+     * Initial demand that should be requested
+     * from upstream when we get the upstream subscription
+     * from {@link #onSubscribe(Flow.Subscription)}.
+     * @return The initial demand to request from upstream.
+     */
+    protected long initialUpstreamDemand() {
+        return 1;
+    }
+
     protected void requestMore() {
         if (upstreamWindow.get() == 0) {
             upstreamRequest(1);