--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Sat Nov 25 16:24:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Sat Nov 25 21:36:35 2017 +0000
@@ -30,7 +30,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -41,12 +40,10 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import jdk.incubator.http.HttpResponse.BodySubscriber;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
-import static java.util.stream.Collectors.toList;
/**
* Http/2 Stream handling.
@@ -224,8 +221,8 @@
Executor executor)
{
Log.logTrace("Reading body on stream {0}", streamid);
- responseSubscriber = handler.apply(responseCode, responseHeaders);
- CompletableFuture<T> cf = receiveData();
+ BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+ CompletableFuture<T> cf = receiveData(bodySubscriber);
PushGroup<?,?> pg = exchange.getPushGroup();
if (pg != null) {
@@ -248,9 +245,7 @@
sched.runOrSchedule();
}
- /**
- * RESET always handled inline in queue
- */
+ /** Handles a RESET frame. RESET is always handled inline in the queue. */
private void receiveResetFrame(ResetFrame frame) {
inputQ.add(frame);
sched.runOrSchedule();
@@ -258,17 +253,19 @@
// pushes entire response body into response subscriber
// blocking when required by local or remote flow control
- CompletableFuture<T> receiveData() {
- responseBodyCF = MinimalFuture.of(responseSubscriber.getBody());
+ CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
+ responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
if (isCanceled()) {
Throwable t = getCancelCause();
responseBodyCF.completeExceptionally(t);
- sched.runOrSchedule();
} else {
- responseSubscriber.onSubscribe(userSubscription);
- sched.runOrSchedule(); // in case data waiting already to be processed
+ bodySubscriber.onSubscribe(userSubscription);
}
+ // Set the responseSubscriber field now that onSubscribe has been called.
+ // This effectively allows the scheduler to start invoking the callbacks.
+ responseSubscriber = bodySubscriber;
+ sched.runOrSchedule(); // in case data waiting already to be processed
return responseBodyCF;
}