http-client-branch: added mapping subscriber, miscellaneous bug fixes and change to discard()/replace() subscribers http-client-branch
authormichaelm
Tue, 06 Feb 2018 16:07:43 +0000
branchhttp-client-branch
changeset 56082 1da51fab3032
parent 56081 20c6742e5545
child 56083 32ff53628380
http-client-branch: added mapping subscriber, miscellaneous bug fixes and change to discard()/replace() subscribers
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1Response.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/PlainTunnelingConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseSubscribers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java
test/jdk/java/net/httpclient/ConcurrentResponses.java
test/jdk/java/net/httpclient/HandshakeFailureTest.java
test/jdk/java/net/httpclient/ImmutableHeaders.java
test/jdk/java/net/httpclient/InterruptedBlockingSend.java
test/jdk/java/net/httpclient/InvalidSSLContextTest.java
test/jdk/java/net/httpclient/MappedResponseSubscriber.java
test/jdk/java/net/httpclient/NoBodyPartTwo.java
test/jdk/java/net/httpclient/ProxyAuthTest.java
test/jdk/java/net/httpclient/RequestBodyTest.java
test/jdk/java/net/httpclient/ShortRequestBody.java
test/jdk/java/net/httpclient/SmallTimeout.java
test/jdk/java/net/httpclient/TimeoutBasic.java
test/jdk/java/net/httpclient/TimeoutOrdering.java
test/jdk/java/net/httpclient/VersionTest.java
test/jdk/java/net/httpclient/ZeroRedirects.java
test/jdk/java/net/httpclient/http2/ErrorTest.java
test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/RawChannelTest.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/SelectorTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Tue Feb 06 16:07:43 2018 +0000
@@ -78,7 +78,7 @@
  * read (or discard) the body and convert it into some useful Java object type.
  * The handler can return one of the pre-defined subscriber types, or a custom
  * subscriber, or if the body is to be discarded it can call {@link
- * BodySubscriber#discard(Object) discard} and return a subscriber which
+ * BodySubscriber#discard() discard} and return a subscriber which
  * discards the response body. Static implementations of both handlers and
  * subscribers are provided in {@linkplain BodyHandler BodyHandler} and
  * {@linkplain BodySubscriber BodySubscriber} respectively. In all cases, the
@@ -198,7 +198,8 @@
      * <li>{@link #asFileDownload(java.nio.file.Path,OpenOption...)
      * asFileDownload(Path,OpenOption...)}</li>
      * <li>{@link #asInputStream() asInputStream()}</li>
-     * <li>{@link #discard(Object) }</li>
+     * <li>{@link #discard() }</li>
+     * <li>{@link #replace(Object) }</li>
      * <li>{@link #buffering(BodyHandler, int)
      * buffering(BodyHandler,int)}</li>
      * </ul>
@@ -234,7 +235,7 @@
      *              .response(
      *                  (status, headers) -> status == 200
      *                      ? BodySubscriber.asFile(Paths.get("/tmp/f"))
-     *                      : BodySubscriber.discard(Paths.get("/NULL")));
+     *                      : BodySubscriber.replace(Paths.get("/NULL")));
      * }
      * </pre>
      *
@@ -247,7 +248,7 @@
          * response status code and headers. This method is always called before
          * the body is read and its implementation can decide to keep the body
          * and store it somewhere, or else discard it by returning the {@code
-         * BodySubscriber} returned from {@link BodySubscriber#discard(Object)
+         * BodySubscriber} returned from {@link BodySubscriber#discard()
          * discard}.
          *
          * @param statusCode the HTTP status code received
@@ -412,6 +413,15 @@
         }
 
         /**
+         * Returns a response body handler which discards the response body.
+         *
+         * @return a response body handler
+         */
+        public static BodyHandler<Void> discard() {
+            return (status, headers) -> BodySubscriber.discard();
+        }
+
+        /**
          * Returns a response body handler which discards the response body and
          * uses the given value as a replacement for it.
          *
@@ -419,8 +429,8 @@
          * @param value the value of U to return as the body, may be {@code null}
          * @return a response body handler
          */
-        public static <U> BodyHandler<U> discard(U value) {
-            return (status, headers) -> BodySubscriber.discard(value);
+        public static <U> BodyHandler<U> replace(U value) {
+            return (status, headers) -> BodySubscriber.replace(value);
         }
 
         /**
@@ -557,7 +567,7 @@
         /**
          * Returns a {@code BodyHandler<Stream<String>>} that returns a
          * {@link BodySubscriber BodySubscriber}{@code <Stream<String>>} obtained from
-         * {@link BodySubscriber#asLines(Charset)}
+         * {@link BodySubscriber#asLines(Charset)
          * BodySubscriber.asLines(charset)}.
          * The {@link Charset charset} used to decode the response body bytes is
          * obtained from the HTTP response headers as specified by {@link #asString()},
@@ -581,6 +591,10 @@
          *
          * <p> When the {@code HttpResponse} object is returned, the body has
          * been completely written to the consumer.
+         * @apiNote
+         * The subscriber returned by this handler is not flow controlled.
+         * Therefore, the supplied consumer must be able to process whatever
+         * amount of data is delivered in a timely fashion.
          *
          * @param consumer a Consumer to accept the response body
          * @return a response body handler
@@ -777,7 +791,9 @@
 
         /**
          * Returns a {@code CompletionStage} which when completed will return
-         * the response body object.
+         * the response body object. This method can be called at any time
+         * relative to the other {@link Flow.Subscriber} methods and is invoked
+         * using the client's {@link Executor}.
          *
          * @return a CompletionStage for the response body
          */
@@ -785,7 +801,7 @@
 
         /**
          * Returns a body subscriber that forwards all response body to the
-         * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
+         * given {@code Flow.Subscriber}. The {@linkplain #getBody() completion
          * stage} of the returned body subscriber completes after one of the
          * given subscribers {@code onComplete} or {@code onError} has been
          * invoked.
@@ -804,7 +820,7 @@
 
         /**
          * Returns a body subscriber that forwards all response body to the
-         * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
+         * given {@code Flow.Subscriber}. The {@linkplain #getBody() completion
          * stage} of the returned body subscriber completes after one of the
          * given subscribers {@code onComplete} or {@code onError} has been
          * invoked.
@@ -833,7 +849,7 @@
         /**
          * Returns a body subscriber that forwards all response body to the
          * given {@code Flow.Subscriber}, lines by lines.
-         * The {@linkplain #getBody()} completion
+         * The {@linkplain #getBody() completion
          * stage} of the returned body subscriber completes after one of the
          * given subscribers {@code onComplete} or {@code onError} has been
          * invoked.
@@ -861,7 +877,7 @@
         /**
          * Returns a body subscriber that forwards all response body to the
          * given {@code Flow.Subscriber}, lines by lines.
-         * The {@linkplain #getBody()} completion
+         * The {@linkplain #getBody() completion
          * stage} of the returned body subscriber completes after one of the
          * given subscribers {@code onComplete} or {@code onError} has been
          * invoked.
@@ -999,6 +1015,11 @@
          * <p> The {@link HttpResponse} using this subscriber is available after
          * the entire response has been read.
          *
+         * @apiNote
+         * This subscriber is not flow controlled.
+         * Therefore, the supplied consumer must be able to process whatever
+         * amount of data is delivered in a timely fashion.
+         *
          * @param consumer a Consumer of byte arrays
          * @return a BodySubscriber
          */
@@ -1048,13 +1069,14 @@
          * the underlying HTTP connection to be closed and prevent it
          * from being reused for subsequent operations.
          *
+         * @param charset the character set to use when converting bytes to characters
          * @return a body subscriber that streams the response body as a
          *         {@link Stream Stream<String>}.
          *
          * @see BufferedReader#lines()
          */
         public static BodySubscriber<Stream<String>> asLines(Charset charset) {
-            return ResponseSubscribers.HttpLineStream.create(charset);
+            return ResponseSubscribers.createLineStream(charset);
         }
 
         /**
@@ -1066,11 +1088,20 @@
          * @param value the value to return from HttpResponse.body(), may be {@code null}
          * @return a {@code BodySubscriber}
          */
-        public static <U> BodySubscriber<U> discard(U value) {
+        public static <U> BodySubscriber<U> replace(U value) {
             return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(value));
         }
 
         /**
+         * Returns a response subscriber which discards the response body.
+         *
+         * @return a response body subscriber
+         */
+        public static BodySubscriber<Void> discard() {
+            return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(null));
+        }
+
+        /**
          * Returns a {@code BodySubscriber} which buffers data before delivering
          * it to the given downstream subscriber. The subscriber guarantees to
          * deliver {@code buffersize} bytes of data to each invocation of the
@@ -1093,5 +1124,48 @@
                  throw new IllegalArgumentException("must be greater than 0");
              return new BufferingSubscriber<T>(downstream, bufferSize);
          }
+
+        /**
+         * Returns a {@code BodySubscriber} whose response body is mapped
+         * using the supplied mapping function from one type {@code <T>} to
+         * another type {@code <U>}. The mapping function is executed
+         * using the {@link Executor} of the sending client and can
+         * therefore be used to map any response body type, including
+         * blocking {@link java.io.InputStream}s as shown in the following
+         * example which uses a well-known JSON parser to convert an {@code InputStream}
+         * into any annotated Java object type.
+         * <p>
+         * <b>Example usage</b>
+         * <p> <pre> {@code
+         * public static <W> BodySubscriber<W> asJSON(Class<W> targetType) {
+         *     BodySubscriber<InputStream> upstream = BodySubscriber.asInputStream();
+         *
+         *     BodySubscriber<W> downstream = mappedFrom(
+         *           upstream,
+         *           (InputStream is) -> {
+         *               try (InputStream stream = is) {
+         *                   ObjectMapper objectMapper = new ObjectMapper();
+         *                   W result = objectMapper.readValue(stream, targetType);
+         *                   return result;
+         *               } catch (IOException e) {
+         *                   throw new UncheckedIOException(e);
+         *               }
+         *           });
+         *    return downstream;
+         * }
+         * }</pre>
+         *
+         * @param <T> the type of the body subscriber to be mapped
+         * @param <U> the type of the body subscriber returned
+         * @param upstream the body subscriber to be mapped
+         * @param mapper the mapping function
+         * @return a mapped body subscriber
+         */
+        public static <T,U> BodySubscriber<U> mappedFrom(
+                BodySubscriber<T> upstream,
+                Function<T, U> mapper)
+        {
+            return new ResponseSubscribers.MappedSubscriber<T, U>(upstream, mapper);
+        }
     }
 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Exchange.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Exchange.java	Tue Feb 06 16:07:43 2018 +0000
@@ -408,7 +408,7 @@
     }
 
     HttpResponse.BodySubscriber<T> ignoreBody(int status, HttpHeaders hdrs) {
-        return HttpResponse.BodySubscriber.discard((T)null);
+        return HttpResponse.BodySubscriber.replace(null);
     }
 
     // if this response was received in reply to an upgrade
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1Response.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1Response.java	Tue Feb 06 16:07:43 2018 +0000
@@ -152,7 +152,7 @@
             connection.close();
             return MinimalFuture.completedFuture(null); // not treating as error
         } else {
-            return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor);
+            return readBody(HttpResponse.BodySubscriber.discard(), true, executor);
         }
     }
 
