src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java
branch http-client-branch
changeset 55950 5e1707e5a254
parent 55946 cfa4f84b7fcc
child 55973 4d9b002587db
equal deleted inserted replaced
55948:33ffdf2f703e 55950:5e1707e5a254
   209 
   209 
   210     public void outgoing(List<ByteBuffer> buffers, boolean complete) {
   210     public void outgoing(List<ByteBuffer> buffers, boolean complete) {
   211         Objects.requireNonNull(buffers);
   211         Objects.requireNonNull(buffers);
   212         if (complete) {
   212         if (complete) {
   213             assert Utils.remaining(buffers) == 0;
   213             assert Utils.remaining(buffers) == 0;
   214             logger.log(Level.DEBUG, "completionAcknowledged");
   214             boolean closing = closing();
   215             if (!upstreamCompleted && !closing())
   215             logger.log(Level.DEBUG,
       
   216                     "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
       
   217                     upstreamCompleted, downstreamCompleted, closing);
       
   218             if (!upstreamCompleted && !closing)
   216                 throw new IllegalStateException("upstream not completed");
   219                 throw new IllegalStateException("upstream not completed");
   217             completionAcknowledged = true;
   220             completionAcknowledged = true;
   218         } else {
   221         } else {
   219             logger.log(Level.DEBUG, () -> "Adding "
   222             logger.log(Level.DEBUG, () -> "Adding "
   220                                    + Utils.remaining(buffers)
   223                                    + Utils.remaining(buffers)
   415         if (errorRef.get() != null) {
   418         if (errorRef.get() != null) {
   416             pushScheduler.runOrSchedule();
   419             pushScheduler.runOrSchedule();
   417             return;
   420             return;
   418         }
   421         }
   419         if (completionAcknowledged) {
   422         if (completionAcknowledged) {
       
   423             logger.log(Level.DEBUG, "calling downstreamSubscriber.onComplete()");
   420             downstreamSubscriber.onComplete();
   424             downstreamSubscriber.onComplete();
   421             // Fix me subscriber.onComplete.run();
   425             // Fix me subscriber.onComplete.run();
   422             downstreamCompleted = true;
   426             downstreamCompleted = true;
   423             cf.complete(null);
   427             cf.complete(null);
   424         }
   428         }