src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
branchhttp-client-branch
changeset 56008 bbd688c6fbbb
parent 55973 4d9b002587db
parent 48408 4f830b447edf
child 56009 cf8792f51dee
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Fri Dec 15 14:26:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Fri Jan 05 14:11:48 2018 +0000
@@ -46,6 +46,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.net.ssl.SSLParameters;
@@ -329,6 +330,75 @@
         public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);
 
         /**
+         * Returns a response body handler that returns a {@link BodySubscriber
+         * BodySubscriber}{@code <Void>} obtained from {@linkplain
+         * BodySubscriber#fromSubscriber(Subscriber)}, with the given
+         * {@code subscriber}.
+         *
+         * <p> The response body is not available through this, or the {@code
+         * HttpResponse} API, but instead all response body is forwarded to the
+         * given {@code subscriber}, which should make it available, if
+         * appropriate, through some other mechanism, e.g. an entry in a
+         * database, etc.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * <p> For example:
+         * <pre> {@code
+         *  TextSubscriber subscriber = new TextSubscriber();
+         *  HttpResponse<Void> response = client.sendAsync(request,
+         *      BodyHandler.fromSubscriber(subscriber)).join();
+         *  System.out.println(response.statusCode());
+         * }</pre>
+         *
+         * @param subscriber the subscriber
+         * @return a response body handler
+         */
+        public static BodyHandler<Void>
+        fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) {
+            Objects.requireNonNull(subscriber);
+            return (status, headers) -> BodySubscriber.fromSubscriber(subscriber,
+                                                                      s -> null);
+        }
+
+        /**
+         * Returns a response body handler that returns a {@link BodySubscriber
+         * BodySubscriber}{@code <T>} obtained from {@link
+         * BodySubscriber#fromSubscriber(Subscriber, Function)}, with the
+         * given {@code subscriber} and {@code finisher} function.
+         *
+         * <p> The given {@code finisher} function is applied after the given
+         * subscriber's {@code onComplete} has been invoked. The {@code finisher}
+         * function is invoked with the given subscriber, and returns a value
+         * that is set as the response's body.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * <p> For example:
+         * <pre> {@code
+         * TextSubscriber subscriber = ...;  // accumulates bytes and transforms them into a String
+         * HttpResponse<String> response = client.sendAsync(request,
+         *     BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
+         * String text = response.body();
+         * }</pre>
+         *
+         * @param <S> the type of the Subscriber
+         * @param <T> the type of the response body
+         * @param subscriber the subscriber
+         * @param finisher a function to be applied after the subscriber has completed
+         * @return a response body handler
+         */
+        public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
+        fromSubscriber(S subscriber, Function<S,T> finisher) {
+            Objects.requireNonNull(subscriber);
+            Objects.requireNonNull(finisher);
+            return (status, headers) -> BodySubscriber.fromSubscriber(subscriber,
+                                                                      finisher);
+        }
+
+        /**
          * Returns a response body handler which discards the response body and
          * uses the given value as a replacement for it.
          *
@@ -344,24 +414,14 @@
          * Returns a {@code BodyHandler<String>} that returns a
          * {@link BodySubscriber BodySubscriber}{@code <String>} obtained from
          * {@link BodySubscriber#asString(Charset) BodySubscriber.asString(Charset)}.
-         * If a charset is provided, the body is decoded using it. If charset is
-         * {@code null} then the handler tries to determine the character set
-         * from the {@code Content-encoding} header. If that charset is not
-         * supported then {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8}
-         * is used.
+         * The body is decoded using the given character set.
          *
-         * @param charset The name of the charset to interpret the body as. If
-         *                {@code null} then the charset is determined from the
-         *                <i>Content-encoding</i> header.
+         * @param charset the character set to convert the body with
          * @return a response body handler
          */
         public static BodyHandler<String> asString(Charset charset) {
-            return (status, headers) -> {
-                if (charset != null) {
-                    return BodySubscriber.asString(charset);
-                }
-                return BodySubscriber.asString(charsetFrom(headers));
-            };
+            Objects.requireNonNull(charset);
+            return (status, headers) -> BodySubscriber.asString(charset);
         }
 
         /**
@@ -386,11 +446,11 @@
          */
         public static BodyHandler<Path> asFile(Path file, OpenOption... openOptions) {
             Objects.requireNonNull(file);
+            List<OpenOption> opts = List.of(openOptions);
             SecurityManager sm = System.getSecurityManager();
             if (sm != null) {
                 String fn = pathForSecurityCheck(file);
                 sm.checkWrite(fn);
-                List<OpenOption> opts = Arrays.asList(openOptions);
                 if (opts.contains(StandardOpenOption.DELETE_ON_CLOSE))
                     sm.checkDelete(fn);
                 if (opts.contains(StandardOpenOption.READ))
@@ -450,11 +510,11 @@
         public static BodyHandler<Path> asFileDownload(Path directory,
                                                        OpenOption... openOptions) {
             Objects.requireNonNull(directory);
+            List<OpenOption> opts = List.of(openOptions);
             SecurityManager sm = System.getSecurityManager();
             if (sm != null) {
                 String fn = pathForSecurityCheck(directory);
                 sm.checkWrite(fn);
-                List<OpenOption> opts = Arrays.asList(openOptions);
                 if (opts.contains(StandardOpenOption.DELETE_ON_CLOSE))
                     sm.checkDelete(fn);
                 if (opts.contains(StandardOpenOption.READ))
@@ -494,6 +554,7 @@
          * @return a response body handler
          */
         public static BodyHandler<Void> asByteArrayConsumer(Consumer<Optional<byte[]>> consumer) {
+            Objects.requireNonNull(consumer);
             return (status, headers) -> BodySubscriber.asByteArrayConsumer(consumer);
         }
 
@@ -547,6 +608,7 @@
          */
          public static <T> BodyHandler<T> buffering(BodyHandler<T> downstreamHandler,
                                                     int bufferSize) {
+             Objects.requireNonNull(downstreamHandler);
              if (bufferSize <= 0)
                  throw new IllegalArgumentException("must be greater than 0");
              return (status, headers) -> BodySubscriber
@@ -603,6 +665,53 @@
         public CompletionStage<T> getBody();
 
         /**
+         * Returns a body subscriber that forwards all response body to the
+         * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
+         * stage} of the returned body subscriber completes after one of the
+         * given subscribers {@code onComplete} or {@code onError} has been
+         * invoked.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * @param <S> the type of the Subscriber
+         * @param subscriber the subscriber
+         * @return a body subscriber
+         */
+        public static <S extends Subscriber<? super List<ByteBuffer>>> BodySubscriber<Void>
+        fromSubscriber(S subscriber) {
+            return new ResponseSubscribers.SubscriberAdapter<S,Void>(subscriber, s -> null);
+        }
+
+        /**
+         * Returns a body subscriber that forwards all response body to the
+         * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
+         * stage} of the returned body subscriber completes after one of the
+         * given subscribers {@code onComplete} or {@code onError} has been
+         * invoked.
+         *
+         * <p> The given {@code finisher} function is applied after the given
+         * subscriber's {@code onComplete} has been invoked. The {@code finisher}
+         * function is invoked with the given subscriber, and returns a value
+         * that is set as the response's body.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * @param <S> the type of the Subscriber
+         * @param <T> the type of the response body
+         * @param subscriber the subscriber
+         * @param finisher a function to be applied after the subscriber has
+         *                 completed
+         * @return a body subscriber
+         */
+        public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodySubscriber<T>
+        fromSubscriber(S subscriber,
+                       Function<S,T> finisher) {
+            return new ResponseSubscribers.SubscriberAdapter<S,T>(subscriber, finisher);
+        }
+
+        /**
          * Returns a body subscriber which stores the response body as a {@code
          * String} converted using the given {@code Charset}.
          *
@@ -613,6 +722,7 @@
          * @return a body subscriber
          */
         public static BodySubscriber<String> asString(Charset charset) {
+            Objects.requireNonNull(charset);
             return new ResponseSubscribers.ByteArraySubscriber<>(
                     bytes -> new String(bytes, charset)
             );
@@ -662,11 +772,11 @@
          */
         public static BodySubscriber<Path> asFile(Path file, OpenOption... openOptions) {
             Objects.requireNonNull(file);
+            List<OpenOption> opts = List.of(openOptions);
             SecurityManager sm = System.getSecurityManager();
             if (sm != null) {
                 String fn = pathForSecurityCheck(file);
                 sm.checkWrite(fn);
-                List<OpenOption> opts = Arrays.asList(openOptions);
                 if (opts.contains(StandardOpenOption.DELETE_ON_CLOSE))
                     sm.checkDelete(fn);
                 if (opts.contains(StandardOpenOption.READ))