src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
branchhttp-client-branch
changeset 56165 8a6065d830b9
parent 56126 86e628130926
child 56166 56c52d6417d1
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Thu Feb 22 17:33:21 2018 +0000
@@ -230,16 +230,21 @@
                                        boolean returnConnectionToPool,
                                        Executor executor)
     {
-        Log.logTrace("Reading body on stream {0}", streamid);
-        BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
-        CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
+        try {
+            Log.logTrace("Reading body on stream {0}", streamid);
+            BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+            CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
 
-        PushGroup<?> pg = exchange.getPushGroup();
-        if (pg != null) {
-            // if an error occurs make sure it is recorded in the PushGroup
-            cf = cf.whenComplete((t,e) -> pg.pushError(e));
+            PushGroup<?> pg = exchange.getPushGroup();
+            if (pg != null) {
+                // if an error occurs make sure it is recorded in the PushGroup
+                cf = cf.whenComplete((t, e) -> pg.pushError(e));
+            }
+            return cf;
+        } catch (Throwable t) {
+            // may be thrown by handler.apply
+            return MinimalFuture.failedFuture(t);
         }
-        return cf;
     }
 
     @Override
@@ -268,12 +273,16 @@
         // We want to allow the subscriber's getBody() method to block so it
         // can work with InputStreams. So, we offload execution.
         executor.execute(() -> {
-            bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
-                if (t == null)
-                    responseBodyCF.complete(body);
-                else
-                    responseBodyCF.completeExceptionally(t);
-            });
+            try {
+                bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+                    if (t == null)
+                        responseBodyCF.complete(body);
+                    else
+                        responseBodyCF.completeExceptionally(t);
+                });
+            } catch(Throwable t) {
+                cancelImpl(t);
+            }
         });
 
         if (isCanceled()) {
@@ -281,11 +290,11 @@
             responseBodyCF.completeExceptionally(t);
         } else {
             bodySubscriber.onSubscribe(userSubscription);
+            // Set the responseSubscriber field now that onSubscribe has been called.
+            // This effectively allows the scheduler to start invoking the callbacks.
+            responseSubscriber = bodySubscriber;
+            sched.runOrSchedule(); // in case data waiting already to be processed
         }
-        // Set the responseSubscriber field now that onSubscribe has been called.
-        // This effectively allows the scheduler to start invoking the callbacks.
-        responseSubscriber = bodySubscriber;
-        sched.runOrSchedule(); // in case data waiting already to be processed
         return responseBodyCF;
     }