@@ -161,8 +161,8 @@
                                          Executor executor) {
         this.return2Cache = return2Cache;
         final HttpResponse.BodySubscriber<U> pusher = p;
-        final CompletionStage<U> bodyCF = p.getBody();
-        final CompletableFuture<U> cf = MinimalFuture.of(bodyCF);
+
+        final CompletableFuture<U> cf = new MinimalFuture<>();
 
         int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
 
@@ -242,6 +242,13 @@
                 }
             }
         });
+        p.getBody().whenComplete((U u, Throwable t) -> {
+            if (t == null)
+                cf.complete(u);
+            else
+                cf.completeExceptionally(t);
+        });
+
         return cf;
     }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/PlainTunnelingConnection.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/PlainTunnelingConnection.java	Tue Feb 06 16:07:43 2018 +0000
@@ -69,7 +69,7 @@
                 assert client != null;
                 HttpRequestImpl req = new HttpRequestImpl("CONNECT", address, proxyHeaders);
                 MultiExchange<Void> mulEx = new MultiExchange<>(null, req,
-                        client, discard(null), null, null);
+                        client, discard(), null, null);
                 Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
 
                 return connectExchange
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseSubscribers.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseSubscribers.java	Tue Feb 06 16:07:43 2018 +0000
@@ -49,6 +49,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
@@ -467,56 +468,18 @@
 
     }
 
