http-client-branch: Adds some convenience methods to parse body responses as lines http-client-branch
authordfuchs
Fri, 12 Jan 2018 15:36:28 +0000
branchhttp-client-branch
changeset 56009 cf8792f51dee
parent 56008 bbd688c6fbbb
child 56010 782b2f2d1e76
http-client-branch: Adds some convenience methods to parse body responses as lines
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java
test/jdk/java/net/httpclient/LineAdaptersCompileOnly.java
test/jdk/java/net/httpclient/LineBodyHandlerTest.java
test/jdk/java/net/httpclient/LineStreamsAndSurrogatesTest.java
test/jdk/java/net/httpclient/LineSubscribersAndSurrogatesTest.java
test/jdk/java/net/httpclient/http2/server/BodyInputStream.java
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,6 +25,7 @@
 
 package jdk.incubator.http;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -34,12 +35,12 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.security.AccessControlContext;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -49,6 +50,7 @@
 import java.util.concurrent.Flow.Subscriber;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import javax.net.ssl.SSLParameters;
 
 /**
@@ -399,6 +401,91 @@
         }
 
         /**
+         * Returns a response body handler that returns a {@link BodySubscriber
+         * BodySubscriber}{@code <Void>} obtained from {@link
+         * BodySubscriber#fromLineSubscriber(Subscriber, Function, Charset, String)
+         * BodySubscriber.fromLineSubscriber(subscriber, s -> null, charset, null)},
+         * with the given {@code subscriber}.
+         * The {@link Charset charset} used to decode the response body bytes is
+         * obtained from the HTTP response headers as specified by {@link #asString()},
+         * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
+         *
+         * <p> The response body is not available through this, or the {@code
+         * HttpResponse} API, but instead all response body is forwarded to the
+         * given {@code subscriber}, which should make it available, if
+         * appropriate, through some other mechanism, e.g. an entry in a
+         * database, etc.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * <p> For example:
+         * <pre> {@code
+         *  TextSubscriber subscriber = new TextSubscriber();
+         *  HttpResponse<Void> response = client.sendAsync(request,
+         *      BodyHandler.fromLineSubscriber(subscriber, "\n")).join();
+         *  System.out.println(response.statusCode());
+         * }</pre>
+         *
+         * @param subscriber the subscriber
+         * @return a response body handler
+         */
+        public static BodyHandler<Void>
+        fromLineSubscriber(Subscriber<? super String> subscriber) {
+            Objects.requireNonNull(subscriber);
+            return (status, headers)
+                    -> BodySubscriber.fromLineSubscriber(subscriber, s -> null,
+                    charsetFrom(headers), null);
+        }
+
+        /**
+         * Returns a response body handler that returns a {@link BodySubscriber
+         * BodySubscriber}{@code <T>} obtained from {@link
+         * BodySubscriber#fromLineSubscriber(Subscriber, Function, Charset, String)
+         * BodySubscriber.fromLineSubscriber(subscriber, finisher, charset, lineSeparator)},
+         * with the given {@code subscriber}, {@code finisher} function, and line separator.
+         * The {@link Charset charset} used to decode the response body bytes is
+         * obtained from the HTTP response headers as specified by {@link #asString()}.
+         *
+         * <p> The given {@code finisher} function is applied after the given
+         * subscriber's {@code onComplete} has been invoked. The {@code finisher}
+         * function is invoked with the given subscriber, and returns a value
+         * that is set as the response's body.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * <p> For example:
+         * <pre> {@code
+         * TextSubscriber subscriber = ...;  // accumulates bytes and transforms them into a String
+         * HttpResponse<String> response = client.sendAsync(request,
+         *     BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult, "\n")).join();
+         * String text = response.body();
+         * }</pre>
+         *
+         * @param <S> the type of the Subscriber
+         * @param <T> the type of the response body
+         * @param subscriber the subscriber
+         * @param finisher a function to be applied after the subscriber has completed
+         * @param lineSeparator an optional line separator: can be {@code null},
+         *                      in which case lines will be delimited in the manner of
+         *                      {@link BufferedReader#readLine()}.
+         * @return a response body handler
+         * @throws IllegalArgumentException if the supplied {@code lineSeparator} is the empty string.
+         */
+        public static <S extends Subscriber<? super String>,T> BodyHandler<T>
+        fromLineSubscriber(S subscriber, Function<S,T> finisher, String lineSeparator) {
+            Objects.requireNonNull(subscriber);
+            Objects.requireNonNull(finisher);
+            // implicit null check
+            if (lineSeparator != null && lineSeparator.isEmpty())
+                throw new IllegalArgumentException("empty line separator");
+            return (status, headers) ->
+                    BodySubscriber.fromLineSubscriber(subscriber, finisher,
+                            charsetFrom(headers), lineSeparator);
+        }
+
+        /**
          * Returns a response body handler which discards the response body and
          * uses the given value as a replacement for it.
          *
@@ -542,6 +629,25 @@
         }
 
         /**
+         * Returns a {@code BodyHandler<Stream<String>>} that returns a
+         * {@link BodySubscriber BodySubscriber}{@code <Stream<String>>} obtained from
+         * {@link BodySubscriber#asLines(Charset)}
+         * BodySubscriber.asLines(charset)}.
+         * The {@link Charset charset} used to decode the response body bytes is
+         * obtained from the HTTP response headers as specified by {@link #asString()},
+         * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
+         *
+         * <p> When the {@code HttpResponse} object is returned, the body may
+         * not have been completely received.
+         *
+         * @return a response body handler
+         */
+        public static BodyHandler<Stream<String>> asLines() {
+            return (status, headers) ->
+                    BodySubscriber.asLines(charsetFrom(headers));
+        }
+
+        /**
          * Returns a {@code BodyHandler<Void>} that returns a
          * {@link BodySubscriber BodySubscriber}{@code <Void>} obtained from
          * {@link BodySubscriber#asByteArrayConsumer(Consumer)
@@ -578,7 +684,7 @@
          * {@link BodySubscriber#asString(java.nio.charset.Charset)
          * BodySubscriber.asString(Charset)}. The body is
          * decoded using the character set specified in
-         * the {@code Content-encoding} response header. If there is no such
+         * the {@code Content-type} response header. If there is no such
          * header, or the character set is not supported, then
          * {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} is used.
          *
@@ -712,6 +818,71 @@
         }
 
         /**
+         * Returns a body subscriber that forwards all response body to the
+         * given {@code Flow.Subscriber}, lines by lines.
+         * The {@linkplain #getBody()} completion
+         * stage} of the returned body subscriber completes after one of the
+         * given subscribers {@code onComplete} or {@code onError} has been
+         * invoked.
+         * Bytes are decoded using the {@linkplain StandardCharsets#UTF_8
+         * UTF-8} charset, and lines are delimited in the manner of
+         * {@link BufferedReader#readLine()}.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * @implNote This is equivalent to calling <pre>{@code
+         *      fromLineSubscriber(subscriber, s -> null, StandardCharsets.UTF_8, null)
+         * }</pre>
+         *
+         * @param <S> the type of the Subscriber
+         * @param subscriber the subscriber
+         * @return a body subscriber
+         */
+        public static <S extends Subscriber<? super String>> BodySubscriber<Void>
+        fromLineSubscriber(S subscriber) {
+            return fromLineSubscriber(subscriber, s -> null,
+                    StandardCharsets.UTF_8, null);
+        }
+
+        /**
+         * Returns a body subscriber that forwards all response body to the
+         * given {@code Flow.Subscriber}, lines by lines.
+         * The {@linkplain #getBody()} completion
+         * stage} of the returned body subscriber completes after one of the
+         * given subscribers {@code onComplete} or {@code onError} has been
+         * invoked.
+         *
+         * <p> The given {@code finisher} function is applied after the given
+         * subscriber's {@code onComplete} has been invoked. The {@code finisher}
+         * function is invoked with the given subscriber, and returns a value
+         * that is set as the response's body.
+         *
+         * @apiNote This method can be used as an adapter between {@code
+         * BodySubscriber} and {@code Flow.Subscriber}.
+         *
+         * @param <S> the type of the Subscriber
+         * @param <T> the type of the response body
+         * @param subscriber the subscriber
+         * @param finisher a function to be applied after the subscriber has
+         *                 completed
+         * @param charset a {@link Charset} to decode the bytes
+         * @param lineSeparator an optional line separator: can be {@code null},
+         *                      in which case lines will be delimited in the manner of
+         *                      {@link BufferedReader#readLine()}.
+         * @return a body subscriber
+         * @throws IllegalArgumentException if the supplied {@code lineSeparator} is the empty string.
+         */
+        public static <S extends Subscriber<? super String>,T> BodySubscriber<T>
+        fromLineSubscriber(S subscriber,
+                           Function<S,T> finisher,
+                           Charset charset,
+                           String lineSeparator) {
+            return LineSubscriberAdapter.create(subscriber,
+                    finisher, charset, lineSeparator);
+        }
+
+        /**
          * Returns a body subscriber which stores the response body as a {@code
          * String} converted using the given {@code Charset}.
          *
@@ -847,6 +1018,33 @@
         }
 
         /**
+         * Returns a {@code BodySubscriber} which streams the response body as
+         * a {@link Stream Stream<String>}, where each string in the stream
+         * corresponds to a line as defined by {@link BufferedReader#lines()}.
+         *
+         * <p> The {@link HttpResponse} using this subscriber is available
+         * immediately after the response headers have been read, without
+         * requiring to wait for the entire body to be processed. The response
+         * body can then be read directly from the {@link Stream}.
+         *
+         * @apiNote To ensure that all resources associated with the
+         * corresponding exchange are properly released the caller must
+         * ensure to either read all lines until the stream is exhausted,
+         * or call {@link Stream#close} if it is unable or unwilling to do so.
+         * Calling {@code close} before exhausting the stream may cause
+         * the underlying HTTP connection to be closed and prevent it
+         * from being reused for subsequent operations.
+         *
+         * @return a body subscriber that streams the response body as a
+         *         {@link Stream Stream<String>}.
+         *
+         * @see BufferedReader#lines()
+         */
+        public static BodySubscriber<Stream<String>> asLines(Charset charset) {
+            return ResponseSubscribers.HttpLineStream.create(charset);
+        }
+
+        /**
          * Returns a response subscriber which discards the response body. The
          * supplied value is the value that will be returned from
          * {@link HttpResponse#body()}.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java	Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,467 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
+/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
+final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
+        implements HttpResponse.BodySubscriber<R> {
+    private final CompletableFuture<R> cf = new MinimalFuture<>();
+    private final S subscriber;
+    private final Function<S, R> finisher;
+    private final Charset charset;
+    private final String eol;
+    private volatile LineSubscription downstream;
+
+    private LineSubscriberAdapter(S subscriber,
+                                  Function<S, R> finisher,
+                                  Charset charset,
+                                  String eol) {
+        if (eol != null && eol.isEmpty())
+            throw new IllegalArgumentException("empty line separator");
+        this.subscriber = Objects.requireNonNull(subscriber);
+        this.finisher = Objects.requireNonNull(finisher);
+        this.charset = Objects.requireNonNull(charset);
+        this.eol = eol;
+    }
+
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        downstream = LineSubscription.create(subscription,
+                                             charset,
+                                             eol,
+                                             subscriber,
+                                             cf);
+        subscriber.onSubscribe(downstream);
+    }
+
+    @Override
+    public void onNext(List<ByteBuffer> item) {
+        try {
+            downstream.submit(item);
+        } catch (Throwable t) {
+            onError(t);
+        }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        try {
+            downstream.signalError(throwable);
+        } finally {
+            cf.completeExceptionally(throwable);
+        }
+    }
+
+    @Override
+    public void onComplete() {
+        try {
+            downstream.signalComplete();
+        } finally {
+            cf.complete(finisher.apply(subscriber));
+        }
+    }
+
+    @Override
+    public CompletionStage<R> getBody() {
+        return cf;
+    }
+
+    static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
+    create(S subscriber, Function<S, R> finisher, Charset charset, String eol)
+    {
+        if (eol != null && eol.isEmpty())
+            throw new IllegalArgumentException("empty line separator");
+        return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
+                Objects.requireNonNull(finisher),
+                Objects.requireNonNull(charset),
+                eol);
+    }
+
+    static final class LineSubscription implements Flow.Subscription {
+        final Flow.Subscription upstreamSubscription;
+        final CharsetDecoder decoder;
+        final String newline;
+        final Demand downstreamDemand;
+        final ConcurrentLinkedDeque<ByteBuffer> queue;
+        final SequentialScheduler scheduler;
+        final Flow.Subscriber<? super String> upstream;
+        final CompletableFuture<?> cf;
+        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        private final AtomicLong demanded = new AtomicLong();
+        private volatile boolean completed;
+        private volatile boolean cancelled;
+
+        private final char[] chars = new char[1024];
+        private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
+        private final CharBuffer buffer = CharBuffer.wrap(chars);
+        private final StringBuilder builder = new StringBuilder();
+        private int lineCount;
+        private String nextLine;
+
+        private LineSubscription(Flow.Subscription s,
+                                 CharsetDecoder dec,
+                                 String separator,
+                                 Flow.Subscriber<? super String> subscriber,
+                                 CompletableFuture<?> completion) {
+            downstreamDemand = new Demand();
+            queue = new ConcurrentLinkedDeque<>();
+            upstreamSubscription = Objects.requireNonNull(s);
+            decoder = Objects.requireNonNull(dec);
+            newline = separator;
+            upstream = Objects.requireNonNull(subscriber);
+            cf = Objects.requireNonNull(completion);
+            scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
+        }
+
+        @Override
+        public void request(long n) {
+            if (cancelled) return;
+            if (downstreamDemand.increase(n)) {
+                scheduler.runOrSchedule();
+            }
+        }
+
+        @Override
+        public void cancel() {
+            cancelled = true;
+            upstreamSubscription.cancel();
+        }
+
+        public void submit(List<ByteBuffer> list) {
+            queue.addAll(list);
+            demanded.decrementAndGet();
+            scheduler.runOrSchedule();
+        }
+
+        public void signalComplete() {
+            completed = true;
+            scheduler.runOrSchedule();
+        }
+
+        public void signalError(Throwable error) {
+            if (errorRef.compareAndSet(null,
+                    Objects.requireNonNull(error))) {
+                scheduler.runOrSchedule();
+            }
+        }
+
+        // This method looks at whether some bytes where left over (in leftover)
+        // from decoding the previous buffer when the previous buffer was in
+        // underflow. If so, it takes bytes one by one from the new buffer 'in'
+        // and combines them with the leftover bytes until 'in' is exhausted or a
+        // character was produced in 'out', resolving the previous underflow.
+        // Returns true if the buffer is still in underflow, false otherwise.
+        // However, in both situation some chars might have been produced in 'out'.
+        private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
+                throws CharacterCodingException {
+            int limit = leftover.position();
+            if (limit == 0) {
+                // no leftover
+                return false;
+            } else {
+                CoderResult res = null;
+                while (in.hasRemaining()) {
+                    leftover.position(limit);
+                    leftover.limit(++limit);
+                    leftover.put(in.get());
+                    leftover.position(0);
+                    res = decoder.decode(leftover, out,
+                            endOfInput && !in.hasRemaining());
+                    int remaining = leftover.remaining();
+                    if (remaining > 0) {
+                        assert leftover.position() == 0;
+                        leftover.position(remaining);
+                    } else {
+                        leftover.position(0);
+                    }
+                    leftover.limit(leftover.capacity());
+                    if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
+                        continue;
+                    }
+                    if (res.isError()) {
+                        res.throwException();
+                    }
+                    assert !res.isOverflow();
+                    return false;
+                }
+                return !endOfInput;
+            }
+        }
+
+        // extract characters from start to end and remove them from
+        // the StringBuilder
+        private static String take(StringBuilder b, int start, int end) {
+            assert start == 0;
+            String line;
+            if (end == start) return "";
+            line = b.substring(start, end);
+            b.delete(start, end);
+            return line;
+        }
+
+        // finds end of line, returns -1 if not found, or the position after
+        // the line delimiter if found, removing the delimiter in the process.
+        private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
+            int len = b.length();
+            if (eol != null) { // delimiter explicitly specified
+                int i = b.indexOf(eol);
+                if (i >= 0) {
+                    // remove the delimiter and returns the position
+                    // of the char after it.
+                    b.delete(i, i + eol.length());
+                    return i;
+                }
+            } else { // no delimiter specified, behaves as BufferedReader::readLine
+                boolean crfound = false;
+                for (int i = 0; i < len; i++) {
+                    char c = b.charAt(i);
+                    if (c == '\n') {
+                        // '\n' or '\r\n' found.
+                        // remove the delimiter and returns the position
+                        // of the char after it.
+                        b.delete(crfound ? i - 1 : i, i + 1);
+                        return crfound ? i - 1 : i;
+                    } else if (crfound) {
+                        // previous char was '\r', c != '\n'
+                        assert i != 0;
+                        // remove the delimiter and returns the position
+                        // of the char after it.
+                        b.delete(i - 1, i);
+                        return i - 1;
+                    }
+                    crfound = c == '\r';
+                }
+                if (crfound && endOfInput) {
+                    // remove the delimiter and returns the position
+                    // of the char after it.
+                    b.delete(len - 1, len);
+                    return len - 1;
+                }
+            }
+            return endOfInput && len > 0 ? len : -1;
+        }
+
+        // Looks at whether the StringBuilder contains a line.
+        // Returns null if more character are needed.
+        private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
+            int next = endOfLine(b, eol, endOfInput);
+            return (next > -1) ? take(b, 0, next) : null;
+        }
+
+        // Attempts to read the next line. Returns the next line if
+        // the delimiter was found, null otherwise. The delimiters are
+        // consumed.
+        private String nextLine()
+                throws CharacterCodingException {
+            assert nextLine == null;
+            LINES:
+            while (nextLine == null) {
+                boolean endOfInput = completed && queue.isEmpty();
+                nextLine = nextLine(builder, newline,
+                        endOfInput && leftover.position() == 0);
+                if (nextLine != null) return nextLine;
+                ByteBuffer b;
+                BUFFERS:
+                while ((b = queue.peek()) != null) {
+                    if (!b.hasRemaining()) {
+                        queue.poll();
+                        continue BUFFERS;
+                    }
+                    BYTES:
+                    while (b.hasRemaining()) {
+                        buffer.position(0);
+                        buffer.limit(buffer.capacity());
+                        boolean endofInput = completed && queue.size() <= 1;
+                        if (isUnderFlow(b, buffer, endofInput)) {
+                            assert !b.hasRemaining();
+                            if (buffer.position() > 0) {
+                                buffer.flip();
+                                builder.append(buffer);
+                            }
+                            continue BUFFERS;
+                        }
+                        CoderResult res = decoder.decode(b, buffer, endofInput);
+                        if (res.isError()) res.throwException();
+                        if (buffer.position() > 0) {
+                            buffer.flip();
+                            builder.append(buffer);
+                            continue LINES;
+                        }
+                        if (res.isUnderflow() && b.hasRemaining()) {
+                            //System.out.println("underflow: adding " + b.remaining() + " bytes");
+                            leftover.put(b);
+                            assert !b.hasRemaining();
+                            continue BUFFERS;
+                        }
+                    }
+                }
+
+                assert queue.isEmpty();
+                if (endOfInput) {
+                    // Time to cleanup: there may be some undecoded leftover bytes
+                    // We need to flush them out.
+                    // The decoder has been configured to replace malformed/unmappable
+                    // chars with some replacement, in order to behave like
+                    // InputStreamReader.
+                    leftover.flip();
+                    buffer.position(0);
+                    buffer.limit(buffer.capacity());
+
+                    // decode() must be called just before flush, even if there
+                    // is nothing to decode. We must do this even if leftover
+                    // has no remaining bytes.
+                    CoderResult res = decoder.decode(leftover, buffer, endOfInput);
+                    if (buffer.position() > 0) {
+                        buffer.flip();
+                        builder.append(buffer);
+                    }
+                    if (res.isError()) res.throwException();
+
+                    // Now call decoder.flush()
+                    buffer.position(0);
+                    buffer.limit(buffer.capacity());
+                    res = decoder.flush(buffer);
+                    if (buffer.position() > 0) {
+                        buffer.flip();
+                        builder.append(buffer);
+                    }
+                    if (res.isError()) res.throwException();
+
+                    // It's possible that we reach here twice - just for the
+                    // purpose of checking that no bytes were left over, so
+                    // we reset leftover/decoder to make the function reentrant.
+                    leftover.position(0);
+                    leftover.limit(leftover.capacity());
+                    decoder.reset();
+
+                    // if some chars were produced then this call will
+                    // return them.
+                    return nextLine = nextLine(builder, newline, endOfInput);
+                }
+                return null;
+            }
+            return null;
+        }
+
+        // The main sequential scheduler loop.
+        private void loop() {
+            try {
+                while (!cancelled) {
+                    Throwable error = errorRef.get();
+                    if (error != null) {
+                        cancelled = true;
+                        scheduler.stop();
+                        upstream.onError(error);
+                        cf.completeExceptionally(error);
+                        return;
+                    }
+                    if (nextLine == null) nextLine = nextLine();
+                    if (nextLine == null) {
+                        if (completed) {
+                            scheduler.stop();
+                            if (leftover.position() != 0) {
+                                // Underflow: not all bytes could be
+                                // decoded, but no more bytes will be coming.
+                                // This should not happen as we should already
+                                // have got a MalformedInputException, or
+                                // replaced the unmappable chars.
+                                errorRef.compareAndSet(null,
+                                        new IllegalStateException(
+                                                "premature end of input ("
+                                                        + leftover.position()
+                                                        + " undecoded bytes)"));
+                                continue;
+                            } else {
+                                upstream.onComplete();
+                            }
+                            return;
+                        } else if (demanded.get() == 0
+                                && !downstreamDemand.isFulfilled()) {
+                            long incr = Math.max(1, downstreamDemand.get());
+                            demanded.addAndGet(incr);
+                            upstreamSubscription.request(incr);
+                            continue;
+                        } else return;
+                    }
+                    assert nextLine != null;
+                    assert newline != null && !nextLine.endsWith(newline)
+                            || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
+                    if (downstreamDemand.tryDecrement()) {
+                        String forward = nextLine;
+                        nextLine = null;
+                        upstream.onNext(forward);
+                    } else return; // no demand: come back later
+                }
+            } catch (Throwable t) {
+                try {
+                    upstreamSubscription.cancel();
+                } finally {
+                    signalError(t);
+                }
+            }
+        }
+
+        static LineSubscription create(Flow.Subscription s,
+                                       Charset charset,
+                                       String lineSeparator,
+                                       Flow.Subscriber<? super String> upstream,
+                                       CompletableFuture<?> cf) {
+            return new LineSubscription(Objects.requireNonNull(s),
+                    Objects.requireNonNull(charset).newDecoder()
+                            // use the same decoder configuration than
+                            // java.io.InputStreamReader
+                            .onMalformedInput(CodingErrorAction.REPLACE)
+                            .onUnmappableCharacter(CodingErrorAction.REPLACE),
+                    lineSeparator,
+                    Objects.requireNonNull(upstream),
+                    Objects.requireNonNull(cf));
+        }
+    }
+}
+
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,11 +25,15 @@
 
 package jdk.incubator.http;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.security.AccessControlContext;
@@ -52,6 +56,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 
@@ -461,6 +466,58 @@
 
     }
 
+    /**
+     * A {@code Stream<String>} built on top of the Flow API.
+     */
+    static final class HttpLineStream implements HttpResponse.BodySubscriber<Stream<String>> {
+
+        private final HttpResponseInputStream responseInputStream;
+        private final Charset charset;
+        private HttpLineStream(Charset charset) {
+            this.charset = Objects.requireNonNull(charset);
+            responseInputStream = new HttpResponseInputStream();
+        }
+
+        @Override
+        public CompletionStage<Stream<String>> getBody() {
+            return responseInputStream.getBody().thenApply((is) ->
+                    new BufferedReader(new InputStreamReader(is, charset))
+                            .lines().onClose(this::close));
+        }
+
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            responseInputStream.onSubscribe(subscription);
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            responseInputStream.onNext(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            responseInputStream.onError(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            responseInputStream.onComplete();
+        }
+
+        void close() {
+            try {
+                responseInputStream.close();
+            } catch (IOException x) {
+                // ignore
+            }
+        }
+
+        static HttpLineStream create(Charset charset) {
+            return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8));
+        }
+    }
+
     static class MultiSubscriberImpl<V>
         implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
     {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java	Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java	Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -28,6 +28,7 @@
 import jdk.incubator.http.HttpHeaders;
 import sun.net.NetProperties;
 import sun.net.util.IPAddressUtil;
+import sun.net.www.HeaderParser;
 
 import javax.net.ssl.SSLParameters;
 import java.io.ByteArrayOutputStream;
@@ -476,11 +477,17 @@
      * UTF_8
      */
     public static Charset charsetFrom(HttpHeaders headers) {
-        String encoding = headers.firstValue("Content-encoding")
-                .orElse("UTF_8");
+        String type = headers.firstValue("Content-type")
+                .orElse("text/html; charset=utf-8");
+        int i = type.indexOf(";");
+        if (i >= 0) type = type.substring(i+1);
         try {
-            return Charset.forName(encoding);
-        } catch (IllegalArgumentException e) {
+            HeaderParser parser = new HeaderParser(type);
+            String value = parser.findValue("charset");
+            if (value == null) return StandardCharsets.UTF_8;
+            return Charset.forName(value);
+        } catch (Throwable x) {
+            Log.logTrace("Can't find charset in \"{0}\" ({1})", type, x);
             return StandardCharsets.UTF_8;
         }
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineAdaptersCompileOnly.java	Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+
+/*
+ * @test
+ * @summary Basic test for Flow adapters with generic type parameters
+ * @compile LineAdaptersCompileOnly.java
+ */
+
+public class LineAdaptersCompileOnly {
+
+    public static void main(String[] args) {
+        makesSureDifferentGenericSignaturesCompile();
+    }
+
+    static void makesSureDifferentGenericSignaturesCompile() {
+
+        BodyHandler.fromLineSubscriber(new StringSubscriber());
+        BodyHandler.fromLineSubscriber(new CharSequenceSubscriber());
+        BodyHandler.fromLineSubscriber(new ObjectSubscriber());
+
+
+        BodySubscriber.fromLineSubscriber(new StringSubscriber());
+        BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber());
+        BodySubscriber.fromLineSubscriber(new ObjectSubscriber());
+
+
+        BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), "\n");
+        BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),  "\r\n");
+        BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), "\n");
+        BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), null);
+        BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),  null);
+        BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), null);
+
+        BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+                StandardCharsets.UTF_8, "\n");
+        BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),
+                StandardCharsets.UTF_16, "\r\n");
+        BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(),
+                StandardCharsets.US_ASCII, "\n");
+        BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+                StandardCharsets.UTF_8, null);
+        BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),
+                StandardCharsets.UTF_16, null);
+        BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(),
+                StandardCharsets.US_ASCII, null);
+    }
+
+    static class StringSubscriber implements Flow.Subscriber<String> {
+        @Override public void onSubscribe(Flow.Subscription subscription) { }
+        @Override public void onNext(String item) { }
+        @Override public void onError(Throwable throwable) { }
+        @Override public void onComplete() { }
+    }
+
+    static class CharSequenceSubscriber implements Flow.Subscriber<CharSequence> {
+        @Override public void onSubscribe(Flow.Subscription subscription) { }
+        @Override public void onNext(CharSequence item) { }
+        @Override public void onError(Throwable throwable) { }
+        @Override public void onComplete() { }
+    }
+
+    static class ObjectSubscriber implements Flow.Subscriber<Object> {
+        @Override public void onSubscribe(Flow.Subscription subscription) { }
+        @Override public void onNext(Object item) { }
+        @Override public void onError(Throwable throwable) { }
+        @Override public void onComplete() { }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineBodyHandlerTest.java	Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,702 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.io.UncheckedIOException;
+import java.math.BigInteger;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import jdk.incubator.http.HttpClient;
+import jdk.incubator.http.HttpRequest;
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import javax.net.ssl.SSLContext;
+
+import static java.nio.charset.StandardCharsets.UTF_16;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
+
+/*
+ * @test
+ * @summary Basic tests for line adapter subscribers as created by
+ *          the BodyHandlers returned by BodyHandler::fromLineSubscriber
+ *          and BodyHandler::asLines
+ * @modules java.base/sun.net.www.http
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ *          java.logging
+ *          jdk.httpserver
+ * @library /lib/testlibrary http2/server
+ * @build Http2TestServer
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @run testng/othervm LineBodyHandlerTest
+ */
+
+public class LineBodyHandlerTest {
+
+    SSLContext sslContext;
+    HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
+    HttpsServer httpsTestServer;       // HTTPS/1.1
+    Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
+    Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
+    String httpURI;
+    String httpsURI;
+    String http2URI;
+    String https2URI;
+
+    @DataProvider(name = "uris")
+    public Object[][] variants() {
+        return new Object[][]{
+                { httpURI   },
+                { httpsURI  },
+                { http2URI  },
+                { https2URI },
+        };
+    }
+
+    static final Class<NullPointerException> NPE = NullPointerException.class;
+    static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
+
+    @Test
+    public void testNull() {
+        assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null));
+        assertNotNull(BodyHandler.fromLineSubscriber(new StringSubscriber()));
+        assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, Function.identity(), "\n"));
+        assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), null, "\n"));
+        assertNotNull(BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), null));
+        assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, null, "\n"));
+        assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, Function.identity(), null));
+        assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), null, null));
+
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+                Charset.defaultCharset(), System.lineSeparator()));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null,
+                Charset.defaultCharset(), System.lineSeparator()));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+                null, System.lineSeparator()));
+        assertNotNull(BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+                Charset.defaultCharset(), null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null,
+                Charset.defaultCharset(), System.lineSeparator()));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+                null, System.lineSeparator()));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+                Charset.defaultCharset(), null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null,
+                null, System.lineSeparator()));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null,
+                Charset.defaultCharset(), null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+                null, null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null, null, null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+                null, null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null,
+                Charset.defaultCharset(), null));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null,
+                null, System.lineSeparator()));
+        assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null, null, null));
+    }
+
+    @Test
+    public void testIAE() {
+        assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(),""));
+        assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),""));
+        assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), ""));
+        assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+                    StandardCharsets.UTF_8, ""));
+        assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),
+                    StandardCharsets.UTF_16, ""));
+        assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(),
+                    StandardCharsets.US_ASCII, ""));
+    }
+
+    private static final List<String> lines(String text, String eol) {
+        if (eol == null) {
+            return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList());
+        } else {
+            String replaced = text.replace(eol, "|");
+            int i=0;
+            while(replaced.endsWith("||")) {
+                replaced = replaced.substring(0,replaced.length()-1);
+                i++;
+            }
+            List<String> res = List.of(replaced.split("\\|"));
+            if (i > 0) {
+                res = new ArrayList<>(res);
+                for (int j=0; j<i; j++) res.add("");
+            }
+            return res;
+        }
+    }
+
+    @Test(dataProvider = "uris")
+    void testStringWithFinisher(String url) {
+        String body = "May the luck of the Irish be with you!";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+                .build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        StringSubscriber subscriber = new StringSubscriber();
+        HttpResponse<String> response = client.sendAsync(request,
+                BodyHandler.fromLineSubscriber(subscriber, Supplier::get,"\n"))
+                .join();
+        String text = response.body();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, body);
+        assertEquals(subscriber.list, lines(body, "\n"));
+    }
+
+    @Test(dataProvider = "uris")
+    void testAsStream(String url) {
+        String body = "May the luck of the Irish be with you!";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+                .build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        HttpResponse<Stream<String>> response = client.sendAsync(request,
+                BodyHandler.asLines()).join();
+        Stream<String> stream = response.body();
+        List<String> list = stream.collect(Collectors.toList());
+        String text = list.stream().collect(Collectors.joining("|"));
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, body);
+        assertEquals(list, List.of(body));
+        assertEquals(list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testStringWithFinisher2(String url) {
+        String body = "May the luck\r\n\r\n of the Irish be with you!";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        StringSubscriber subscriber = new StringSubscriber();
+        HttpResponse<Void> response = client.sendAsync(request,
+                BodyHandler.fromLineSubscriber(subscriber)).join();
+        String text = subscriber.get();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, body.replace("\r\n", "\n"));
+        assertEquals(subscriber.list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testAsStreamWithCRLF(String url) {
+        String body = "May the luck\r\n\r\n of the Irish be with you!";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        HttpResponse<Stream<String>> response = client.sendAsync(request,
+                BodyHandler.asLines()).join();
+        Stream<String> stream = response.body();
+        List<String> list = stream.collect(Collectors.toList());
+        String text = list.stream().collect(Collectors.joining("|"));
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, "May the luck|| of the Irish be with you!");
+        assertEquals(list, List.of("May the luck",
+                                   "",
+                                   " of the Irish be with you!"));
+        assertEquals(list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testStringWithFinisherBlocking(String url) throws Exception {
+        String body = "May the luck of the Irish be with you!";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body)).build();
+
+        StringSubscriber subscriber = new StringSubscriber();
+        HttpResponse<String> response = client.send(request,
+                BodyHandler.fromLineSubscriber(subscriber, Supplier::get, "\n"));
+        String text = response.body();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, "May the luck of the Irish be with you!");
+        assertEquals(subscriber.list, lines(body, "\n"));
+    }
+
+    @Test(dataProvider = "uris")
+    void testStringWithoutFinisherBlocking(String url) throws Exception {
+        String body = "May the luck of the Irish be with you!";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body)).build();
+
+        StringSubscriber subscriber = new StringSubscriber();
+        HttpResponse<Void> response = client.send(request,
+                BodyHandler.fromLineSubscriber(subscriber));
+        String text = subscriber.get();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, "May the luck of the Irish be with you!");
+        assertEquals(subscriber.list, lines(body, null));
+    }
+
+    // Subscriber<Object>
+
+    @Test(dataProvider = "uris")
+    void testAsStreamWithMixedCRLF(String url) {
+        String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        HttpResponse<Stream<String>> response = client.sendAsync(request,
+                BodyHandler.asLines()).join();
+        Stream<String> stream = response.body();
+        List<String> list = stream.collect(Collectors.toList());
+        String text = list.stream().collect(Collectors.joining("|"));
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May| the wind| always be|at your back.|");
+        assertEquals(list, List.of("May",
+                                   " the wind",
+                                   " always be",
+                                   "at your back.",
+                                   ""));
+        assertEquals(list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testAsStreamWithMixedCRLF_UTF8(String url) {
+        String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .header("Content-type", "text/text; charset=UTF-8")
+                .POST(fromString(body, UTF_8)).build();
+
+        HttpResponse<Stream<String>> response = client.sendAsync(request,
+                BodyHandler.asLines()).join();
+        Stream<String> stream = response.body();
+        List<String> list = stream.collect(Collectors.toList());
+        String text = list.stream().collect(Collectors.joining("|"));
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May| the wind| always be|at your back.|");
+        assertEquals(list, List.of("May",
+                                   " the wind",
+                                   " always be",
+                                   "at your back.", ""));
+        assertEquals(list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testAsStreamWithMixedCRLF_UTF16(String url) {
+        String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .header("Content-type", "text/text; charset=UTF-16")
+                .POST(fromString(body, UTF_16)).build();
+
+        HttpResponse<Stream<String>> response = client.sendAsync(request,
+                BodyHandler.asLines()).join();
+        Stream<String> stream = response.body();
+        List<String> list = stream.collect(Collectors.toList());
+        String text = list.stream().collect(Collectors.joining("|"));
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May| the wind| always be|at your back.|");
+        assertEquals(list, List.of("May",
+                                   " the wind",
+                                   " always be",
+                                   "at your back.",
+                                   ""));
+        assertEquals(list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testObjectWithFinisher(String url) {
+        String body = "May\r\n the wind\r\n always be\rat your back.";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        HttpResponse<String> response = client.sendAsync(request,
+                BodyHandler.fromLineSubscriber(subscriber, ObjectSubscriber::get, "\r\n"))
+                .join();
+        String text = response.body();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May\n the wind\n always be\rat your back.");
+        assertEquals(subscriber.list, List.of("May",
+                                              " the wind",
+                                              " always be\rat your back."));
+        assertEquals(subscriber.list, lines(body, "\r\n"));
+    }
+
+    @Test(dataProvider = "uris")
+    void testObjectWithFinisher_UTF16(String url) {
+        String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .header("Content-type", "text/text; charset=UTF-16")
+                .POST(fromString(body, UTF_16)).build();
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        HttpResponse<String> response = client.sendAsync(request,
+                BodyHandler.fromLineSubscriber(subscriber,
+                                               ObjectSubscriber::get,
+                                   null)).join();
+        String text = response.body();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May\n the wind\n always be\nat your back.\n");
+        assertEquals(subscriber.list, List.of("May",
+                                              " the wind",
+                                              " always be",
+                                              "at your back.",
+                                              ""));
+        assertEquals(subscriber.list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testObjectWithoutFinisher(String url) {
+        String body = "May\r\n the wind\r\n always be\rat your back.";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+                .build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        HttpResponse<Void> response = client.sendAsync(request,
+                BodyHandler.fromLineSubscriber(subscriber)).join();
+        String text = subscriber.get();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May\n the wind\n always be\nat your back.");
+        assertEquals(subscriber.list, List.of("May",
+                                              " the wind",
+                                              " always be",
+                                              "at your back."));
+        assertEquals(subscriber.list, lines(body, null));
+    }
+
+    @Test(dataProvider = "uris")
+    void testObjectWithFinisherBlocking(String url) throws Exception {
+        String body = "May\r\n the wind\r\n always be\nat your back.";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+                .build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        HttpResponse<String> response = client.send(request,
+                BodyHandler.fromLineSubscriber(subscriber,
+                                               ObjectSubscriber::get,
+                                   "\r\n"));
+        String text = response.body();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May\n the wind\n always be\nat your back.");
+        assertEquals(subscriber.list, List.of("May",
+                                              " the wind",
+                                              " always be\nat your back."));
+        assertEquals(subscriber.list, lines(body, "\r\n"));
+    }
+
+    @Test(dataProvider = "uris")
+    void testObjectWithoutFinisherBlocking(String url) throws Exception {
+        String body = "May\r\n the wind\r\n always be\nat your back.";
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+                .build();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(body))
+                .build();
+
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        HttpResponse<Void> response = client.send(request,
+                BodyHandler.fromLineSubscriber(subscriber));
+        String text = subscriber.get();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertTrue(text.length() != 0);  // what else can be asserted!
+        assertEquals(text, "May\n the wind\n always be\nat your back.");
+        assertEquals(subscriber.list, List.of("May",
+                                              " the wind",
+                                              " always be",
+                                              "at your back."));
+        assertEquals(subscriber.list, lines(body, null));
+    }
+
+    static private final String LINE = "Bient\u00f4t nous plongerons dans les" +
+            " fr\u00f4\ud801\udc00des t\u00e9n\u00e8bres, ";
+
+    static private final String bigtext() {
+        StringBuilder res = new StringBuilder((LINE.length() + 1) * 50);
+        for (int i = 0; i<50; i++) {
+            res.append(LINE);
+            if (i%2 == 0) res.append("\r\n");
+        }
+        return res.toString();
+    }
+
+    @Test(dataProvider = "uris")
+    void testBigTextFromLineSubscriber(String url) {
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+                .build();
+        String bigtext = bigtext();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(bigtext))
+                .build();
+
+        StringSubscriber subscriber = new StringSubscriber();
+        HttpResponse<String> response = client.sendAsync(request,
+                BodyHandler.fromLineSubscriber(subscriber, Supplier::get,"\r\n"))
+                .join();
+        String text = response.body();
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, bigtext.replace("\r\n", "\n"));
+        assertEquals(subscriber.list, lines(bigtext, "\r\n"));
+    }
+
+    @Test(dataProvider = "uris")
+    void testBigTextAsStream(String url) {
+        HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+                .build();
+        String bigtext = bigtext();
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+                .POST(fromString(bigtext))
+                .build();
+
+        HttpResponse<Stream<String>> response = client.sendAsync(request,
+                BodyHandler.asLines()).join();
+        Stream<String> stream = response.body();
+        List<String> list = stream.collect(Collectors.toList());
+        String text = list.stream().collect(Collectors.joining("|"));
+        System.out.println(text);
+        assertEquals(response.statusCode(), 200);
+        assertEquals(text, bigtext.replace("\r\n", "|"));
+        assertEquals(list, List.of(bigtext.split("\r\n")));
+        assertEquals(list, lines(bigtext, null));
+    }
+
+    /** An abstract Subscriber that converts all received data into a String. */
+    static abstract class AbstractSubscriber implements Supplier<String> {
+        protected volatile Flow.Subscription subscription;
+        protected final StringBuilder baos = new StringBuilder();
+        protected volatile String text;
+        protected volatile RuntimeException error;
+        protected final List<Object> list = new CopyOnWriteArrayList<>();
+
+        public void onSubscribe(Flow.Subscription subscription) {
+            this.subscription = subscription;
+            subscription.request(Long.MAX_VALUE);
+        }
+        public void onError(Throwable throwable) {
+            System.out.println(this + " onError: " + throwable);
+            error = new RuntimeException(throwable);
+        }
+        public void onComplete() {
+            System.out.println(this + " onComplete");
+            text = baos.toString();
+        }
+        @Override public String get() {
+            if (error != null) throw error;
+            return text;
+        }
+    }
+
+    static class StringSubscriber extends AbstractSubscriber
+            implements Flow.Subscriber<String>, Supplier<String>
+    {
+        @Override public void onNext(String item) {
+            System.out.print(this + " onNext: \"" + item + "\"");
+            if (baos.length() != 0) baos.append('\n');
+            baos.append(item);
+            list.add(item);
+        }
+    }
+
+    static class CharSequenceSubscriber extends AbstractSubscriber
+            implements Flow.Subscriber<CharSequence>, Supplier<String>
+    {
+        @Override public void onNext(CharSequence item) {
+            System.out.print(this + " onNext: " + item);
+            if (baos.length() != 0) baos.append('\n');
+            baos.append(item);
+            list.add(item);
+        }
+    }
+
+    static class ObjectSubscriber extends AbstractSubscriber
+            implements Flow.Subscriber<Object>, Supplier<String>
+    {
+        @Override public void onNext(Object item) {
+            System.out.print(this + " onNext: " + item);
+            if (baos.length() != 0) baos.append('\n');
+            baos.append(item);
+            list.add(item);
+        }
+    }
+
+
+    static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {
+        try {
+            baos.write(ba);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @BeforeTest
+    public void setup() throws Exception {
+        sslContext = new SimpleSSLContext().get();
+        if (sslContext == null)
+            throw new AssertionError("Unexpected null sslContext");
+
+        InetSocketAddress sa = new InetSocketAddress("localhost", 0);
+        httpTestServer = HttpServer.create(sa, 0);
+        httpTestServer.createContext("/http1/echo", new Http1EchoHandler());
+        httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo";
+
+        httpsTestServer = HttpsServer.create(sa, 0);
+        httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+        httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());
+        httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo";
+
+        http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
+        http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");
+        int port = http2TestServer.getAddress().getPort();
+        http2URI = "http://127.0.0.1:" + port + "/http2/echo";
+
+        https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
+        https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");
+        port = https2TestServer.getAddress().getPort();
+        https2URI = "https://127.0.0.1:" + port + "/https2/echo";
+
+        httpTestServer.start();
+        httpsTestServer.start();
+        http2TestServer.start();
+        https2TestServer.start();
+    }
+
+    @AfterTest
+    public void teardown() throws Exception {
+        httpTestServer.stop(0);
+        httpsTestServer.stop(0);
+        http2TestServer.stop();
+        https2TestServer.stop();
+    }
+
+    static void printBytes(PrintStream out, String prefix, byte[] bytes) {
+        int padding = 4 + 4 - (bytes.length % 4);
+        padding = padding > 4 ? padding - 4 : 4;
+        byte[] bigbytes = new byte[bytes.length + padding];
+        System.arraycopy(bytes, 0, bigbytes, padding, bytes.length);
+        out.println(prefix + bytes.length + " "
+                    + new BigInteger(bigbytes).toString(16));
+    }
+
+    static class Http1EchoHandler implements HttpHandler {
+        @Override
+        public void handle(HttpExchange t) throws IOException {
+            try (InputStream is = t.getRequestBody();
+                 OutputStream os = t.getResponseBody()) {
+                byte[] bytes = is.readAllBytes();
+                printBytes(System.out,"Bytes: ", bytes);
+                if (t.getRequestHeaders().containsKey("Content-type")) {
+                    t.getResponseHeaders().add("Content-type",
+                            t.getRequestHeaders().getFirst("Content-type"));
+                }
+                t.sendResponseHeaders(200, bytes.length);
+                os.write(bytes);
+            }
+        }
+    }
+
+    static class Http2EchoHandler implements Http2Handler {
+        @Override
+        public void handle(Http2TestExchange t) throws IOException {
+            try (InputStream is = t.getRequestBody();
+                 OutputStream os = t.getResponseBody()) {
+                byte[] bytes = is.readAllBytes();
+                printBytes(System.out,"Bytes: ", bytes);
+                if (t.getRequestHeaders().firstValue("Content-type").isPresent()) {
+                    t.getResponseHeaders().addHeader("Content-type",
+                            t.getRequestHeaders().firstValue("Content-type").get());
+                }
+                t.sendResponseHeaders(200, bytes.length);
+                os.write(bytes);
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineStreamsAndSurrogatesTest.java	Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,319 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.MalformedInputException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.SubmissionPublisher;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.charset.StandardCharsets.UTF_16;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+/*
+ * @test
+ * @summary tests for BodySubscribers returned by asLines.
+ *       In particular tests that surrogate characters are handled
+ *       correctly.
+ * @modules jdk.incubator.httpclient java.logging
+ * @run testng/othervm LineStreamsAndSurrogatesTest
+ */
+
+public class LineStreamsAndSurrogatesTest {
+
+
+    static final Class<NullPointerException> NPE = NullPointerException.class;
+
+    private static final List<String> lines(String text) {
+        return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList());
+    }
+
+    @Test
+    void testUncomplete() throws Exception {
+        // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r les\n\n" +
+                " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\ud801\udc00";
+        Charset charset = UTF_8;
+
+        BodySubscriber<Stream<String>> bodySubscriber =
+                BodySubscriber.asLines(charset);
+        AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        Runnable run = () -> {
+            try {
+                SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+                byte[] sbytes = text.getBytes(charset);
+                byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1);
+                publisher.subscribe(bodySubscriber);
+                System.out.println("Publishing " + bytes.length + " bytes");
+                for (int i = 0; i < bytes.length; i++) {
+                    // ensure that surrogates are split over several buffers.
+                    publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+                }
+                publisher.close();
+            } catch(Throwable t) {
+                errorRef.set(t);
+            }
+        };
+        Thread thread = new Thread(run,"Publishing");
+        thread.start();
+        try {
+            Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+            List<String> list = stream.collect(Collectors.toList());
+            String resp = list.stream().collect(Collectors.joining(""));
+            System.out.println("***** Got: " + resp);
+
+            byte[] sbytes = text.getBytes(UTF_8);
+            byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1);
+            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(bais, charset));
+            String resp2 = reader.lines().collect(Collectors.joining(""));
+            System.out.println("***** Got2: " + resp2);
+
+            assertEquals(resp, resp2);
+            assertEquals(list, List.of("Bient\u00f4t",
+                                       " nous plongerons",
+                                       " dans",
+                                       " les",
+                                       "",
+                                       " fr\u00f4\ud801\udc00des",
+                                       " t\u00e9n\u00e8bres\ufffd"));
+        } catch (ExecutionException x) {
+            Throwable cause = x.getCause();
+            if (cause instanceof MalformedInputException) {
+                throw new RuntimeException("Unexpected MalformedInputException", cause);
+            }
+            throw x;
+        }
+        if (errorRef.get() != null) {
+            throw new RuntimeException("Unexpected exception", errorRef.get());
+        }
+    }
+
+    @Test
+    void testStream1() throws Exception {
+        // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r les\n\n" +
+                " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+        Charset charset = UTF_8;
+
+        BodySubscriber<Stream<String>> bodySubscriber =
+                BodySubscriber.asLines(charset);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(charset);
+        AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        Runnable run = () -> {
+            try {
+                publisher.subscribe(bodySubscriber);
+                System.out.println("Publishing " + bytes.length + " bytes");
+                for (int i = 0; i < bytes.length; i++) {
+                    // ensure that surrogates are split over several buffers.
+                    publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+                }
+                publisher.close();
+            } catch(Throwable t) {
+                errorRef.set(t);
+            }
+        };
+
+        Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+        Thread thread = new Thread(run,"Publishing");
+        thread.start();
+        List<String> list = stream.collect(Collectors.toList());
+        String resp = list.stream().collect(Collectors.joining("|"));
+        System.out.println("***** Got: " + resp);
+        assertEquals(resp, text.replace("\r\n", "|")
+                               .replace("\n","|")
+                               .replace("\r","|"));
+        assertEquals(list, List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans",
+                "",
+                " les",
+                "",
+                " fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres"));
+        assertEquals(list, lines(text));
+        if (errorRef.get() != null) {
+            throw new RuntimeException("Unexpected exception", errorRef.get());
+        }
+    }
+
+
+    @Test
+    void testStream2() throws Exception {
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
+                " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+        Charset charset = UTF_8;
+
+        BodySubscriber<Stream<String>> bodySubscriber = BodySubscriber.asLines(charset);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(charset);
+        AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        Runnable run = () -> {
+            try {
+                publisher.subscribe(bodySubscriber);
+                System.out.println("Publishing " + bytes.length + " bytes");
+                for (int i = 0; i < bytes.length; i++) {
+                    // ensure that surrogates are split over several buffers.
+                    publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+                }
+                publisher.close();
+            } catch(Throwable t) {
+                errorRef.set(t);
+            }
+        };
+
+        Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+        Thread thread = new Thread(run,"Publishing");
+        thread.start();
+        List<String> list = stream.collect(Collectors.toList());
+        String resp = list.stream().collect(Collectors.joining(""));
+        System.out.println("***** Got: " + resp);
+        String expected = Stream.of(text.split("\r\n|\r|\n"))
+                .collect(Collectors.joining(""));
+        assertEquals(resp, expected);
+        assertEquals(list, List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans",
+                "",
+                " les fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres",
+                ""));
+        assertEquals(list, lines(text));
+        if (errorRef.get() != null) {
+            throw new RuntimeException("Unexpected exception", errorRef.get());
+        }
+    }
+
+    @Test
+    void testStream3_UTF16() throws Exception {
+        // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
+                " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+        Charset charset = UTF_16;
+
+        BodySubscriber<Stream<String>> bodySubscriber =
+                BodySubscriber.asLines(charset);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(charset);
+        AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        Runnable run = () -> {
+            try {
+                publisher.subscribe(bodySubscriber);
+                System.out.println("Publishing " + bytes.length + " bytes");
+                for (int i = 0; i < bytes.length; i++) {
+                    // ensure that surrogates are split over several buffers.
+                    publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+                }
+                publisher.close();
+            } catch(Throwable t) {
+                errorRef.set(t);
+            }
+        };
+
+        Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+        Thread thread = new Thread(run,"Publishing");
+        thread.start();
+        List<String> list = stream.collect(Collectors.toList());
+        String resp = list.stream().collect(Collectors.joining(""));
+        System.out.println("***** Got: " + resp);
+        assertEquals(resp, text.replace("\n","").replace("\r",""));
+        assertEquals(list, List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans",
+                "",
+                " les",
+                "",
+                " fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres"));
+        assertEquals(list, lines(text));
+        if (errorRef.get() != null) {
+            throw new RuntimeException("Unexpected exception", errorRef.get());
+        }
+    }
+
+
+    @Test
+    void testStream4_UTF16() throws Exception {
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
+                " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+        Charset charset = UTF_16;
+
+        BodySubscriber<Stream<String>> bodySubscriber =
+                BodySubscriber.asLines(charset);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(charset);
+        AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        Runnable run = () -> {
+            try {
+                publisher.subscribe(bodySubscriber);
+                System.out.println("Publishing " + bytes.length + " bytes");
+                for (int i = 0; i < bytes.length; i++) {
+                    // ensure that surrogates are split over several buffers.
+                    publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+                }
+                publisher.close();
+            } catch(Throwable t) {
+                errorRef.set(t);
+            }
+        };
+
+        Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+        Thread thread = new Thread(run,"Publishing");
+        thread.start();
+        List<String> list = stream.collect(Collectors.toList());
+        String resp = list.stream().collect(Collectors.joining(""));
+        System.out.println("***** Got: " + resp);
+        String expected = Stream.of(text.split("\r\n|\r|\n"))
+                .collect(Collectors.joining(""));
+        assertEquals(resp, expected);
+        assertEquals(list, List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans",
+                "",
+                " les fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres",
+                ""));
+        assertEquals(list, lines(text));
+        if (errorRef.get() != null) {
+            throw new RuntimeException("Unexpected exception", errorRef.get());
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineSubscribersAndSurrogatesTest.java	Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,384 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.MalformedInputException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow;
+import java.util.concurrent.SubmissionPublisher;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.charset.StandardCharsets.UTF_16;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+/*
+ * @test
+ * @summary tests for BodySubscribers returned by fromLineSubscriber.
+ *       In particular tests that surrogate characters are handled
+ *       correctly.
+ * @modules jdk.incubator.httpclient java.logging
+ * @run testng/othervm LineSubscribersAndSurrogatesTest
+ */
+
+public class LineSubscribersAndSurrogatesTest {
+
+
+    static final Class<NullPointerException> NPE = NullPointerException.class;
+
+    private static final List<String> lines(String text, String eol) {
+        if (eol == null) {
+            return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList());
+        } else {
+            String replaced = text.replace(eol, "|");
+            int i=0;
+            while(replaced.endsWith("||")) {
+                replaced = replaced.substring(0,replaced.length()-1);
+                i++;
+            }
+            List<String> res = List.of(replaced.split("\\|"));
+            if (i > 0) {
+                res = new ArrayList<>(res);
+                for (int j=0; j<i; j++) res.add("");
+            }
+            return res;
+        }
+    }
+
+    @Test
+    void testIncomplete() throws Exception {
+        // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+                " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\ud801\udc00";
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+                subscriber, Supplier::get, UTF_8, null);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] sbytes = text.getBytes(UTF_8);
+        byte[] bytes = Arrays.copyOfRange(sbytes,0, sbytes.length - 1);
+        publisher.subscribe(bodySubscriber);
+        System.out.println("Publishing " + bytes.length + " bytes");
+        for (int i=0; i<bytes.length; i++) {
+            // ensure that surrogates are split over several buffers.
+            publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+        }
+        publisher.close();
+        try {
+            String resp = bodySubscriber.getBody().toCompletableFuture().get();
+            System.out.println("***** Got: " + resp);
+            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(bais, UTF_8));
+            String resp2 = reader.lines().collect(Collectors.joining(""));
+            assertEquals(resp, resp2);
+            assertEquals(subscriber.list, List.of("Bient\u00f4t",
+                    " nous plongerons",
+                    " dans",
+                    " les",
+                    "",
+                    " fr\u00f4\ud801\udc00des",
+                    " t\u00e9n\u00e8bres\ufffd"));
+        } catch (ExecutionException x) {
+            Throwable cause = x.getCause();
+            if (cause instanceof MalformedInputException) {
+                throw new RuntimeException("Unexpected MalformedInputException thrown", cause);
+            }
+            throw x;
+        }
+    }
+
+
+    @Test
+    void testStringWithFinisherLF() throws Exception {
+        // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+                " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r";
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+                subscriber, Supplier::get, UTF_8, "\n");
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(UTF_8);
+        publisher.subscribe(bodySubscriber);
+        System.out.println("Publishing " + bytes.length + " bytes");
+        for (int i=0; i<bytes.length; i++) {
+            // ensure that surrogates are split over several buffers.
+            publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+        }
+        publisher.close();
+        String resp = bodySubscriber.getBody().toCompletableFuture().get();
+        System.out.println("***** Got: " + resp);
+        List<String> expected = List.of("Bient\u00f4t\r",
+                " nous plongerons\r",
+                " dans\r les",
+                "",
+                " fr\u00f4\ud801\udc00des\r",
+                " t\u00e9n\u00e8bres\r");
+        assertEquals(subscriber.list, expected);
+        assertEquals(resp, Stream.of(text.split("\n")).collect(Collectors.joining("")));
+        assertEquals(resp, expected.stream().collect(Collectors.joining("")));
+        assertEquals(subscriber.list, lines(text, "\n"));
+    }
+
+
+    @Test
+    void testStringWithFinisherCR() throws Exception {
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+                " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+                subscriber, Supplier::get, UTF_8, "\r");
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(UTF_8);
+        publisher.subscribe(bodySubscriber);
+        System.out.println("Publishing " + bytes.length + " bytes");
+        for (int i=0; i<bytes.length; i++) {
+            // ensure that surrogates are split over several buffers.
+            publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+        }
+        publisher.close();
+        String resp = bodySubscriber.getBody().toCompletableFuture().get();
+        System.out.println("***** Got: " + resp);
+        assertEquals(resp, text.replace("\r", ""));
+        assertEquals(subscriber.list, List.of("Bient\u00f4t",
+                "\n nous plongerons",
+                "\n dans",
+                " les fr\u00f4\ud801\udc00des",
+                "\n t\u00e9n\u00e8bres",
+                ""));
+        assertEquals(subscriber.list, lines(text, "\r"));
+    }
+
+    @Test
+    void testStringWithFinisherCRLF() throws Exception {
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+                " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+                subscriber, Supplier::get, UTF_8, "\r\n");
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(UTF_8);
+        publisher.subscribe(bodySubscriber);
+        System.out.println("Publishing " + bytes.length + " bytes");
+        for (int i=0; i<bytes.length; i++) {
+            // ensure that surrogates are split over several buffers.
+            publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+        }
+        publisher.close();
+        String resp = bodySubscriber.getBody().toCompletableFuture().get();
+        System.out.println("***** Got: " + resp);
+        assertEquals(resp, text.replace("\r\n",""));
+        assertEquals(subscriber.list, List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans\r les fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres"));
+        assertEquals(subscriber.list, lines(text, "\r\n"));
+    }
+
+
+    @Test
+    void testStringWithFinisherBR() throws Exception {
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+                " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+                subscriber, Supplier::get, UTF_8, null);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(UTF_8);
+        publisher.subscribe(bodySubscriber);
+        System.out.println("Publishing " + bytes.length + " bytes");
+        for (int i=0; i<bytes.length; i++) {
+            // ensure that surrogates are split over several buffers.
+            publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+        }
+        publisher.close();
+        String resp = bodySubscriber.getBody().toCompletableFuture().get();
+        System.out.println("***** Got: " + resp);
+        List<String> expected = List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans",
+                " les",
+                "",
+                " fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres");
+        assertEquals(subscriber.list, expected);
+        assertEquals(resp, expected.stream().collect(Collectors.joining("")));
+        assertEquals(subscriber.list, lines(text, null));
+    }
+
+    @Test
+    void testStringWithFinisherBR_UTF_16() throws Exception {
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+                " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+                subscriber, Supplier::get, UTF_16, null);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(UTF_16);
+        publisher.subscribe(bodySubscriber);
+        System.out.println("Publishing " + bytes.length + " bytes");
+        for (int i=0; i<bytes.length; i++) {
+            // ensure that surrogates are split over several buffers.
+            publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+        }
+        publisher.close();
+        String resp = bodySubscriber.getBody().toCompletableFuture().get();
+        System.out.println("***** Got: " + resp);
+        List<String> expected = List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans",
+                " les",
+                "",
+                " fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres",
+                "");
+        assertEquals(resp, expected.stream().collect(Collectors.joining("")));
+        assertEquals(subscriber.list, expected);
+        assertEquals(subscriber.list, lines(text, null));
+    }
+
+    void testStringWithoutFinisherBR() throws Exception {
+        String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+                " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+        ObjectSubscriber subscriber = new ObjectSubscriber();
+        BodySubscriber<Void> bodySubscriber = BodySubscriber.fromLineSubscriber(subscriber);
+        SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+        byte[] bytes = text.getBytes(UTF_8);
+        publisher.subscribe(bodySubscriber);
+        System.out.println("Publishing " + bytes.length + " bytes");
+        for (int i = 0; i < bytes.length; i++) {
+            // ensure that surrogates are split over several buffers.
+            publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+        }
+        publisher.close();
+        Void resp = bodySubscriber.getBody().toCompletableFuture().get();
+        System.out.println("***** Got: " + resp);
+        List<String> expected = List.of("Bient\u00f4t",
+                " nous plongerons",
+                " dans",
+                " les",
+                "",
+                " fr\u00f4\ud801\udc00des",
+                " t\u00e9n\u00e8bres");
+        assertEquals(subscriber.text, expected.stream().collect(Collectors.joining("")));
+        assertEquals(subscriber.list, expected);
+        assertEquals(subscriber.list, lines(text, null));
+    }
+
+
+    /** An abstract Subscriber that converts all received data into a String. */
+    static abstract class AbstractSubscriber implements Supplier<String> {
+        protected final List<Object> list = new CopyOnWriteArrayList<>();
+        protected volatile Flow.Subscription subscription;
+        protected final StringBuilder baos = new StringBuilder();
+        protected volatile String text;
+        protected volatile RuntimeException error;
+
+        public void onSubscribe(Flow.Subscription subscription) {
+            this.subscription = subscription;
+            subscription.request(Long.MAX_VALUE);
+        }
+        public void onError(Throwable throwable) {
+            System.out.println(this + " onError: " + throwable);
+            error = new RuntimeException(throwable);
+        }
+        public void onComplete() {
+            System.out.println(this + " onComplete");
+            text = baos.toString();
+        }
+        @Override public String get() {
+            if (error != null) throw error;
+            return text;
+        }
+        public final List<?> list() {
+            return list;
+        }
+    }
+
+    static class StringSubscriber extends AbstractSubscriber
+            implements Flow.Subscriber<String>, Supplier<String>
+    {
+        @Override public void onNext(String item) {
+            System.out.println(this + " onNext: \""
+                    + item.replace("\n","\\n")
+                          .replace("\r", "\\r")
+                    + "\"");
+            baos.append(item);
+            list.add(item);
+        }
+    }
+
+    static class CharSequenceSubscriber extends AbstractSubscriber
+            implements Flow.Subscriber<CharSequence>, Supplier<String>
+    {
+        @Override public void onNext(CharSequence item) {
+            System.out.println(this + " onNext: \""
+                    + item.toString().replace("\n","\\n")
+                    .replace("\r", "\\r")
+                    + "\"");
+            baos.append(item);
+            list.add(item);
+        }
+    }
+
+    static class ObjectSubscriber extends AbstractSubscriber
+            implements Flow.Subscriber<Object>, Supplier<String>
+    {
+        @Override public void onNext(Object item) {
+            System.out.println(this + " onNext: \""
+                    + item.toString().replace("\n","\\n")
+                    .replace("\r", "\\r")
+                    + "\"");
+            baos.append(item);
+            list.add(item);
+        }
+    }
+
+
+    static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {
+        try {
+            baos.write(ba);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+}
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Fri Jan 05 14:11:48 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -122,7 +122,7 @@
         if (c == -1) {
             return -1;
         }
-        return one[0];
+        return one[0] & 0xFF;
     }
 
     @Override
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Fri Jan 05 14:11:48 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -269,9 +269,9 @@
         }
     }
 
