src/java.net.http/share/classes/java/net/http/HttpResponse.java
branchhttp-client-branch
changeset 56405 3642d0ef7755
parent 56260 df3f97c19c1d
child 56410 1b37529eaf3a
--- a/src/java.net.http/share/classes/java/net/http/HttpResponse.java	Mon Apr 09 15:28:22 2018 +0100
+++ b/src/java.net.http/share/classes/java/net/http/HttpResponse.java	Tue Apr 10 10:25:34 2018 +0100
@@ -43,6 +43,8 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscription;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -663,6 +665,28 @@
         }
 
         /**
+         * Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a
+         * {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>}
+         * obtained from {@link BodySubscribers#ofPublisher()
+         * BodySubscribers.ofPublisher()}.
+         *
+         * <p> When the {@code HttpResponse} object is returned, the response
+         * headers will have been completely read, but the body may not have
+         * been fully received yet. The {@link #body()} method returns a
+         * {@link Publisher Publisher<List<ByteBuffer>>} from which the body
+         * response bytes can be obtained as they are received. The publisher
+         * can and must be subscribed to only once.
+         *
+         * @apiNote See {@link BodySubscribers#ofPublisher()} for more
+         * information.
+         *
+         * @return a response body handler
+         */
+        public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
+            return (status, headers) -> BodySubscribers.ofPublisher();
+        }
+
+        /**
          * Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain
          * BodySubscribers#buffering(BodySubscriber,int) buffering BodySubscriber}
          * that buffers data before delivering it to the downstream subscriber.
@@ -1138,6 +1162,47 @@
         }
 
         /**
+         * Returns a response subscriber which publishes the response body
+         * through a {@link Publisher Publisher<List<ByteBuffer>>}.
+         *
+         * <p> The {@link HttpResponse} using this subscriber is available
+         * immediately after the response headers have been read, without
+         * requiring to wait for the entire body to be processed. The response
+         * body bytes can then be obtained by subscribing to the publisher
+         * returned by the {@code HttpResponse} {@link HttpResponse#body() body}
+         * method.
+         *
+         * <p>The publisher returned by the {@link HttpResponse#body() body}
+         * method can be subscribed to only once. The first subscriber will
+         * receive the body response bytes if successfully subscribed, or will
+         * cause the subscription to be cancelled otherwise.
+         * If more subscriptions are attempted, the subsequent subscribers will
+         * be immediately subscribed with an empty subscription and their
+         * {@link Subscriber#onError(Throwable) onError} method
+         * will be invoked with an {@code IllegalStateException}.
+         *
+         * @apiNote To ensure that all resources associated with the
+         * corresponding exchange are properly released the caller must
+         * ensure that the provided publisher is subscribed once, and either
+         * {@linkplain Subscription#request(long) requests} all bytes
+         * until {@link Subscriber#onComplete() onComplete} or
+         * {@link Subscriber#onError(Throwable) onError} are invoked, or
+         * cancel the provided {@linkplain Subscriber#onSubscribe(Subscription)
+         * subscription} if it is unable or unwilling to do so.
+         * Note that depending on the actual HTTP protocol {@linkplain
+         * HttpClient.Version version} used for the exchange, cancelling the
+         * subscription instead of exhausting the flow may cause the underlying
+         * HTTP connection to be closed and prevent it from being reused for
+         * subsequent operations.
+         *
+         * @return A {@code BodySubscriber} which publishes the response body
+         *         through a {@code Publisher<List<ByteBuffer>>}.
+         */
+        public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
+            return ResponseSubscribers.createPublisher();
+        }
+
+        /**
          * Returns a response subscriber which discards the response body. The
          * supplied value is the value that will be returned from
          * {@link HttpResponse#body()}.