src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java
equal
deleted
inserted
replaced
229 boolean returnConnectionToPool, |
229 boolean returnConnectionToPool, |
230 Executor executor) |
230 Executor executor) |
231 { |
231 { |
232 Log.logTrace("Reading body on stream {0}", streamid); |
232 Log.logTrace("Reading body on stream {0}", streamid); |
233 BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders); |
233 BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders); |
234 CompletableFuture<T> cf = receiveData(bodySubscriber); |
234 CompletableFuture<T> cf = receiveData(bodySubscriber, executor); |
235 |
235 |
236 PushGroup<?> pg = exchange.getPushGroup(); |
236 PushGroup<?> pg = exchange.getPushGroup(); |
237 if (pg != null) { |
237 if (pg != null) { |
238 // if an error occurs make sure it is recorded in the PushGroup |
238 // if an error occurs make sure it is recorded in the PushGroup |
239 cf = cf.whenComplete((t,e) -> pg.pushError(e)); |
239 cf = cf.whenComplete((t,e) -> pg.pushError(e)); |
260 sched.runOrSchedule(); |
260 sched.runOrSchedule(); |
261 } |
261 } |
262 |
262 |
263 // pushes entire response body into response subscriber |
263 // pushes entire response body into response subscriber |
264 // blocking when required by local or remote flow control |
264 // blocking when required by local or remote flow control |
265 CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) { |
265 CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) { |
266 responseBodyCF = MinimalFuture.of(bodySubscriber.getBody()); |
266 responseBodyCF = new MinimalFuture<>(); |
|
267 // We want to allow the subscriber's getBody() method to block so it |
|
268 // can work with InputStreams. So, we offload execution. |
|
269 executor.execute(() -> { |
|
270 bodySubscriber.getBody().whenComplete((T body, Throwable t) -> { |
|
271 if (t == null) |
|
272 responseBodyCF.complete(body); |
|
273 else |
|
274 responseBodyCF.completeExceptionally(t); |
|
275 }); |
|
276 }); |
267 |
277 |
268 if (isCanceled()) { |
278 if (isCanceled()) { |
269 Throwable t = getCancelCause(); |
279 Throwable t = getCancelCause(); |
270 responseBodyCF.completeExceptionally(t); |
280 responseBodyCF.completeExceptionally(t); |
271 } else { |
281 } else { |