-    String doUpgrade() throws IOException {
-        String upgrade = readHttp1Request();
-        String h2c = getHeader(upgrade, "Upgrade");
+    Http1InitialRequest doUpgrade() throws IOException {
+        Http1InitialRequest upgrade = readHttp1Request();
+        String h2c = getHeader(upgrade.headers, "Upgrade");
         if (h2c == null || !h2c.equals("h2c")) {
             System.err.println("Server:HEADERS: " + upgrade);
             throw new IOException("Bad upgrade 1 " + h2c);
@@ -283,7 +283,7 @@
         sendSettingsFrame();
         readPreface();
 
-        String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
+        String clientSettingsString = getHeader(upgrade.headers, "HTTP2-Settings");
         clientSettings = getSettingsFromString(clientSettingsString);
 
         return upgrade;
@@ -313,7 +313,7 @@
     }
 
     void run() throws Exception {
-        String upgrade = null;
+        Http1InitialRequest upgrade = null;
         if (!secure) {
             upgrade = doUpgrade();
         } else {
@@ -462,9 +462,9 @@
 
     // First stream (1) comes from a plaintext HTTP/1.1 request
     @SuppressWarnings({"rawtypes","unchecked"})
-    void createPrimordialStream(String request) throws IOException {
+    void createPrimordialStream(Http1InitialRequest request) throws IOException {
         HttpHeadersImpl headers = new HttpHeadersImpl();
-        String requestLine = getRequestLine(request);
+        String requestLine = getRequestLine(request.headers);
         String[] tokens = requestLine.split(" ");
         if (!tokens[2].equals("HTTP/1.1")) {
             throw new IOException("bad request line");
@@ -475,7 +475,7 @@
         } catch (URISyntaxException e) {
             throw new IOException(e);
         }
-        String host = getHeader(request, "Host");
+        String host = getHeader(request.headers, "Host");
         if (host == null) {
             throw new IOException("missing Host");
         }
@@ -485,9 +485,9 @@
         headers.setHeader(":authority", host);
         headers.setHeader(":path", uri.getPath());
         Queue q = new Queue(sentinel);
-        String body = getRequestBody(request);
-        addHeaders(getHeaders(request), headers);
-        headers.setHeader("Content-length", Integer.toString(body.length()));
+        byte[] body = getRequestBody(request);
+        addHeaders(getHeaders(request.headers), headers);
+        headers.setHeader("Content-length", Integer.toString(body.length));
 
         addRequestBodyToQueue(body, q);
         streams.put(1, q);
@@ -885,7 +885,16 @@
     final static String CRLF = "\r\n";
     final static String CRLFCRLF = "\r\n\r\n";
 
-    String readHttp1Request() throws IOException {
+    static class Http1InitialRequest {
+        final String headers;
+        final byte[] body;
+        Http1InitialRequest(String headers, byte[] body) {
+            this.headers = headers;
+            this.body = body.clone();
+        }
+    }
+
+    Http1InitialRequest readHttp1Request() throws IOException {
         String headers = readUntil(CRLF + CRLF);
         int clen = getContentLength(headers);
         String te = getHeader(headers, "Transfer-encoding");
@@ -899,8 +908,7 @@
                 //  HTTP/1.1 chunked data, read it
                 buf = readChunkedInputStream(is);
             }
-            String body = new String(buf, StandardCharsets.US_ASCII);
-            return headers + body;
+            return new Http1InitialRequest(headers, buf);
         } catch (IOException e) {
             System.err.println("TestServer: headers read: [ " + headers + " ]");
             throw e;
@@ -943,19 +951,13 @@
     // wrapper around a BlockingQueue that throws an exception when it's closed
     // Each stream has one of these
 
-    String getRequestBody(String request) {
-        int bodystart = request.indexOf(CRLF+CRLF);
-        String body;
-        if (bodystart == -1)
-            body = "";
-        else
-            body = request.substring(bodystart+4);
-        return body;
+    byte[] getRequestBody(Http1InitialRequest request) {
+        return request.body;
     }
 
     @SuppressWarnings({"rawtypes","unchecked"})
-    void addRequestBodyToQueue(String body, Queue q) throws IOException {
-        ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
+    void addRequestBodyToQueue(byte[] body, Queue q) throws IOException {
+        ByteBuffer buf = ByteBuffer.wrap(body);
         DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
         // only used for primordial stream
         q.put(df);