equal
deleted
inserted
replaced
768 if (t != null) { |
768 if (t != null) { |
769 sendScheduler.stop(); |
769 sendScheduler.stop(); |
770 if (requestBodyCF.isDone()) return; |
770 if (requestBodyCF.isDone()) return; |
771 subscription.cancel(); |
771 subscription.cancel(); |
772 requestBodyCF.completeExceptionally(t); |
772 requestBodyCF.completeExceptionally(t); |
|
773 cancelImpl(t); |
773 return; |
774 return; |
774 } |
775 } |
775 |
776 |
776 do { |
777 do { |
777 // handle COMPLETED; |
778 // handle COMPLETED; |
823 } catch (Throwable ex) { |
824 } catch (Throwable ex) { |
824 debug.log(Level.DEBUG, "trySend: ", ex); |
825 debug.log(Level.DEBUG, "trySend: ", ex); |
825 sendScheduler.stop(); |
826 sendScheduler.stop(); |
826 subscription.cancel(); |
827 subscription.cancel(); |
827 requestBodyCF.completeExceptionally(ex); |
828 requestBodyCF.completeExceptionally(ex); |
|
829 // need to cancel the stream to 1. tell the server |
|
830 // we don't want to receive any more data and |
|
831 // 2. ensure that the operation ref count will be |
|
832 // decremented on the HttpClient. |
|
833 cancelImpl(ex); |
828 } |
834 } |
829 } |
835 } |
830 |
836 |
831 private void complete() throws IOException { |
837 private void complete() throws IOException { |
832 long remaining = remainingContentLength; |
838 long remaining = remainingContentLength; |
981 } |
987 } |
982 } |
988 } |
983 |
989 |
984 CompletableFuture<Void> sendBodyImpl() { |
990 CompletableFuture<Void> sendBodyImpl() { |
985 requestBodyCF.whenComplete((v, t) -> requestSent()); |
991 requestBodyCF.whenComplete((v, t) -> requestSent()); |
986 if (requestPublisher != null) { |
992 try { |
987 final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); |
993 if (requestPublisher != null) { |
988 requestPublisher.subscribe(requestSubscriber = subscriber); |
994 final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); |
989 } else { |
995 requestPublisher.subscribe(requestSubscriber = subscriber); |
990 // there is no request body, therefore the request is complete, |
996 } else { |
991 // END_STREAM has already sent with outgoing headers |
997 // there is no request body, therefore the request is complete, |
992 requestBodyCF.complete(null); |
998 // END_STREAM has already sent with outgoing headers |
|
999 requestBodyCF.complete(null); |
|
1000 } |
|
1001 } catch (Throwable t) { |
|
1002 cancelImpl(t); |
|
1003 requestBodyCF.completeExceptionally(t); |
993 } |
1004 } |
994 return requestBodyCF; |
1005 return requestBodyCF; |
995 } |
1006 } |
996 |
1007 |
997 @Override |
1008 @Override |