src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
changeset 58649 6b6bf0de534b
parent 53350 a47b8125b7cc
equal deleted inserted replaced
58644:64597a6fd186 58649:6b6bf0de534b
    24  */
    24  */
    25 
    25 
    26 package jdk.internal.net.http.common;
    26 package jdk.internal.net.http.common;
    27 
    27 
    28 import java.io.Closeable;
    28 import java.io.Closeable;
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
    29 import java.nio.ByteBuffer;
    31 import java.util.ArrayList;
       
    32 import java.util.List;
    30 import java.util.List;
    33 import java.util.Objects;
    31 import java.util.Objects;
    34 import java.util.concurrent.CompletableFuture;
    32 import java.util.concurrent.CompletableFuture;
    35 import java.util.concurrent.ConcurrentLinkedQueue;
    33 import java.util.concurrent.ConcurrentLinkedQueue;
    36 import java.util.concurrent.Flow;
    34 import java.util.concurrent.Flow;
   316                     debug.log("DownstreamPusher: Pushing %d bytes downstream",
   314                     debug.log("DownstreamPusher: Pushing %d bytes downstream",
   317                               Utils.remaining(b));
   315                               Utils.remaining(b));
   318                 downstreamSubscriber.onNext(b);
   316                 downstreamSubscriber.onNext(b);
   319                 datasent = true;
   317                 datasent = true;
   320             }
   318             }
   321             if (datasent) upstreamWindowUpdate();
   319 
       
   320             // If we have sent some decrypted data downstream,
       
   321             // or if:
       
   322             //    - there's nothing more available to send downstream
       
   323             //    - and we still have some demand from downstream
       
   324             //    - and upstream is not completed yet
       
   325             //    - and our demand from upstream has reached 0,
       
   326             // then check whether we should request more data from
       
   327             // upstream
       
   328             if (datasent || outputQ.isEmpty()
       
   329                     && !downstreamSubscription.demand.isFulfilled()
       
   330                     && !upstreamCompleted
       
   331                     && upstreamWindow.get() == 0) {
       
   332                 upstreamWindowUpdate();
       
   333             }
   322             checkCompletion();
   334             checkCompletion();
   323         }
   335         }
       
   336     }
       
   337 
       
   338     final int outputQueueSize() {
       
   339         return outputQ.size();
       
   340     }
       
   341 
       
   342     final boolean hasNoOutputData() {
       
   343         return outputQ.isEmpty();
   324     }
   344     }
   325 
   345 
   326     void upstreamWindowUpdate() {
   346     void upstreamWindowUpdate() {
   327         long downstreamQueueSize = outputQ.size();
   347         long downstreamQueueSize = outputQ.size();
   328         long upstreamWindowSize = upstreamWindow.get();
   348         long upstreamWindowSize = upstreamWindow.get();
   339     public void onSubscribe(Flow.Subscription subscription) {
   359     public void onSubscribe(Flow.Subscription subscription) {
   340         if (upstreamSubscription != null) {
   360         if (upstreamSubscription != null) {
   341             throw new IllegalStateException("Single shot publisher");
   361             throw new IllegalStateException("Single shot publisher");
   342         }
   362         }
   343         this.upstreamSubscription = subscription;
   363         this.upstreamSubscription = subscription;
   344         upstreamRequest(upstreamWindowUpdate(0, 0));
   364         upstreamRequest(initialUpstreamDemand());
   345         if (debug.on())
   365         if (debug.on())
   346             debug.log("calling downstreamSubscriber::onSubscribe on %s",
   366             debug.log("calling downstreamSubscriber::onSubscribe on %s",
   347                       downstreamSubscriber);
   367                       downstreamSubscriber);
   348         downstreamSubscriber.onSubscribe(downstreamSubscription);
   368         downstreamSubscriber.onSubscribe(downstreamSubscription);
   349         onSubscribe();
   369         onSubscribe();
   354         if (debug.on()) debug.log("onNext");
   374         if (debug.on()) debug.log("onNext");
   355         long prev = upstreamWindow.getAndDecrement();
   375         long prev = upstreamWindow.getAndDecrement();
   356         if (prev <= 0)
   376         if (prev <= 0)
   357             throw new IllegalStateException("invalid onNext call");
   377             throw new IllegalStateException("invalid onNext call");
   358         incomingCaller(item, false);
   378         incomingCaller(item, false);
   359         upstreamWindowUpdate();
       
   360     }
   379     }
   361 
   380 
   362     private void upstreamRequest(long n) {
   381     private void upstreamRequest(long n) {
   363         if (debug.on()) debug.log("requesting %d", n);
   382         if (debug.on()) debug.log("requesting %d", n);
   364         upstreamWindow.getAndAdd(n);
   383         upstreamWindow.getAndAdd(n);
   365         upstreamSubscription.request(n);
   384         upstreamSubscription.request(n);
       
   385     }
       
   386 
       
   387     /**
       
   388      * Initial demand that should be requested
       
   389      * from upstream when we get the upstream subscription
       
   390      * from {@link #onSubscribe(Flow.Subscription)}.
       
   391      * @return The initial demand to request from upstream.
       
   392      */
       
   393     protected long initialUpstreamDemand() {
       
   394         return 1;
   366     }
   395     }
   367 
   396 
   368     protected void requestMore() {
   397     protected void requestMore() {
   369         if (upstreamWindow.get() == 0) {
   398         if (upstreamWindow.get() == 0) {
   370             upstreamRequest(1);
   399             upstreamRequest(1);