--- a/src/java.net.http/share/classes/java/net/http/HttpResponse.java Mon Apr 09 15:28:22 2018 +0100
+++ b/src/java.net.http/share/classes/java/net/http/HttpResponse.java Tue Apr 10 10:25:34 2018 +0100
@@ -43,6 +43,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscription;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -663,6 +665,28 @@
}
/**
+ * Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a
+ * {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>}
+ * obtained from {@link BodySubscribers#ofPublisher()
+ * BodySubscribers.ofPublisher()}.
+ *
+ * <p> When the {@code HttpResponse} object is returned, the response
+ * headers will have been completely read, but the body may not have
+ * been fully received yet. The {@link #body()} method returns a
+ * {@link Publisher Publisher<List<ByteBuffer>>} from which the body
+ * response bytes can be obtained as they are received. The publisher
+ * can and must be subscribed to only once.
+ *
+ * @apiNote See {@link BodySubscribers#ofPublisher()} for more
+ * information.
+ *
+ * @return a response body handler
+ */
+ public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
+ return (status, headers) -> BodySubscribers.ofPublisher();
+ }
+
+ /**
* Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain
* BodySubscribers#buffering(BodySubscriber,int) buffering BodySubscriber}
* that buffers data before delivering it to the downstream subscriber.
@@ -1138,6 +1162,47 @@
}
/**
+ * Returns a response subscriber which publishes the response body
+ * through a {@link Publisher Publisher<List<ByteBuffer>>}.
+ *
+ * <p> The {@link HttpResponse} using this subscriber is available
+ * immediately after the response headers have been read, without
+ * requiring to wait for the entire body to be processed. The response
+ * body bytes can then be obtained by subscribing to the publisher
+ * returned by the {@code HttpResponse} {@link HttpResponse#body() body}
+ * method.
+ *
+ * <p>The publisher returned by the {@link HttpResponse#body() body}
+ * method can be subscribed to only once. The first subscriber will
+ * receive the body response bytes if successfully subscribed, or will
+ * cause the subscription to be cancelled otherwise.
+ * If more subscriptions are attempted, the subsequent subscribers will
+ * be immediately subscribed with an empty subscription and their
+ * {@link Subscriber#onError(Throwable) onError} method
+ * will be invoked with an {@code IllegalStateException}.
+ *
+ * @apiNote To ensure that all resources associated with the
+ * corresponding exchange are properly released the caller must
+ * ensure that the provided publisher is subscribed once, and either
+ * {@linkplain Subscription#request(long) requests} all bytes
+ * until {@link Subscriber#onComplete() onComplete} or
+ * {@link Subscriber#onError(Throwable) onError} are invoked, or
+ * cancel the provided {@linkplain Subscriber#onSubscribe(Subscription)
+ * subscription} if it is unable or unwilling to do so.
+ * Note that depending on the actual HTTP protocol {@linkplain
+ * HttpClient.Version version} used for the exchange, cancelling the
+ * subscription instead of exhausting the flow may cause the underlying
+ * HTTP connection to be closed and prevent it from being reused for
+ * subsequent operations.
+ *
+ * @return A {@code BodySubscriber} which publishes the response body
+ * through a {@code Publisher<List<ByteBuffer>>}.
+ */
+ public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
+ return ResponseSubscribers.createPublisher();
+ }
+
+ /**
* Returns a response subscriber which discards the response body. The
* supplied value is the value that will be returned from
* {@link HttpResponse#body()}.