-    /**
-     * A {@code Stream<String>} built on top of the Flow API.
-     */
-    public static final class HttpLineStream implements BodySubscriber<Stream<String>> {
-
-        private final HttpResponseInputStream responseInputStream;
-        private final Charset charset;
-        private HttpLineStream(Charset charset) {
-            this.charset = Objects.requireNonNull(charset);
-            responseInputStream = new HttpResponseInputStream();
-        }
-
-        @Override
-        public CompletionStage<Stream<String>> getBody() {
-            return responseInputStream.getBody().thenApply((is) ->
-                    new BufferedReader(new InputStreamReader(is, charset))
-                            .lines().onClose(this::close));
-        }
-
-        @Override
-        public void onSubscribe(Subscription subscription) {
-            responseInputStream.onSubscribe(subscription);
-        }
+    public static BodySubscriber<Stream<String>> createLineStream() {
+        return createLineStream(UTF_8);
+    }
 
-        @Override
-        public void onNext(List<ByteBuffer> item) {
-            responseInputStream.onNext(item);
-        }
-
-        @Override
-        public void onError(Throwable throwable) {
-            responseInputStream.onError(throwable);
-        }
-
-        @Override
-        public void onComplete() {
-            responseInputStream.onComplete();
-        }
-
-        void close() {
-            try {
-                responseInputStream.close();
-            } catch (IOException x) {
-                // ignore
-            }
-        }
-
-        public static HttpLineStream create(Charset charset) {
-            return new HttpLineStream(Optional.ofNullable(charset).orElse(UTF_8));
-        }
+    public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
+        Objects.requireNonNull(charset);
+        BodySubscriber<InputStream> s = new HttpResponseInputStream();
+        return new MappedSubscriber<InputStream,Stream<String>>(s,
+            (InputStream stream) -> {
+                return new BufferedReader(new InputStreamReader(stream, charset))
+                            .lines().onClose(() -> Utils.close(stream));
+            });
     }
 
     /**
@@ -630,4 +593,58 @@
             return cf;
         }
     }
+
+    /**
+     * A body subscriber which receives input from an upstream subscriber
+     * and maps that subscriber's body type to a new type. The upstream subscriber
+     * delegates all flow operations directly to this object. The
+     * {@link CompletionStage} returned by {@link #getBody()}} takes the output
+     * of the upstream {@code getBody()} and applies the mapper function to
+     * obtain the new {@code CompletionStage} type.
+     *
+     * Uses an Executor that must be set externally.
+     *
+     * @param <T> the upstream body type
+     * @param <U> this subscriber's body type
+     */
+    public static class MappedSubscriber<T,U> implements BodySubscriber<U> {
+        final BodySubscriber<T> upstream;
+        final Function<T,U> mapper;
+
+        /**
+         *
+         * @param upstream
+         * @param mapper
+         */
+        public MappedSubscriber(BodySubscriber<T> upstream, Function<T,U> mapper) {
+            this.upstream = upstream;
+            this.mapper = mapper;
+        }
+
+        @Override
+        public CompletionStage<U> getBody() {
+            return upstream.getBody()
+                    .thenApply(mapper);
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            upstream.onSubscribe(subscription);
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            upstream.onNext(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            upstream.onError(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            upstream.onComplete();
+        }
+    }
 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java	Tue Feb 06 16:07:43 2018 +0000
@@ -231,7 +231,7 @@
     {
         Log.logTrace("Reading body on stream {0}", streamid);
         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
-        CompletableFuture<T> cf = receiveData(bodySubscriber);
+        CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
 
         PushGroup<?> pg = exchange.getPushGroup();
         if (pg != null) {
@@ -262,8 +262,18 @@
 
     // pushes entire response body into response subscriber
     // blocking when required by local or remote flow control
-    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
-        responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
+    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
+        responseBodyCF = new MinimalFuture<>();
+        // We want to allow the subscriber's getBody() method to block so it
+        // can work with InputStreams. So, we offload execution.
+        executor.execute(() -> {
+            bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+                if (t == null)
+                    responseBodyCF.complete(body);
+                else
+                    responseBodyCF.completeExceptionally(t);
+            });
+        });
 
         if (isCanceled()) {
             Throwable t = getCancelCause();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Tue Feb 06 16:07:43 2018 +0000
@@ -186,7 +186,7 @@
 
     public CompletableFuture<Result> send() {
         PrivilegedAction<CompletableFuture<Result>> pa = () ->
-                client.sendAsync(this.request, BodyHandler.<Void>discard(null))
+                client.sendAsync(this.request, BodyHandler.discard())
                       .thenCompose(this::resultFrom);
         return AccessController.doPrivileged(pa);
     }
--- a/test/jdk/java/net/httpclient/ConcurrentResponses.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ConcurrentResponses.java	Tue Feb 06 16:07:43 2018 +0000
@@ -158,7 +158,7 @@
         }
 
         // initial connection to seed the cache so next parallel connections reuse it
-        client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join();
+        client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard()).join();
 
         // will reuse connection cached from the previous request ( when HTTP/2 )
         CompletableFuture.allOf(requests.keySet().parallelStream()
@@ -183,7 +183,7 @@
         }
 
         // initial connection to seed the cache so next parallel connections reuse it
-        client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join();
+        client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard()).join();
 
         // will reuse connection cached from the previous request ( when HTTP/2 )
         CompletableFuture.allOf(requests.keySet().parallelStream()
--- a/test/jdk/java/net/httpclient/HandshakeFailureTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/HandshakeFailureTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -87,7 +87,7 @@
                                              .version(version)
                                              .build();
             try {
-                HttpResponse<Void> response = client.send(request, discard(null));
+                HttpResponse<Void> response = client.send(request, discard());
                 String msg = String.format("UNEXPECTED response=%s%n", response);
                 throw new RuntimeException(msg);
             } catch (SSLHandshakeException expected) {
@@ -106,7 +106,7 @@
                                              .version(version)
                                              .build();
             try {
-                HttpResponse<Void> response = client.send(request, discard(null));
+                HttpResponse<Void> response = client.send(request, discard());
                 String msg = String.format("UNEXPECTED response=%s%n", response);
                 throw new RuntimeException(msg);
             } catch (SSLHandshakeException expected) {
@@ -124,7 +124,7 @@
                                              .version(version)
                                              .build();
             CompletableFuture<HttpResponse<Void>> response =
-                        client.sendAsync(request, discard(null));
+                        client.sendAsync(request, discard());
             try {
                 response.join();
                 String msg = String.format("UNEXPECTED response=%s%n", response);
@@ -150,7 +150,7 @@
                                              .version(version)
                                              .build();
             CompletableFuture<HttpResponse<Void>> response =
-                    client.sendAsync(request, discard(null));
+                    client.sendAsync(request, discard());
             try {
                 response.join();
                 String msg = String.format("UNEXPECTED response=%s%n", response);
--- a/test/jdk/java/net/httpclient/ImmutableHeaders.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ImmutableHeaders.java	Tue Feb 06 16:07:43 2018 +0000
@@ -81,7 +81,7 @@
                 throw new RuntimeException("Test failed");
             } catch (UnsupportedOperationException ex) {
             }
-            HttpResponse resp = client.send(req, discard(null));
+            HttpResponse resp = client.send(req, discard());
             try {
                 HttpHeaders hd = resp.headers();
                 List<String> v = hd.allValues("X-Foo-Response");
--- a/test/jdk/java/net/httpclient/InterruptedBlockingSend.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/InterruptedBlockingSend.java	Tue Feb 06 16:07:43 2018 +0000
@@ -47,7 +47,7 @@
 
             Thread t = new Thread(() -> {
                 try {
-                    client.send(request, discard(null));
+                    client.send(request, discard());
                 } catch (InterruptedException e) {
                     throwable = e;
                 } catch (Throwable th) {
--- a/test/jdk/java/net/httpclient/InvalidSSLContextTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/InvalidSSLContextTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -81,7 +81,7 @@
                 .build();
 
         try {
-            HttpResponse<?> response = client.send(request, BodyHandler.discard(""));
+            HttpResponse<?> response = client.send(request, BodyHandler.discard());
             Assert.fail("UNEXPECTED response" + response);
         } catch (SSLException sslex) {
             System.out.println("Caught expected: " + sslex);
@@ -100,7 +100,7 @@
                 .build();
 
         assertExceptionally(SSLException.class,
-                            client.sendAsync(request, BodyHandler.discard("")));
+                            client.sendAsync(request, BodyHandler.discard()));
     }
 
     static void assertExceptionally(Class<? extends Throwable> clazz,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/MappedResponseSubscriber.java	Tue Feb 06 16:07:43 2018 +0000
@@ -0,0 +1,307 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary Tests mapped response subscriber
+ * @library /lib/testlibrary http2/server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.base/sun.net.www.http
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ * @run testng/othervm MappedResponseSubscriber
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Flow;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import jdk.incubator.http.HttpClient;
+import jdk.incubator.http.HttpHeaders;
+import jdk.incubator.http.HttpRequest;
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import  jdk.incubator.http.HttpResponse.BodySubscriber;
+import javax.net.ssl.SSLContext;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static jdk.incubator.http.HttpResponse.BodySubscriber.asString;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class MappedResponseSubscriber {
+
+    SSLContext sslContext;
+    HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
+    HttpsServer httpsTestServer;       // HTTPS/1.1
+    Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
+    Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
+    String httpURI_fixed;
+    String httpURI_chunk;
+    String httpsURI_fixed;
+    String httpsURI_chunk;
+    String http2URI_fixed;
+    String http2URI_chunk;
+    String https2URI_fixed;
+    String https2URI_chunk;
+
+    static final int ITERATION_COUNT = 10;
+    // a shared executor helps reduce the amount of threads created by the test
+    static final Executor executor = Executors.newCachedThreadPool();
+
+    @DataProvider(name = "variants")
+    public Object[][] variants() {
+        return new Object[][]{
+                { httpURI_fixed,    false },
+                { httpURI_chunk,    false },
+                { httpsURI_fixed,   false },
+                { httpsURI_chunk,   false },
+                { http2URI_fixed,   false },
+                { http2URI_chunk,   false },
+                { https2URI_fixed,  false,},
+                { https2URI_chunk,  false },
+
+                { httpURI_fixed,    true },
+                { httpURI_chunk,    true },
+                { httpsURI_fixed,   true },
+                { httpsURI_chunk,   true },
+                { http2URI_fixed,   true },
+                { http2URI_chunk,   true },
+                { https2URI_fixed,  true,},
+                { https2URI_chunk,  true },
+        };
+    }
+
+    HttpClient newHttpClient() {
+        return HttpClient.newBuilder()
+                         .executor(executor)
+                         .sslContext(sslContext)
+                         .build();
+    }
+
+    @Test(dataProvider = "variants")
+    public void testAsBytes(String uri, boolean sameClient) throws Exception {
+        HttpClient client = null;
+        for (int i=0; i< ITERATION_COUNT; i++) {
+            if (!sameClient || client == null)
+                client = newHttpClient();
+
+            HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
+                                         .build();
+            BodyHandler<byte[]> handler = new CRSBodyHandler();
+            HttpResponse<byte[]> response = client.send(req, handler);
+            byte[] body = response.body();
+            assertEquals(body, bytes);
+        }
+    }
+
+    static class CRSBodyHandler implements BodyHandler<byte[]> {
+        @Override
+        public BodySubscriber<byte[]> apply(int statusCode, HttpHeaders responseHeaders) {
+            assertEquals(statusCode, 200);
+            return HttpResponse.BodySubscriber.mappedFrom(
+                new CRSBodySubscriber(), (s) -> s.getBytes()
+            );
+        }
+    }
+
+    static class CRSBodySubscriber implements BodySubscriber<String> {
+        private final BodySubscriber<String> asString = asString(UTF_8);
+        volatile boolean onSubscribeCalled;
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            //out.println("onSubscribe ");
+            onSubscribeCalled = true;
+            asString.onSubscribe(subscription);
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+           // out.println("onNext " + item);
+            assertTrue(onSubscribeCalled);
+            asString.onNext(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            //out.println("onError");
+            assertTrue(onSubscribeCalled);
+            asString.onError(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            //out.println("onComplete");
+            assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
+            asString.onComplete();
+        }
+
+        @Override
+        public CompletionStage<String> getBody() {
+            return asString.getBody();
+        }
+    }
+
+
+    @BeforeTest
+    public void setup() throws Exception {
+        sslContext = new SimpleSSLContext().get();
+        if (sslContext == null)
+            throw new AssertionError("Unexpected null sslContext");
+
+        // HTTP/1.1
+        HttpHandler h1_fixedLengthHandler = new HTTP1_FixedLengthHandler();
+        HttpHandler h1_chunkHandler = new HTTP1_ChunkedHandler();
+        InetSocketAddress sa = new InetSocketAddress("localhost", 0);
+        httpTestServer = HttpServer.create(sa, 0);
+        httpTestServer.createContext("/http1/fixed", h1_fixedLengthHandler);
+        httpTestServer.createContext("/http1/chunk", h1_chunkHandler);
+        httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/fixed";
+        httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/chunk";
+
+        httpsTestServer = HttpsServer.create(sa, 0);
+        httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+        httpsTestServer.createContext("/https1/fixed", h1_fixedLengthHandler);
+        httpsTestServer.createContext("/https1/chunk", h1_chunkHandler);
+        httpsURI_fixed = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/fixed";
+        httpsURI_chunk = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/chunk";
+
+        // HTTP/2
+        Http2Handler h2_fixedLengthHandler = new HTTP2_FixedLengthHandler();
+        Http2Handler h2_chunkedHandler = new HTTP2_VariableHandler();
+
+        http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
+        http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
+        http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
+        int port = http2TestServer.getAddress().getPort();
+        http2URI_fixed = "http://127.0.0.1:" + port + "/http2/fixed";
+        http2URI_chunk = "http://127.0.0.1:" + port + "/http2/chunk";
+
+        https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
+        https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
+        https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
+        port = https2TestServer.getAddress().getPort();
+        https2URI_fixed = "https://127.0.0.1:" + port + "/https2/fixed";
+        https2URI_chunk = "https://127.0.0.1:" + port + "/https2/chunk";
+
+        httpTestServer.start();
+        httpsTestServer.start();
+        http2TestServer.start();
+        https2TestServer.start();
+    }
+
+    @AfterTest
+    public void teardown() throws Exception {
+        httpTestServer.stop(0);
+        httpsTestServer.stop(0);
+        http2TestServer.stop();
+        https2TestServer.stop();
+    }
+
+    static byte[] bytes;
+
+    static {
+        bytes = new byte[128 * 1024];
+        int b = 'A';
+
+        for (int i=0; i< bytes.length; i++) {
+            bytes[i] = (byte)b;
+            b = b == 'Z'? 'A' : b + 1;
+        }
+    }
+
+    static class HTTP1_FixedLengthHandler implements HttpHandler {
+        @Override
+        public void handle(HttpExchange t) throws IOException {
+            out.println("HTTP1_FixedLengthHandler received request to " + t.getRequestURI());
+            try (InputStream is = t.getRequestBody()) {
+                is.readAllBytes();
+            }
+            t.sendResponseHeaders(200, bytes.length);  //no body
+            OutputStream os = t.getResponseBody();
+            os.write(bytes);
+            os.close();
+        }
+    }
+
+    static class HTTP1_ChunkedHandler implements HttpHandler {
+        @Override
+        public void handle(HttpExchange t) throws IOException {
+            out.println("HTTP1_ChunkedHandler received request to " + t.getRequestURI());
+            try (InputStream is = t.getRequestBody()) {
+                is.readAllBytes();
+            }
+            t.sendResponseHeaders(200, 0); // chunked
+            OutputStream os = t.getResponseBody();
+            os.write(bytes);
+            os.close();
+        }
+    }
+
+    static class HTTP2_FixedLengthHandler implements Http2Handler {
+        @Override
+        public void handle(Http2TestExchange t) throws IOException {
+            out.println("HTTP2_FixedLengthHandler received request to " + t.getRequestURI());
+            try (InputStream is = t.getRequestBody()) {
+                is.readAllBytes();
+            }
+            t.sendResponseHeaders(200, 0); // chunked
+            OutputStream os = t.getResponseBody();
+            os.write(bytes);
+            os.close();
+        }
+    }
+
+    static class HTTP2_VariableHandler implements Http2Handler {
+        @Override
+        public void handle(Http2TestExchange t) throws IOException {
+            out.println("HTTP2_VariableHandler received request to " + t.getRequestURI());
+            try (InputStream is = t.getRequestBody()) {
+                is.readAllBytes();
+            }
+            t.sendResponseHeaders(200, bytes.length);  //no body
+            OutputStream os = t.getResponseBody();
+            os.write(bytes);
+            os.close();
+        }
+    }
+}
--- a/test/jdk/java/net/httpclient/NoBodyPartTwo.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/NoBodyPartTwo.java	Tue Feb 06 16:07:43 2018 +0000
@@ -47,7 +47,7 @@
 import static jdk.incubator.http.HttpResponse.BodyHandler.asByteArrayConsumer;
 import static jdk.incubator.http.HttpResponse.BodyHandler.asInputStream;
 import static jdk.incubator.http.HttpResponse.BodyHandler.buffering;
-import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static jdk.incubator.http.HttpResponse.BodyHandler.replace;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -128,7 +128,7 @@
                     .PUT(fromString(SIMPLE_STRING))
                     .build();
             Object obj = new Object();
-            HttpResponse<Object> response = client.send(req, discard(obj));
+            HttpResponse<Object> response = client.send(req, replace(obj));
             assertEquals(response.body(), obj);
         }
         // We have created many clients here. Try to speed up their release.
--- a/test/jdk/java/net/httpclient/ProxyAuthTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ProxyAuthTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -76,7 +76,7 @@
                                           .authenticator(auth)
                                           .build();
             HttpRequest req = HttpRequest.newBuilder(uri).GET().build();
-            HttpResponse<?> resp = client.sendAsync(req, discard(null)).get();
+            HttpResponse<?> resp = client.sendAsync(req, discard()).get();
             if (resp.statusCode() != 404) {
                 throw new RuntimeException("Unexpected status code: " + resp.statusCode());
             }
--- a/test/jdk/java/net/httpclient/RequestBodyTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/RequestBodyTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -260,7 +260,7 @@
                 break;
             case DISCARD:
                 Object o = new Object();
-                BodyHandler<Object> bh2 = discard(o);
+                BodyHandler<Object> bh2 = replace(o);
                 if (bufferResponseBody) bh2 = buffering(bh2, 51);
                 HttpResponse<Object> or = getResponse(client, request, bh2, async);
                 assertEquals(or.statusCode(), 200);
--- a/test/jdk/java/net/httpclient/ShortRequestBody.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ShortRequestBody.java	Tue Feb 06 16:07:43 2018 +0000
@@ -164,7 +164,7 @@
         HttpRequest request = HttpRequest.newBuilder(uri)
                                          .POST(publisher)
                                          .build();
-        cf = clientSupplier.get().sendAsync(request, discard(null));
+        cf = clientSupplier.get().sendAsync(request, discard());
 
         HttpResponse<Void> resp = cf.get(30, TimeUnit.SECONDS);
         err.println("Response code: " + resp.statusCode());
@@ -180,7 +180,7 @@
         HttpRequest request = HttpRequest.newBuilder(uri)
                                          .POST(publisher)
                                          .build();
-        cf = clientSupplier.get().sendAsync(request, discard(null));
+        cf = clientSupplier.get().sendAsync(request, discard());
 
         try {
             HttpResponse<Void> r = cf.get(30, TimeUnit.SECONDS);
@@ -207,7 +207,7 @@
                                          .POST(publisher)
                                          .build();
         try {
-            HttpResponse<Void> r = clientSupplier.get().send(request, discard(null));
+            HttpResponse<Void> r = clientSupplier.get().send(request, discard());
             throw new RuntimeException("Unexpected response: " + r.statusCode());
         } catch (HttpTimeoutException x) {
             throw new RuntimeException("Unexpected timeout", x);
--- a/test/jdk/java/net/httpclient/SmallTimeout.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/SmallTimeout.java	Tue Feb 06 16:07:43 2018 +0000
@@ -34,7 +34,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import static java.lang.System.out;
-import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static jdk.incubator.http.HttpResponse.BodyHandler.replace;
 
 /**
  * @test
@@ -92,7 +92,7 @@
 
                 final HttpRequest req = requests[i];
                 CompletableFuture<HttpResponse<Object>> response = client
-                    .sendAsync(req, discard(null))
+                    .sendAsync(req, replace(null))
                     .whenComplete((HttpResponse<Object> r, Throwable t) -> {
                         Throwable cause = null;
                         if (r != null) {
@@ -142,7 +142,7 @@
                 executor.execute(() -> {
                     Throwable cause = null;
                     try {
-                        client.send(req, discard(null));
+                        client.send(req, replace(null));
                     } catch (HttpTimeoutException e) {
                         out.println("Caught expected timeout: " + e);
                     } catch (Throwable ee) {
--- a/test/jdk/java/net/httpclient/TimeoutBasic.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/TimeoutBasic.java	Tue Feb 06 16:07:43 2018 +0000
@@ -152,7 +152,7 @@
                 if (request == null) continue;
                 count++;
                 try {
-                    HttpResponse<?> resp = client.sendAsync(request, discard(null)).join();
+                    HttpResponse<?> resp = client.sendAsync(request, discard()).join();
                     out.println("Unexpected response for: " + request);
                     out.println("\t from " + ss.getLocalSocketAddress());
                     out.println("Response is: " + resp);
@@ -177,7 +177,7 @@
                 if (request == null) continue;
                 count++;
                 try {
-                    client.send(request, discard(null));
+                    client.send(request, discard());
                 } catch (HttpTimeoutException e) {
                     out.println("Caught expected timeout: " + e);
                 }
--- a/test/jdk/java/net/httpclient/TimeoutOrdering.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/TimeoutOrdering.java	Tue Feb 06 16:07:43 2018 +0000
@@ -34,7 +34,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import static java.lang.System.out;
-import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static jdk.incubator.http.HttpResponse.BodyHandler.replace;
 
 /**
  * @test
@@ -74,7 +74,7 @@
 
                 final HttpRequest req = requests[i];
                 CompletableFuture<HttpResponse<Object>> response = client
-                    .sendAsync(req, discard(null))
+                    .sendAsync(req, replace(null))
                     .whenComplete((HttpResponse<Object> r, Throwable t) -> {
                         if (r != null) {
                             out.println("Unexpected response: " + r);
@@ -115,7 +115,7 @@
                 final HttpRequest req = requests[i];
                 executor.execute(() -> {
                     try {
-                        client.send(req, discard(null));
+                        client.send(req, replace(null));
                     } catch (HttpTimeoutException e) {
                         out.println("Caught expected timeout: " + e);
                         queue.offer(req);
--- a/test/jdk/java/net/httpclient/VersionTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/VersionTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -98,7 +98,7 @@
                 .GET()
                 .build();
         HttpClient c = proxy ? clientWithProxy : client;
-        HttpResponse<Void> resp = c.send(r, discard(null));
+        HttpResponse<Void> resp = c.send(r, discard());
         System.out.printf("Client: response is %d\n", resp.statusCode());
         if (resp.version() != HTTP_1_1) {
             throw new RuntimeException();
--- a/test/jdk/java/net/httpclient/ZeroRedirects.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ZeroRedirects.java	Tue Feb 06 16:07:43 2018 +0000
@@ -77,7 +77,7 @@
         HttpRequest r = HttpRequest.newBuilder(uri)
                 .GET()
                 .build();
-        HttpResponse<Void> resp = client.send(r, discard(null));
+        HttpResponse<Void> resp = client.send(r, discard());
         System.out.printf("Client: response is %d\n", resp.statusCode());
         if (resp.statusCode() != 200)
             throw new RuntimeException();
--- a/test/jdk/java/net/httpclient/http2/ErrorTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/ErrorTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -96,7 +96,7 @@
                                     .build();
             HttpResponse response;
             try {
-                response = client.send(req, discard(null));
+                response = client.send(req, discard());
                 throw new RuntimeException("Unexpected response: " + response);
             } catch (IOException e) {
                 System.err.println("Caught Expected IOException: " + e);
--- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java	Tue Feb 06 16:07:43 2018 +0000
@@ -86,8 +86,14 @@
             throw new IllegalStateException("sendResponseHeaders must be called first");
         }
         try {
-            waitForWindow(len);
-            send(buf, offset, len, 0);
+            int max = conn.getMaxFrameSize();
+            while (len > 0) {
+                int n = len > max ? max : len;
+                waitForWindow(n);
+                send(buf, offset, n, 0);
+                offset += n;
+                len -= n;
+            }
         } catch (InterruptedException ex) {
             throw new IOException(ex);
         }
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Tue Feb 06 16:07:43 2018 +0000
@@ -312,6 +312,10 @@
         return (SettingsFrame)frame;
     }
 
+    public int getMaxFrameSize() {
+        return clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE);
+    }
+
     void run() throws Exception {
         Http1InitialRequest upgrade = null;
         if (!secure) {
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/RawChannelTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/RawChannelTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -193,7 +193,7 @@
         ((WebSocketRequest)req).isWebSocket(true);
         HttpClient client = HttpClient.newHttpClient();
         try {
-            HttpResponse<?> r = client.send(req, discard(null));
+            HttpResponse<?> r = client.send(req, discard());
             r.body();
             return ((HttpResponseImpl) r).rawChannel();
         } finally {
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/SelectorTest.java	Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/SelectorTest.java	Tue Feb 06 16:07:43 2018 +0000
@@ -153,7 +153,7 @@
         // thus all ordinary procedures apply to it, e.g. it must be put into
         // the cache
         ((HttpRequestImpl) req).isWebSocket(true);
-        HttpResponse<?> r = defaultClient().send(req, discard(null));
+        HttpResponse<?> r = defaultClient().send(req, discard());
         r.body();
         return ((HttpResponseImpl) r).rawChannel();
     }