src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
branchhttp-client-branch
changeset 56234 fc391230cf7b
parent 56225 348ae78efb60
child 56315 ec8799d412fb
equal deleted inserted replaced
56233:1753108d07b9 56234:fc391230cf7b
   202 
   202 
   203     /**
   203     /**
   204      * Sometime it might be necessary to complete the downstream subscriber
   204      * Sometime it might be necessary to complete the downstream subscriber
   205      * before the upstream completes. For instance, when an SSL server
   205      * before the upstream completes. For instance, when an SSL server
   206      * sends a notify_close. In that case we should let the outgoing
   206      * sends a notify_close. In that case we should let the outgoing
   207      * complete before upstream us completed.
   207      * complete before upstream is completed.
   208      * @return true, may be overridden by subclasses.
   208      * @return true, may be overridden by subclasses.
   209      */
   209      */
   210     public boolean closing() {
   210     public boolean closing() {
   211         return false;
   211         return false;
   212     }
   212     }
   215         Objects.requireNonNull(buffers);
   215         Objects.requireNonNull(buffers);
   216         if (complete) {
   216         if (complete) {
   217             assert Utils.remaining(buffers) == 0;
   217             assert Utils.remaining(buffers) == 0;
   218             boolean closing = closing();
   218             boolean closing = closing();
   219             logger.log(Level.DEBUG,
   219             logger.log(Level.DEBUG,
   220                     "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
   220                        "completionAcknowledged upstreamCompleted:%s,"
   221                     upstreamCompleted, downstreamCompleted, closing);
   221                        + " downstreamCompleted:%s, closing:%s",
   222             if (!upstreamCompleted && !closing)
   222                        upstreamCompleted, downstreamCompleted, closing);
       
   223             if (!upstreamCompleted && !closing) {
   223                 throw new IllegalStateException("upstream not completed");
   224                 throw new IllegalStateException("upstream not completed");
       
   225             }
   224             completionAcknowledged = true;
   226             completionAcknowledged = true;
   225         } else {
   227         } else {
   226             logger.log(Level.DEBUG, () -> "Adding "
   228             logger.log(Level.DEBUG, () -> "Adding "
   227                                    + Utils.remaining(buffers)
   229                     + Utils.remaining(buffers) + " to outputQ queue");
   228                                    + " to outputQ queue");
       
   229             outputQ.add(buffers);
   230             outputQ.add(buffers);
   230         }
   231         }
   231         logger.log(Level.DEBUG, () -> "pushScheduler "
   232         logger.log(Level.DEBUG, () -> "pushScheduler"
   232                    + (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
   233                    + (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
   233         pushScheduler.runOrSchedule();
   234         pushScheduler.runOrSchedule();
   234     }
   235     }
   235 
   236 
   236     /**
   237     /**
   279             }
   280             }
   280             // If there was an error, send it downstream.
   281             // If there was an error, send it downstream.
   281             Throwable error = errorRef.get();
   282             Throwable error = errorRef.get();
   282             if (error != null) {
   283             if (error != null) {
   283                 synchronized(this) {
   284                 synchronized(this) {
   284                     if (downstreamCompleted) return;
   285                     if (downstreamCompleted)
       
   286                         return;
   285                     downstreamCompleted = true;
   287                     downstreamCompleted = true;
   286                 }
   288                 }
   287                 logger.log(Level.DEBUG,
   289                 logger.log(Level.DEBUG,
   288                         () -> "DownstreamPusher: forwarding error downstream: " + error);
   290                         () -> "DownstreamPusher: forwarding error downstream: " + error);
   289                 pushScheduler.stop();
   291                 pushScheduler.stop();
   317         }
   319         }
   318     }
   320     }
   319 
   321 
   320     void upstreamWindowUpdate() {
   322     void upstreamWindowUpdate() {
   321         long downstreamQueueSize = outputQ.size();
   323         long downstreamQueueSize = outputQ.size();
   322         long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
   324         long upstreamWindowSize = upstreamWindow.get();
   323         logger.log(Level.DEBUG, "upstreamWindowUpdate, downstreamQueueSize:%d, upstreamWindow:%d",
   325         long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize);
   324                 downstreamQueueSize, upstreamWindow.get());
   326         logger.log(Level.DEBUG, "upstreamWindowUpdate, "
       
   327                         + "downstreamQueueSize:%d, upstreamWindow:%d",
       
   328                         downstreamQueueSize, upstreamWindowSize);
   325         if (n > 0)
   329         if (n > 0)
   326             upstreamRequest(n);
   330             upstreamRequest(n);
   327     }
   331     }
   328 
   332 
   329     @Override
   333     @Override