src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
branchhttp-client-branch
changeset 56423 ba64c30666cc
parent 56410 1b37529eaf3a
child 56437 f8b3f053cfbb
equal deleted inserted replaced
56418:56c32f8ea406 56423:ba64c30666cc
   768                 if (t != null) {
   768                 if (t != null) {
   769                     sendScheduler.stop();
   769                     sendScheduler.stop();
   770                     if (requestBodyCF.isDone()) return;
   770                     if (requestBodyCF.isDone()) return;
   771                     subscription.cancel();
   771                     subscription.cancel();
   772                     requestBodyCF.completeExceptionally(t);
   772                     requestBodyCF.completeExceptionally(t);
       
   773                     cancelImpl(t);
   773                     return;
   774                     return;
   774                 }
   775                 }
   775 
   776 
   776                 do {
   777                 do {
   777                     // handle COMPLETED;
   778                     // handle COMPLETED;
   823             } catch (Throwable ex) {
   824             } catch (Throwable ex) {
   824                 debug.log(Level.DEBUG, "trySend: ", ex);
   825                 debug.log(Level.DEBUG, "trySend: ", ex);
   825                 sendScheduler.stop();
   826                 sendScheduler.stop();
   826                 subscription.cancel();
   827                 subscription.cancel();
   827                 requestBodyCF.completeExceptionally(ex);
   828                 requestBodyCF.completeExceptionally(ex);
       
   829                 // need to cancel the stream to 1. tell the server
       
   830                 // we don't want to receive any more data and
       
   831                 // 2. ensure that the operation ref count will be
       
   832                 // decremented on the HttpClient.
       
   833                 cancelImpl(ex);
   828             }
   834             }
   829         }
   835         }
   830 
   836 
   831         private void complete() throws IOException {
   837         private void complete() throws IOException {
   832             long remaining = remainingContentLength;
   838             long remaining = remainingContentLength;
   981         }
   987         }
   982     }
   988     }
   983 
   989 
   984     CompletableFuture<Void> sendBodyImpl() {
   990     CompletableFuture<Void> sendBodyImpl() {
   985         requestBodyCF.whenComplete((v, t) -> requestSent());
   991         requestBodyCF.whenComplete((v, t) -> requestSent());
   986         if (requestPublisher != null) {
   992         try {
   987             final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
   993             if (requestPublisher != null) {
   988             requestPublisher.subscribe(requestSubscriber = subscriber);
   994                 final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
   989         } else {
   995                 requestPublisher.subscribe(requestSubscriber = subscriber);
   990             // there is no request body, therefore the request is complete,
   996             } else {
   991             // END_STREAM has already sent with outgoing headers
   997                 // there is no request body, therefore the request is complete,
   992             requestBodyCF.complete(null);
   998                 // END_STREAM has already sent with outgoing headers
       
   999                 requestBodyCF.complete(null);
       
  1000             }
       
  1001         } catch (Throwable t) {
       
  1002             cancelImpl(t);
       
  1003             requestBodyCF.completeExceptionally(t);
   993         }
  1004         }
   994         return requestBodyCF;
  1005         return requestBodyCF;
   995     }
  1006     }
   996 
  1007 
   997     @Override
  1008     @Override