src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java
branch http-client-branch
changeset 56082 1da51fab3032
parent 56079 d23b02f37fce
equal deleted inserted replaced
56081:20c6742e5545 56082:1da51fab3032
   229                                        boolean returnConnectionToPool,
   229                                        boolean returnConnectionToPool,
   230                                        Executor executor)
   230                                        Executor executor)
   231     {
   231     {
   232         Log.logTrace("Reading body on stream {0}", streamid);
   232         Log.logTrace("Reading body on stream {0}", streamid);
   233         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
   233         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
   234         CompletableFuture<T> cf = receiveData(bodySubscriber);
   234         CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
   235 
   235 
   236         PushGroup<?> pg = exchange.getPushGroup();
   236         PushGroup<?> pg = exchange.getPushGroup();
   237         if (pg != null) {
   237         if (pg != null) {
   238             // if an error occurs make sure it is recorded in the PushGroup
   238             // if an error occurs make sure it is recorded in the PushGroup
   239             cf = cf.whenComplete((t,e) -> pg.pushError(e));
   239             cf = cf.whenComplete((t,e) -> pg.pushError(e));
   260         sched.runOrSchedule();
   260         sched.runOrSchedule();
   261     }
   261     }
   262 
   262 
   263     // pushes entire response body into response subscriber
   263     // pushes entire response body into response subscriber
   264     // blocking when required by local or remote flow control
   264     // blocking when required by local or remote flow control
   265     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
   265     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
   266         responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
   266         responseBodyCF = new MinimalFuture<>();
       
   267         // We want to allow the subscriber's getBody() method to block so it
       
   268         // can work with InputStreams. So, we offload execution.
       
   269         executor.execute(() -> {
       
   270             bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
       
   271                 if (t == null)
       
   272                     responseBodyCF.complete(body);
       
   273                 else
       
   274                     responseBodyCF.completeExceptionally(t);
       
   275             });
       
   276         });
   267 
   277 
   268         if (isCanceled()) {
   278         if (isCanceled()) {
   269             Throwable t = getCancelCause();
   279             Throwable t = getCancelCause();
   270             responseBodyCF.completeExceptionally(t);
   280             responseBodyCF.completeExceptionally(t);
   271         } else {
   281         } else {