--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu Feb 22 17:33:21 2018 +0000
@@ -230,16 +230,21 @@
boolean returnConnectionToPool,
Executor executor)
{
- Log.logTrace("Reading body on stream {0}", streamid);
- BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
- CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
+ try {
+ Log.logTrace("Reading body on stream {0}", streamid);
+ BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+ CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
- PushGroup<?> pg = exchange.getPushGroup();
- if (pg != null) {
- // if an error occurs make sure it is recorded in the PushGroup
- cf = cf.whenComplete((t,e) -> pg.pushError(e));
+ PushGroup<?> pg = exchange.getPushGroup();
+ if (pg != null) {
+ // if an error occurs make sure it is recorded in the PushGroup
+ cf = cf.whenComplete((t, e) -> pg.pushError(e));
+ }
+ return cf;
+ } catch (Throwable t) {
+ // may be thrown by handler.apply
+ return MinimalFuture.failedFuture(t);
}
- return cf;
}
@Override
@@ -268,12 +273,16 @@
// We want to allow the subscriber's getBody() method to block so it
// can work with InputStreams. So, we offload execution.
executor.execute(() -> {
- bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
- if (t == null)
- responseBodyCF.complete(body);
- else
- responseBodyCF.completeExceptionally(t);
- });
+ try {
+ bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+ if (t == null)
+ responseBodyCF.complete(body);
+ else
+ responseBodyCF.completeExceptionally(t);
+ });
+ } catch(Throwable t) {
+ cancelImpl(t);
+ }
});
if (isCanceled()) {
@@ -281,11 +290,11 @@
responseBodyCF.completeExceptionally(t);
} else {
bodySubscriber.onSubscribe(userSubscription);
+ // Set the responseSubscriber field now that onSubscribe has been called.
+ // This effectively allows the scheduler to start invoking the callbacks.
+ responseSubscriber = bodySubscriber;
+ sched.runOrSchedule(); // in case data waiting already to be processed
}
- // Set the responseSubscriber field now that onSubscribe has been called.
- // This effectively allows the scheduler to start invoking the callbacks.
- responseSubscriber = bodySubscriber;
- sched.runOrSchedule(); // in case data waiting already to be processed
return responseBodyCF;
}