http-client-branch: review comment onSubscribe before onXXX http-client-branch
authorchegar
Sat, 25 Nov 2017 21:36:35 +0000
branchhttp-client-branch
changeset 55875 39e9e9883c4e
parent 55874 ee17449ff241
child 55876 42cf279e4a89
http-client-branch: review comment onSubscribe before onXXX
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Sat Nov 25 16:24:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Sat Nov 25 21:36:35 2017 +0000
@@ -30,7 +30,6 @@
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -41,12 +40,10 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import jdk.incubator.http.HttpResponse.BodySubscriber;
 import jdk.incubator.http.internal.common.*;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
-import static java.util.stream.Collectors.toList;
 
 /**
  * Http/2 Stream handling.
@@ -224,8 +221,8 @@
                                        Executor executor)
     {
         Log.logTrace("Reading body on stream {0}", streamid);
-        responseSubscriber = handler.apply(responseCode, responseHeaders);
-        CompletableFuture<T> cf = receiveData();
+        BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+        CompletableFuture<T> cf = receiveData(bodySubscriber);
 
         PushGroup<?,?> pg = exchange.getPushGroup();
         if (pg != null) {
@@ -248,9 +245,7 @@
         sched.runOrSchedule();
     }
 
-    /**
-     * RESET always handled inline in queue
-     */
+    /** Handles a RESET frame. RESET is always handled inline in the queue. */
     private void receiveResetFrame(ResetFrame frame) {
         inputQ.add(frame);
         sched.runOrSchedule();
@@ -258,17 +253,19 @@
 
     // pushes entire response body into response subscriber
     // blocking when required by local or remote flow control
-    CompletableFuture<T> receiveData() {
-        responseBodyCF = MinimalFuture.of(responseSubscriber.getBody());
+    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
+        responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
 
         if (isCanceled()) {
             Throwable t = getCancelCause();
             responseBodyCF.completeExceptionally(t);
-            sched.runOrSchedule();
         } else {
-            responseSubscriber.onSubscribe(userSubscription);
-            sched.runOrSchedule(); // in case data waiting already to be processed
+            bodySubscriber.onSubscribe(userSubscription);
         }
+        // Set the responseSubscriber field now that onSubscribe has been called.
+        // This effectively allows the scheduler to start invoking the callbacks.
+        responseSubscriber = bodySubscriber;
+        sched.runOrSchedule(); // in case data waiting already to be processed
         return responseBodyCF;
     }