src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
branchhttp-client-branch
changeset 56071 3353cb42b1b4
parent 56054 352e845ae744
equal deleted inserted replaced
56070:66a9c3185028 56071:3353cb42b1b4
   148     private void schedule() {
   148     private void schedule() {
   149         if (responseSubscriber == null)
   149         if (responseSubscriber == null)
   150             // can't process anything yet
   150             // can't process anything yet
   151             return;
   151             return;
   152 
   152 
   153         while (!inputQ.isEmpty()) {
   153         try {
   154             Http2Frame frame  = inputQ.peek();
   154             while (!inputQ.isEmpty()) {
   155             if (frame instanceof ResetFrame) {
   155                 Http2Frame frame = inputQ.peek();
   156                 inputQ.remove();
   156                 if (frame instanceof ResetFrame) {
   157                 handleReset((ResetFrame)frame);
   157                     inputQ.remove();
   158                 return;
   158                     handleReset((ResetFrame)frame);
   159             }
   159                     return;
   160             DataFrame df = (DataFrame)frame;
   160                 }
   161             boolean finished = df.getFlag(DataFrame.END_STREAM);
   161                 DataFrame df = (DataFrame)frame;
   162 
   162                 boolean finished = df.getFlag(DataFrame.END_STREAM);
   163             List<ByteBuffer> buffers = df.getData();
   163 
   164             List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
   164                 List<ByteBuffer> buffers = df.getData();
   165             int size = Utils.remaining(dsts, Integer.MAX_VALUE);
   165                 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
   166             if (size == 0 && finished) {
   166                 int size = Utils.remaining(dsts, Integer.MAX_VALUE);
   167                 inputQ.remove();
   167                 if (size == 0 && finished) {
   168                 Log.logTrace("responseSubscriber.onComplete");
   168                     inputQ.remove();
   169                 debug.log(Level.DEBUG, "incoming: onComplete");
       
   170                 sched.stop();
       
   171                 responseSubscriber.onComplete();
       
   172                 setEndStreamReceived();
       
   173                 return;
       
   174             } else if (userSubscription.tryDecrement()) {
       
   175                 inputQ.remove();
       
   176                 Log.logTrace("responseSubscriber.onNext {0}", size);
       
   177                 debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
       
   178                 responseSubscriber.onNext(dsts);
       
   179                 if (consumed(df)) {
       
   180                     Log.logTrace("responseSubscriber.onComplete");
   169                     Log.logTrace("responseSubscriber.onComplete");
   181                     debug.log(Level.DEBUG, "incoming: onComplete");
   170                     debug.log(Level.DEBUG, "incoming: onComplete");
   182                     sched.stop();
   171                     sched.stop();
   183                     responseSubscriber.onComplete();
   172                     responseSubscriber.onComplete();
   184                     setEndStreamReceived();
   173                     setEndStreamReceived();
   185                     return;
   174                     return;
       
   175                 } else if (userSubscription.tryDecrement()) {
       
   176                     inputQ.remove();
       
   177                     Log.logTrace("responseSubscriber.onNext {0}", size);
       
   178                     debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
       
   179                     responseSubscriber.onNext(dsts);
       
   180                     if (consumed(df)) {
       
   181                         Log.logTrace("responseSubscriber.onComplete");
       
   182                         debug.log(Level.DEBUG, "incoming: onComplete");
       
   183                         sched.stop();
       
   184                         responseSubscriber.onComplete();
       
   185                         setEndStreamReceived();
       
   186                         return;
       
   187                     }
       
   188                 } else {
       
   189                     return;
   186                 }
   190                 }
   187             } else {
   191             }
   188                 return;
   192         } catch (Throwable throwable) {
   189             }
   193             failed = throwable;
   190         }
   194         }
       
   195 
   191         Throwable t = failed;
   196         Throwable t = failed;
   192         if (t != null) {
   197         if (t != null) {
   193             sched.stop();
   198             sched.stop();
   194             responseSubscriber.onError(t);
   199             responseSubscriber.onError(t);
   195             close();
   200             close();