http-client-branch: Adds some convenience methods to parse body responses as lines
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,6 +25,7 @@
package jdk.incubator.http;
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -34,12 +35,12 @@
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.security.AccessControlContext;
-import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -49,6 +50,7 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Stream;
import javax.net.ssl.SSLParameters;
/**
@@ -399,6 +401,91 @@
}
/**
+ * Returns a response body handler that returns a {@link BodySubscriber
+ * BodySubscriber}{@code <Void>} obtained from {@link
+ * BodySubscriber#fromLineSubscriber(Subscriber, Function, Charset, String)
+ * BodySubscriber.fromLineSubscriber(subscriber, s -> null, charset, null)},
+ * with the given {@code subscriber}.
+ * The {@link Charset charset} used to decode the response body bytes is
+ * obtained from the HTTP response headers as specified by {@link #asString()},
+ * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
+ *
+ * <p> The response body is not available through this, or the {@code
+ * HttpResponse} API, but instead all response body is forwarded to the
+ * given {@code subscriber}, which should make it available, if
+ * appropriate, through some other mechanism, e.g. an entry in a
+ * database, etc.
+ *
+ * @apiNote This method can be used as an adapter between {@code
+ * BodySubscriber} and {@code Flow.Subscriber}.
+ *
+ * <p> For example:
+ * <pre> {@code
+ * TextSubscriber subscriber = new TextSubscriber();
+ * HttpResponse<Void> response = client.sendAsync(request,
+ * BodyHandler.fromLineSubscriber(subscriber, "\n")).join();
+ * System.out.println(response.statusCode());
+ * }</pre>
+ *
+ * @param subscriber the subscriber
+ * @return a response body handler
+ */
+ public static BodyHandler<Void>
+ fromLineSubscriber(Subscriber<? super String> subscriber) {
+ Objects.requireNonNull(subscriber);
+ return (status, headers)
+ -> BodySubscriber.fromLineSubscriber(subscriber, s -> null,
+ charsetFrom(headers), null);
+ }
+
+ /**
+ * Returns a response body handler that returns a {@link BodySubscriber
+ * BodySubscriber}{@code <T>} obtained from {@link
+ * BodySubscriber#fromLineSubscriber(Subscriber, Function, Charset, String)
+ * BodySubscriber.fromLineSubscriber(subscriber, finisher, charset, lineSeparator)},
+ * with the given {@code subscriber}, {@code finisher} function, and line separator.
+ * The {@link Charset charset} used to decode the response body bytes is
+ * obtained from the HTTP response headers as specified by {@link #asString()}.
+ *
+ * <p> The given {@code finisher} function is applied after the given
+ * subscriber's {@code onComplete} has been invoked. The {@code finisher}
+ * function is invoked with the given subscriber, and returns a value
+ * that is set as the response's body.
+ *
+ * @apiNote This method can be used as an adapter between {@code
+ * BodySubscriber} and {@code Flow.Subscriber}.
+ *
+ * <p> For example:
+ * <pre> {@code
+ * TextSubscriber subscriber = ...; // accumulates bytes and transforms them into a String
+ * HttpResponse<String> response = client.sendAsync(request,
+ * BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult, "\n")).join();
+ * String text = response.body();
+ * }</pre>
+ *
+ * @param <S> the type of the Subscriber
+ * @param <T> the type of the response body
+ * @param subscriber the subscriber
+ * @param finisher a function to be applied after the subscriber has completed
+ * @param lineSeparator an optional line separator: can be {@code null},
+ * in which case lines will be delimited in the manner of
+ * {@link BufferedReader#readLine()}.
+ * @return a response body handler
+ * @throws IllegalArgumentException if the supplied {@code lineSeparator} is the empty string.
+ */
+ public static <S extends Subscriber<? super String>,T> BodyHandler<T>
+ fromLineSubscriber(S subscriber, Function<S,T> finisher, String lineSeparator) {
+ Objects.requireNonNull(subscriber);
+ Objects.requireNonNull(finisher);
+ // implicit null check
+ if (lineSeparator != null && lineSeparator.isEmpty())
+ throw new IllegalArgumentException("empty line separator");
+ return (status, headers) ->
+ BodySubscriber.fromLineSubscriber(subscriber, finisher,
+ charsetFrom(headers), lineSeparator);
+ }
+
+ /**
* Returns a response body handler which discards the response body and
* uses the given value as a replacement for it.
*
@@ -542,6 +629,25 @@
}
/**
+ * Returns a {@code BodyHandler<Stream<String>>} that returns a
+ * {@link BodySubscriber BodySubscriber}{@code <Stream<String>>} obtained from
+ * {@link BodySubscriber#asLines(Charset)}
+ * BodySubscriber.asLines(charset)}.
+ * The {@link Charset charset} used to decode the response body bytes is
+ * obtained from the HTTP response headers as specified by {@link #asString()},
+ * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
+ *
+ * <p> When the {@code HttpResponse} object is returned, the body may
+ * not have been completely received.
+ *
+ * @return a response body handler
+ */
+ public static BodyHandler<Stream<String>> asLines() {
+ return (status, headers) ->
+ BodySubscriber.asLines(charsetFrom(headers));
+ }
+
+ /**
* Returns a {@code BodyHandler<Void>} that returns a
* {@link BodySubscriber BodySubscriber}{@code <Void>} obtained from
* {@link BodySubscriber#asByteArrayConsumer(Consumer)
@@ -578,7 +684,7 @@
* {@link BodySubscriber#asString(java.nio.charset.Charset)
* BodySubscriber.asString(Charset)}. The body is
* decoded using the character set specified in
- * the {@code Content-encoding} response header. If there is no such
+ * the {@code Content-type} response header. If there is no such
* header, or the character set is not supported, then
* {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} is used.
*
@@ -712,6 +818,71 @@
}
/**
+ * Returns a body subscriber that forwards all response body to the
+ * given {@code Flow.Subscriber}, lines by lines.
+ * The {@linkplain #getBody()} completion
+ * stage} of the returned body subscriber completes after one of the
+ * given subscribers {@code onComplete} or {@code onError} has been
+ * invoked.
+ * Bytes are decoded using the {@linkplain StandardCharsets#UTF_8
+ * UTF-8} charset, and lines are delimited in the manner of
+ * {@link BufferedReader#readLine()}.
+ *
+ * @apiNote This method can be used as an adapter between {@code
+ * BodySubscriber} and {@code Flow.Subscriber}.
+ *
+ * @implNote This is equivalent to calling <pre>{@code
+ * fromLineSubscriber(subscriber, s -> null, StandardCharsets.UTF_8, null)
+ * }</pre>
+ *
+ * @param <S> the type of the Subscriber
+ * @param subscriber the subscriber
+ * @return a body subscriber
+ */
+ public static <S extends Subscriber<? super String>> BodySubscriber<Void>
+ fromLineSubscriber(S subscriber) {
+ return fromLineSubscriber(subscriber, s -> null,
+ StandardCharsets.UTF_8, null);
+ }
+
+ /**
+ * Returns a body subscriber that forwards all response body to the
+ * given {@code Flow.Subscriber}, lines by lines.
+ * The {@linkplain #getBody()} completion
+ * stage} of the returned body subscriber completes after one of the
+ * given subscribers {@code onComplete} or {@code onError} has been
+ * invoked.
+ *
+ * <p> The given {@code finisher} function is applied after the given
+ * subscriber's {@code onComplete} has been invoked. The {@code finisher}
+ * function is invoked with the given subscriber, and returns a value
+ * that is set as the response's body.
+ *
+ * @apiNote This method can be used as an adapter between {@code
+ * BodySubscriber} and {@code Flow.Subscriber}.
+ *
+ * @param <S> the type of the Subscriber
+ * @param <T> the type of the response body
+ * @param subscriber the subscriber
+ * @param finisher a function to be applied after the subscriber has
+ * completed
+ * @param charset a {@link Charset} to decode the bytes
+ * @param lineSeparator an optional line separator: can be {@code null},
+ * in which case lines will be delimited in the manner of
+ * {@link BufferedReader#readLine()}.
+ * @return a body subscriber
+ * @throws IllegalArgumentException if the supplied {@code lineSeparator} is the empty string.
+ */
+ public static <S extends Subscriber<? super String>,T> BodySubscriber<T>
+ fromLineSubscriber(S subscriber,
+ Function<S,T> finisher,
+ Charset charset,
+ String lineSeparator) {
+ return LineSubscriberAdapter.create(subscriber,
+ finisher, charset, lineSeparator);
+ }
+
+ /**
* Returns a body subscriber which stores the response body as a {@code
* String} converted using the given {@code Charset}.
*
@@ -847,6 +1018,33 @@
}
/**
+ * Returns a {@code BodySubscriber} which streams the response body as
+ * a {@link Stream Stream<String>}, where each string in the stream
+ * corresponds to a line as defined by {@link BufferedReader#lines()}.
+ *
+ * <p> The {@link HttpResponse} using this subscriber is available
+ * immediately after the response headers have been read, without
+ * requiring to wait for the entire body to be processed. The response
+ * body can then be read directly from the {@link Stream}.
+ *
+ * @apiNote To ensure that all resources associated with the
+ * corresponding exchange are properly released the caller must
+ * ensure to either read all lines until the stream is exhausted,
+ * or call {@link Stream#close} if it is unable or unwilling to do so.
+ * Calling {@code close} before exhausting the stream may cause
+ * the underlying HTTP connection to be closed and prevent it
+ * from being reused for subsequent operations.
+ *
+ * @return a body subscriber that streams the response body as a
+ * {@link Stream Stream<String>}.
+ *
+ * @see BufferedReader#lines()
+ */
+ public static BodySubscriber<Stream<String>> asLines(Charset charset) {
+ return ResponseSubscribers.HttpLineStream.create(charset);
+ }
+
+ /**
* Returns a response subscriber which discards the response body. The
* supplied value is the value that will be returned from
* {@link HttpResponse#body()}.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,467 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
+/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
+final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
+ implements HttpResponse.BodySubscriber<R> {
+ private final CompletableFuture<R> cf = new MinimalFuture<>();
+ private final S subscriber;
+ private final Function<S, R> finisher;
+ private final Charset charset;
+ private final String eol;
+ private volatile LineSubscription downstream;
+
+ private LineSubscriberAdapter(S subscriber,
+ Function<S, R> finisher,
+ Charset charset,
+ String eol) {
+ if (eol != null && eol.isEmpty())
+ throw new IllegalArgumentException("empty line separator");
+ this.subscriber = Objects.requireNonNull(subscriber);
+ this.finisher = Objects.requireNonNull(finisher);
+ this.charset = Objects.requireNonNull(charset);
+ this.eol = eol;
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ downstream = LineSubscription.create(subscription,
+ charset,
+ eol,
+ subscriber,
+ cf);
+ subscriber.onSubscribe(downstream);
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ try {
+ downstream.submit(item);
+ } catch (Throwable t) {
+ onError(t);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ try {
+ downstream.signalError(throwable);
+ } finally {
+ cf.completeExceptionally(throwable);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ try {
+ downstream.signalComplete();
+ } finally {
+ cf.complete(finisher.apply(subscriber));
+ }
+ }
+
+ @Override
+ public CompletionStage<R> getBody() {
+ return cf;
+ }
+
+ static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
+ create(S subscriber, Function<S, R> finisher, Charset charset, String eol)
+ {
+ if (eol != null && eol.isEmpty())
+ throw new IllegalArgumentException("empty line separator");
+ return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
+ Objects.requireNonNull(finisher),
+ Objects.requireNonNull(charset),
+ eol);
+ }
+
+ static final class LineSubscription implements Flow.Subscription {
+ final Flow.Subscription upstreamSubscription;
+ final CharsetDecoder decoder;
+ final String newline;
+ final Demand downstreamDemand;
+ final ConcurrentLinkedDeque<ByteBuffer> queue;
+ final SequentialScheduler scheduler;
+ final Flow.Subscriber<? super String> upstream;
+ final CompletableFuture<?> cf;
+ private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ private final AtomicLong demanded = new AtomicLong();
+ private volatile boolean completed;
+ private volatile boolean cancelled;
+
+ private final char[] chars = new char[1024];
+ private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
+ private final CharBuffer buffer = CharBuffer.wrap(chars);
+ private final StringBuilder builder = new StringBuilder();
+ private int lineCount;
+ private String nextLine;
+
+ private LineSubscription(Flow.Subscription s,
+ CharsetDecoder dec,
+ String separator,
+ Flow.Subscriber<? super String> subscriber,
+ CompletableFuture<?> completion) {
+ downstreamDemand = new Demand();
+ queue = new ConcurrentLinkedDeque<>();
+ upstreamSubscription = Objects.requireNonNull(s);
+ decoder = Objects.requireNonNull(dec);
+ newline = separator;
+ upstream = Objects.requireNonNull(subscriber);
+ cf = Objects.requireNonNull(completion);
+ scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
+ }
+
+ @Override
+ public void request(long n) {
+ if (cancelled) return;
+ if (downstreamDemand.increase(n)) {
+ scheduler.runOrSchedule();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ upstreamSubscription.cancel();
+ }
+
+ public void submit(List<ByteBuffer> list) {
+ queue.addAll(list);
+ demanded.decrementAndGet();
+ scheduler.runOrSchedule();
+ }
+
+ public void signalComplete() {
+ completed = true;
+ scheduler.runOrSchedule();
+ }
+
+ public void signalError(Throwable error) {
+ if (errorRef.compareAndSet(null,
+ Objects.requireNonNull(error))) {
+ scheduler.runOrSchedule();
+ }
+ }
+
+ // This method looks at whether some bytes where left over (in leftover)
+ // from decoding the previous buffer when the previous buffer was in
+ // underflow. If so, it takes bytes one by one from the new buffer 'in'
+ // and combines them with the leftover bytes until 'in' is exhausted or a
+ // character was produced in 'out', resolving the previous underflow.
+ // Returns true if the buffer is still in underflow, false otherwise.
+ // However, in both situation some chars might have been produced in 'out'.
+ private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
+ throws CharacterCodingException {
+ int limit = leftover.position();
+ if (limit == 0) {
+ // no leftover
+ return false;
+ } else {
+ CoderResult res = null;
+ while (in.hasRemaining()) {
+ leftover.position(limit);
+ leftover.limit(++limit);
+ leftover.put(in.get());
+ leftover.position(0);
+ res = decoder.decode(leftover, out,
+ endOfInput && !in.hasRemaining());
+ int remaining = leftover.remaining();
+ if (remaining > 0) {
+ assert leftover.position() == 0;
+ leftover.position(remaining);
+ } else {
+ leftover.position(0);
+ }
+ leftover.limit(leftover.capacity());
+ if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
+ continue;
+ }
+ if (res.isError()) {
+ res.throwException();
+ }
+ assert !res.isOverflow();
+ return false;
+ }
+ return !endOfInput;
+ }
+ }
+
+ // extract characters from start to end and remove them from
+ // the StringBuilder
+ private static String take(StringBuilder b, int start, int end) {
+ assert start == 0;
+ String line;
+ if (end == start) return "";
+ line = b.substring(start, end);
+ b.delete(start, end);
+ return line;
+ }
+
+ // finds end of line, returns -1 if not found, or the position after
+ // the line delimiter if found, removing the delimiter in the process.
+ private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
+ int len = b.length();
+ if (eol != null) { // delimiter explicitly specified
+ int i = b.indexOf(eol);
+ if (i >= 0) {
+ // remove the delimiter and returns the position
+ // of the char after it.
+ b.delete(i, i + eol.length());
+ return i;
+ }
+ } else { // no delimiter specified, behaves as BufferedReader::readLine
+ boolean crfound = false;
+ for (int i = 0; i < len; i++) {
+ char c = b.charAt(i);
+ if (c == '\n') {
+ // '\n' or '\r\n' found.
+ // remove the delimiter and returns the position
+ // of the char after it.
+ b.delete(crfound ? i - 1 : i, i + 1);
+ return crfound ? i - 1 : i;
+ } else if (crfound) {
+ // previous char was '\r', c != '\n'
+ assert i != 0;
+ // remove the delimiter and returns the position
+ // of the char after it.
+ b.delete(i - 1, i);
+ return i - 1;
+ }
+ crfound = c == '\r';
+ }
+ if (crfound && endOfInput) {
+ // remove the delimiter and returns the position
+ // of the char after it.
+ b.delete(len - 1, len);
+ return len - 1;
+ }
+ }
+ return endOfInput && len > 0 ? len : -1;
+ }
+
+ // Looks at whether the StringBuilder contains a line.
+ // Returns null if more character are needed.
+ private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
+ int next = endOfLine(b, eol, endOfInput);
+ return (next > -1) ? take(b, 0, next) : null;
+ }
+
+ // Attempts to read the next line. Returns the next line if
+ // the delimiter was found, null otherwise. The delimiters are
+ // consumed.
+ private String nextLine()
+ throws CharacterCodingException {
+ assert nextLine == null;
+ LINES:
+ while (nextLine == null) {
+ boolean endOfInput = completed && queue.isEmpty();
+ nextLine = nextLine(builder, newline,
+ endOfInput && leftover.position() == 0);
+ if (nextLine != null) return nextLine;
+ ByteBuffer b;
+ BUFFERS:
+ while ((b = queue.peek()) != null) {
+ if (!b.hasRemaining()) {
+ queue.poll();
+ continue BUFFERS;
+ }
+ BYTES:
+ while (b.hasRemaining()) {
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ boolean endofInput = completed && queue.size() <= 1;
+ if (isUnderFlow(b, buffer, endofInput)) {
+ assert !b.hasRemaining();
+ if (buffer.position() > 0) {
+ buffer.flip();
+ builder.append(buffer);
+ }
+ continue BUFFERS;
+ }
+ CoderResult res = decoder.decode(b, buffer, endofInput);
+ if (res.isError()) res.throwException();
+ if (buffer.position() > 0) {
+ buffer.flip();
+ builder.append(buffer);
+ continue LINES;
+ }
+ if (res.isUnderflow() && b.hasRemaining()) {
+ //System.out.println("underflow: adding " + b.remaining() + " bytes");
+ leftover.put(b);
+ assert !b.hasRemaining();
+ continue BUFFERS;
+ }
+ }
+ }
+
+ assert queue.isEmpty();
+ if (endOfInput) {
+ // Time to cleanup: there may be some undecoded leftover bytes
+ // We need to flush them out.
+ // The decoder has been configured to replace malformed/unmappable
+ // chars with some replacement, in order to behave like
+ // InputStreamReader.
+ leftover.flip();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+
+ // decode() must be called just before flush, even if there
+ // is nothing to decode. We must do this even if leftover
+ // has no remaining bytes.
+ CoderResult res = decoder.decode(leftover, buffer, endOfInput);
+ if (buffer.position() > 0) {
+ buffer.flip();
+ builder.append(buffer);
+ }
+ if (res.isError()) res.throwException();
+
+ // Now call decoder.flush()
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ res = decoder.flush(buffer);
+ if (buffer.position() > 0) {
+ buffer.flip();
+ builder.append(buffer);
+ }
+ if (res.isError()) res.throwException();
+
+ // It's possible that we reach here twice - just for the
+ // purpose of checking that no bytes were left over, so
+ // we reset leftover/decoder to make the function reentrant.
+ leftover.position(0);
+ leftover.limit(leftover.capacity());
+ decoder.reset();
+
+ // if some chars were produced then this call will
+ // return them.
+ return nextLine = nextLine(builder, newline, endOfInput);
+ }
+ return null;
+ }
+ return null;
+ }
+
+ // The main sequential scheduler loop.
+ private void loop() {
+ try {
+ while (!cancelled) {
+ Throwable error = errorRef.get();
+ if (error != null) {
+ cancelled = true;
+ scheduler.stop();
+ upstream.onError(error);
+ cf.completeExceptionally(error);
+ return;
+ }
+ if (nextLine == null) nextLine = nextLine();
+ if (nextLine == null) {
+ if (completed) {
+ scheduler.stop();
+ if (leftover.position() != 0) {
+ // Underflow: not all bytes could be
+ // decoded, but no more bytes will be coming.
+ // This should not happen as we should already
+ // have got a MalformedInputException, or
+ // replaced the unmappable chars.
+ errorRef.compareAndSet(null,
+ new IllegalStateException(
+ "premature end of input ("
+ + leftover.position()
+ + " undecoded bytes)"));
+ continue;
+ } else {
+ upstream.onComplete();
+ }
+ return;
+ } else if (demanded.get() == 0
+ && !downstreamDemand.isFulfilled()) {
+ long incr = Math.max(1, downstreamDemand.get());
+ demanded.addAndGet(incr);
+ upstreamSubscription.request(incr);
+ continue;
+ } else return;
+ }
+ assert nextLine != null;
+ assert newline != null && !nextLine.endsWith(newline)
+ || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
+ if (downstreamDemand.tryDecrement()) {
+ String forward = nextLine;
+ nextLine = null;
+ upstream.onNext(forward);
+ } else return; // no demand: come back later
+ }
+ } catch (Throwable t) {
+ try {
+ upstreamSubscription.cancel();
+ } finally {
+ signalError(t);
+ }
+ }
+ }
+
+ static LineSubscription create(Flow.Subscription s,
+ Charset charset,
+ String lineSeparator,
+ Flow.Subscriber<? super String> upstream,
+ CompletableFuture<?> cf) {
+ return new LineSubscription(Objects.requireNonNull(s),
+ Objects.requireNonNull(charset).newDecoder()
+ // use the same decoder configuration than
+ // java.io.InputStreamReader
+ .onMalformedInput(CodingErrorAction.REPLACE)
+ .onUnmappableCharacter(CodingErrorAction.REPLACE),
+ lineSeparator,
+ Objects.requireNonNull(upstream),
+ Objects.requireNonNull(cf));
+ }
+ }
+}
+
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,11 +25,15 @@
package jdk.incubator.http;
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.security.AccessControlContext;
@@ -52,6 +56,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Stream;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
@@ -461,6 +466,58 @@
}
+ /**
+ * A {@code Stream<String>} built on top of the Flow API.
+ */
+ static final class HttpLineStream implements HttpResponse.BodySubscriber<Stream<String>> {
+
+ private final HttpResponseInputStream responseInputStream;
+ private final Charset charset;
+ private HttpLineStream(Charset charset) {
+ this.charset = Objects.requireNonNull(charset);
+ responseInputStream = new HttpResponseInputStream();
+ }
+
+ @Override
+ public CompletionStage<Stream<String>> getBody() {
+ return responseInputStream.getBody().thenApply((is) ->
+ new BufferedReader(new InputStreamReader(is, charset))
+ .lines().onClose(this::close));
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ responseInputStream.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ responseInputStream.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ responseInputStream.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ responseInputStream.onComplete();
+ }
+
+ void close() {
+ try {
+ responseInputStream.close();
+ } catch (IOException x) {
+ // ignore
+ }
+ }
+
+ static HttpLineStream create(Charset charset) {
+ return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8));
+ }
+ }
+
static class MultiSubscriberImpl<V>
implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
{
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -28,6 +28,7 @@
import jdk.incubator.http.HttpHeaders;
import sun.net.NetProperties;
import sun.net.util.IPAddressUtil;
+import sun.net.www.HeaderParser;
import javax.net.ssl.SSLParameters;
import java.io.ByteArrayOutputStream;
@@ -476,11 +477,17 @@
* UTF_8
*/
public static Charset charsetFrom(HttpHeaders headers) {
- String encoding = headers.firstValue("Content-encoding")
- .orElse("UTF_8");
+ String type = headers.firstValue("Content-type")
+ .orElse("text/html; charset=utf-8");
+ int i = type.indexOf(";");
+ if (i >= 0) type = type.substring(i+1);
try {
- return Charset.forName(encoding);
- } catch (IllegalArgumentException e) {
+ HeaderParser parser = new HeaderParser(type);
+ String value = parser.findValue("charset");
+ if (value == null) return StandardCharsets.UTF_8;
+ return Charset.forName(value);
+ } catch (Throwable x) {
+ Log.logTrace("Can't find charset in \"{0}\" ({1})", type, x);
return StandardCharsets.UTF_8;
}
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineAdaptersCompileOnly.java Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+
+/*
+ * @test
+ * @summary Basic test for Flow adapters with generic type parameters
+ * @compile LineAdaptersCompileOnly.java
+ */
+
+public class LineAdaptersCompileOnly {
+
+ public static void main(String[] args) {
+ makesSureDifferentGenericSignaturesCompile();
+ }
+
+ static void makesSureDifferentGenericSignaturesCompile() {
+
+ BodyHandler.fromLineSubscriber(new StringSubscriber());
+ BodyHandler.fromLineSubscriber(new CharSequenceSubscriber());
+ BodyHandler.fromLineSubscriber(new ObjectSubscriber());
+
+
+ BodySubscriber.fromLineSubscriber(new StringSubscriber());
+ BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber());
+ BodySubscriber.fromLineSubscriber(new ObjectSubscriber());
+
+
+ BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), "\n");
+ BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(), "\r\n");
+ BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), "\n");
+ BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), null);
+ BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(), null);
+ BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), null);
+
+ BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+ StandardCharsets.UTF_8, "\n");
+ BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),
+ StandardCharsets.UTF_16, "\r\n");
+ BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(),
+ StandardCharsets.US_ASCII, "\n");
+ BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+ StandardCharsets.UTF_8, null);
+ BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),
+ StandardCharsets.UTF_16, null);
+ BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(),
+ StandardCharsets.US_ASCII, null);
+ }
+
+ static class StringSubscriber implements Flow.Subscriber<String> {
+ @Override public void onSubscribe(Flow.Subscription subscription) { }
+ @Override public void onNext(String item) { }
+ @Override public void onError(Throwable throwable) { }
+ @Override public void onComplete() { }
+ }
+
+ static class CharSequenceSubscriber implements Flow.Subscriber<CharSequence> {
+ @Override public void onSubscribe(Flow.Subscription subscription) { }
+ @Override public void onNext(CharSequence item) { }
+ @Override public void onError(Throwable throwable) { }
+ @Override public void onComplete() { }
+ }
+
+ static class ObjectSubscriber implements Flow.Subscriber<Object> {
+ @Override public void onSubscribe(Flow.Subscription subscription) { }
+ @Override public void onNext(Object item) { }
+ @Override public void onError(Throwable throwable) { }
+ @Override public void onComplete() { }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineBodyHandlerTest.java Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,702 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.io.UncheckedIOException;
+import java.math.BigInteger;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import jdk.incubator.http.HttpClient;
+import jdk.incubator.http.HttpRequest;
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import javax.net.ssl.SSLContext;
+
+import static java.nio.charset.StandardCharsets.UTF_16;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
+
+/*
+ * @test
+ * @summary Basic tests for line adapter subscribers as created by
+ * the BodyHandlers returned by BodyHandler::fromLineSubscriber
+ * and BodyHandler::asLines
+ * @modules java.base/sun.net.www.http
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ * java.logging
+ * jdk.httpserver
+ * @library /lib/testlibrary http2/server
+ * @build Http2TestServer
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @run testng/othervm LineBodyHandlerTest
+ */
+
+public class LineBodyHandlerTest {
+
+ SSLContext sslContext;
+ HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ]
+ HttpsServer httpsTestServer; // HTTPS/1.1
+ Http2TestServer http2TestServer; // HTTP/2 ( h2c )
+ Http2TestServer https2TestServer; // HTTP/2 ( h2 )
+ String httpURI;
+ String httpsURI;
+ String http2URI;
+ String https2URI;
+
+ @DataProvider(name = "uris")
+ public Object[][] variants() {
+ return new Object[][]{
+ { httpURI },
+ { httpsURI },
+ { http2URI },
+ { https2URI },
+ };
+ }
+
+ static final Class<NullPointerException> NPE = NullPointerException.class;
+ static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
+
+ @Test
+ public void testNull() {
+ assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null));
+ assertNotNull(BodyHandler.fromLineSubscriber(new StringSubscriber()));
+ assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, Function.identity(), "\n"));
+ assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), null, "\n"));
+ assertNotNull(BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), null));
+ assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, null, "\n"));
+ assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, Function.identity(), null));
+ assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), null, null));
+
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+ Charset.defaultCharset(), System.lineSeparator()));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null,
+ Charset.defaultCharset(), System.lineSeparator()));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+ null, System.lineSeparator()));
+ assertNotNull(BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+ Charset.defaultCharset(), null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null,
+ Charset.defaultCharset(), System.lineSeparator()));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+ null, System.lineSeparator()));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+ Charset.defaultCharset(), null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null,
+ null, System.lineSeparator()));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null,
+ Charset.defaultCharset(), null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+ null, null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null, null, null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(),
+ null, null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null,
+ Charset.defaultCharset(), null));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null,
+ null, System.lineSeparator()));
+ assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null, null, null));
+ }
+
+ @Test
+ public void testIAE() {
+ assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(),""));
+ assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),""));
+ assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), ""));
+ assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(),
+ StandardCharsets.UTF_8, ""));
+ assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),
+ StandardCharsets.UTF_16, ""));
+ assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(),
+ StandardCharsets.US_ASCII, ""));
+ }
+
+ private static final List<String> lines(String text, String eol) {
+ if (eol == null) {
+ return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList());
+ } else {
+ String replaced = text.replace(eol, "|");
+ int i=0;
+ while(replaced.endsWith("||")) {
+ replaced = replaced.substring(0,replaced.length()-1);
+ i++;
+ }
+ List<String> res = List.of(replaced.split("\\|"));
+ if (i > 0) {
+ res = new ArrayList<>(res);
+ for (int j=0; j<i; j++) res.add("");
+ }
+ return res;
+ }
+ }
+
+ @Test(dataProvider = "uris")
+ void testStringWithFinisher(String url) {
+ String body = "May the luck of the Irish be with you!";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ StringSubscriber subscriber = new StringSubscriber();
+ HttpResponse<String> response = client.sendAsync(request,
+ BodyHandler.fromLineSubscriber(subscriber, Supplier::get,"\n"))
+ .join();
+ String text = response.body();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, body);
+ assertEquals(subscriber.list, lines(body, "\n"));
+ }
+
+ @Test(dataProvider = "uris")
+ void testAsStream(String url) {
+ String body = "May the luck of the Irish be with you!";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ HttpResponse<Stream<String>> response = client.sendAsync(request,
+ BodyHandler.asLines()).join();
+ Stream<String> stream = response.body();
+ List<String> list = stream.collect(Collectors.toList());
+ String text = list.stream().collect(Collectors.joining("|"));
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, body);
+ assertEquals(list, List.of(body));
+ assertEquals(list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testStringWithFinisher2(String url) {
+ String body = "May the luck\r\n\r\n of the Irish be with you!";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ StringSubscriber subscriber = new StringSubscriber();
+ HttpResponse<Void> response = client.sendAsync(request,
+ BodyHandler.fromLineSubscriber(subscriber)).join();
+ String text = subscriber.get();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, body.replace("\r\n", "\n"));
+ assertEquals(subscriber.list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testAsStreamWithCRLF(String url) {
+ String body = "May the luck\r\n\r\n of the Irish be with you!";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ HttpResponse<Stream<String>> response = client.sendAsync(request,
+ BodyHandler.asLines()).join();
+ Stream<String> stream = response.body();
+ List<String> list = stream.collect(Collectors.toList());
+ String text = list.stream().collect(Collectors.joining("|"));
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, "May the luck|| of the Irish be with you!");
+ assertEquals(list, List.of("May the luck",
+ "",
+ " of the Irish be with you!"));
+ assertEquals(list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testStringWithFinisherBlocking(String url) throws Exception {
+ String body = "May the luck of the Irish be with you!";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body)).build();
+
+ StringSubscriber subscriber = new StringSubscriber();
+ HttpResponse<String> response = client.send(request,
+ BodyHandler.fromLineSubscriber(subscriber, Supplier::get, "\n"));
+ String text = response.body();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, "May the luck of the Irish be with you!");
+ assertEquals(subscriber.list, lines(body, "\n"));
+ }
+
+ @Test(dataProvider = "uris")
+ void testStringWithoutFinisherBlocking(String url) throws Exception {
+ String body = "May the luck of the Irish be with you!";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body)).build();
+
+ StringSubscriber subscriber = new StringSubscriber();
+ HttpResponse<Void> response = client.send(request,
+ BodyHandler.fromLineSubscriber(subscriber));
+ String text = subscriber.get();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, "May the luck of the Irish be with you!");
+ assertEquals(subscriber.list, lines(body, null));
+ }
+
+ // Subscriber<Object>
+
+ @Test(dataProvider = "uris")
+ void testAsStreamWithMixedCRLF(String url) {
+ String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ HttpResponse<Stream<String>> response = client.sendAsync(request,
+ BodyHandler.asLines()).join();
+ Stream<String> stream = response.body();
+ List<String> list = stream.collect(Collectors.toList());
+ String text = list.stream().collect(Collectors.joining("|"));
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May| the wind| always be|at your back.|");
+ assertEquals(list, List.of("May",
+ " the wind",
+ " always be",
+ "at your back.",
+ ""));
+ assertEquals(list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testAsStreamWithMixedCRLF_UTF8(String url) {
+ String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .header("Content-type", "text/text; charset=UTF-8")
+ .POST(fromString(body, UTF_8)).build();
+
+ HttpResponse<Stream<String>> response = client.sendAsync(request,
+ BodyHandler.asLines()).join();
+ Stream<String> stream = response.body();
+ List<String> list = stream.collect(Collectors.toList());
+ String text = list.stream().collect(Collectors.joining("|"));
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May| the wind| always be|at your back.|");
+ assertEquals(list, List.of("May",
+ " the wind",
+ " always be",
+ "at your back.", ""));
+ assertEquals(list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testAsStreamWithMixedCRLF_UTF16(String url) {
+ String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .header("Content-type", "text/text; charset=UTF-16")
+ .POST(fromString(body, UTF_16)).build();
+
+ HttpResponse<Stream<String>> response = client.sendAsync(request,
+ BodyHandler.asLines()).join();
+ Stream<String> stream = response.body();
+ List<String> list = stream.collect(Collectors.toList());
+ String text = list.stream().collect(Collectors.joining("|"));
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May| the wind| always be|at your back.|");
+ assertEquals(list, List.of("May",
+ " the wind",
+ " always be",
+ "at your back.",
+ ""));
+ assertEquals(list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testObjectWithFinisher(String url) {
+ String body = "May\r\n the wind\r\n always be\rat your back.";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ HttpResponse<String> response = client.sendAsync(request,
+ BodyHandler.fromLineSubscriber(subscriber, ObjectSubscriber::get, "\r\n"))
+ .join();
+ String text = response.body();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May\n the wind\n always be\rat your back.");
+ assertEquals(subscriber.list, List.of("May",
+ " the wind",
+ " always be\rat your back."));
+ assertEquals(subscriber.list, lines(body, "\r\n"));
+ }
+
+ @Test(dataProvider = "uris")
+ void testObjectWithFinisher_UTF16(String url) {
+ String body = "May\r\n the wind\r\n always be\rat your back.\r\r";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .header("Content-type", "text/text; charset=UTF-16")
+ .POST(fromString(body, UTF_16)).build();
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ HttpResponse<String> response = client.sendAsync(request,
+ BodyHandler.fromLineSubscriber(subscriber,
+ ObjectSubscriber::get,
+ null)).join();
+ String text = response.body();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May\n the wind\n always be\nat your back.\n");
+ assertEquals(subscriber.list, List.of("May",
+ " the wind",
+ " always be",
+ "at your back.",
+ ""));
+ assertEquals(subscriber.list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testObjectWithoutFinisher(String url) {
+ String body = "May\r\n the wind\r\n always be\rat your back.";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ HttpResponse<Void> response = client.sendAsync(request,
+ BodyHandler.fromLineSubscriber(subscriber)).join();
+ String text = subscriber.get();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May\n the wind\n always be\nat your back.");
+ assertEquals(subscriber.list, List.of("May",
+ " the wind",
+ " always be",
+ "at your back."));
+ assertEquals(subscriber.list, lines(body, null));
+ }
+
+ @Test(dataProvider = "uris")
+ void testObjectWithFinisherBlocking(String url) throws Exception {
+ String body = "May\r\n the wind\r\n always be\nat your back.";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ HttpResponse<String> response = client.send(request,
+ BodyHandler.fromLineSubscriber(subscriber,
+ ObjectSubscriber::get,
+ "\r\n"));
+ String text = response.body();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May\n the wind\n always be\nat your back.");
+ assertEquals(subscriber.list, List.of("May",
+ " the wind",
+ " always be\nat your back."));
+ assertEquals(subscriber.list, lines(body, "\r\n"));
+ }
+
+ @Test(dataProvider = "uris")
+ void testObjectWithoutFinisherBlocking(String url) throws Exception {
+ String body = "May\r\n the wind\r\n always be\nat your back.";
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(body))
+ .build();
+
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ HttpResponse<Void> response = client.send(request,
+ BodyHandler.fromLineSubscriber(subscriber));
+ String text = subscriber.get();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertTrue(text.length() != 0); // what else can be asserted!
+ assertEquals(text, "May\n the wind\n always be\nat your back.");
+ assertEquals(subscriber.list, List.of("May",
+ " the wind",
+ " always be",
+ "at your back."));
+ assertEquals(subscriber.list, lines(body, null));
+ }
+
+ static private final String LINE = "Bient\u00f4t nous plongerons dans les" +
+ " fr\u00f4\ud801\udc00des t\u00e9n\u00e8bres, ";
+
+ static private final String bigtext() {
+ StringBuilder res = new StringBuilder((LINE.length() + 1) * 50);
+ for (int i = 0; i<50; i++) {
+ res.append(LINE);
+ if (i%2 == 0) res.append("\r\n");
+ }
+ return res.toString();
+ }
+
+ @Test(dataProvider = "uris")
+ void testBigTextFromLineSubscriber(String url) {
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+ .build();
+ String bigtext = bigtext();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(bigtext))
+ .build();
+
+ StringSubscriber subscriber = new StringSubscriber();
+ HttpResponse<String> response = client.sendAsync(request,
+ BodyHandler.fromLineSubscriber(subscriber, Supplier::get,"\r\n"))
+ .join();
+ String text = response.body();
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, bigtext.replace("\r\n", "\n"));
+ assertEquals(subscriber.list, lines(bigtext, "\r\n"));
+ }
+
+ @Test(dataProvider = "uris")
+ void testBigTextAsStream(String url) {
+ HttpClient client = HttpClient.newBuilder().sslContext(sslContext)
+ .build();
+ String bigtext = bigtext();
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .POST(fromString(bigtext))
+ .build();
+
+ HttpResponse<Stream<String>> response = client.sendAsync(request,
+ BodyHandler.asLines()).join();
+ Stream<String> stream = response.body();
+ List<String> list = stream.collect(Collectors.toList());
+ String text = list.stream().collect(Collectors.joining("|"));
+ System.out.println(text);
+ assertEquals(response.statusCode(), 200);
+ assertEquals(text, bigtext.replace("\r\n", "|"));
+ assertEquals(list, List.of(bigtext.split("\r\n")));
+ assertEquals(list, lines(bigtext, null));
+ }
+
+ /** An abstract Subscriber that converts all received data into a String. */
+ static abstract class AbstractSubscriber implements Supplier<String> {
+ protected volatile Flow.Subscription subscription;
+ protected final StringBuilder baos = new StringBuilder();
+ protected volatile String text;
+ protected volatile RuntimeException error;
+ protected final List<Object> list = new CopyOnWriteArrayList<>();
+
+ public void onSubscribe(Flow.Subscription subscription) {
+ this.subscription = subscription;
+ subscription.request(Long.MAX_VALUE);
+ }
+ public void onError(Throwable throwable) {
+ System.out.println(this + " onError: " + throwable);
+ error = new RuntimeException(throwable);
+ }
+ public void onComplete() {
+ System.out.println(this + " onComplete");
+ text = baos.toString();
+ }
+ @Override public String get() {
+ if (error != null) throw error;
+ return text;
+ }
+ }
+
+ static class StringSubscriber extends AbstractSubscriber
+ implements Flow.Subscriber<String>, Supplier<String>
+ {
+ @Override public void onNext(String item) {
+ System.out.print(this + " onNext: \"" + item + "\"");
+ if (baos.length() != 0) baos.append('\n');
+ baos.append(item);
+ list.add(item);
+ }
+ }
+
+ static class CharSequenceSubscriber extends AbstractSubscriber
+ implements Flow.Subscriber<CharSequence>, Supplier<String>
+ {
+ @Override public void onNext(CharSequence item) {
+ System.out.print(this + " onNext: " + item);
+ if (baos.length() != 0) baos.append('\n');
+ baos.append(item);
+ list.add(item);
+ }
+ }
+
+ static class ObjectSubscriber extends AbstractSubscriber
+ implements Flow.Subscriber<Object>, Supplier<String>
+ {
+ @Override public void onNext(Object item) {
+ System.out.print(this + " onNext: " + item);
+ if (baos.length() != 0) baos.append('\n');
+ baos.append(item);
+ list.add(item);
+ }
+ }
+
+
+ static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {
+ try {
+ baos.write(ba);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @BeforeTest
+ public void setup() throws Exception {
+ sslContext = new SimpleSSLContext().get();
+ if (sslContext == null)
+ throw new AssertionError("Unexpected null sslContext");
+
+ InetSocketAddress sa = new InetSocketAddress("localhost", 0);
+ httpTestServer = HttpServer.create(sa, 0);
+ httpTestServer.createContext("/http1/echo", new Http1EchoHandler());
+ httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo";
+
+ httpsTestServer = HttpsServer.create(sa, 0);
+ httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+ httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());
+ httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo";
+
+ http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
+ http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");
+ int port = http2TestServer.getAddress().getPort();
+ http2URI = "http://127.0.0.1:" + port + "/http2/echo";
+
+ https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
+ https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");
+ port = https2TestServer.getAddress().getPort();
+ https2URI = "https://127.0.0.1:" + port + "/https2/echo";
+
+ httpTestServer.start();
+ httpsTestServer.start();
+ http2TestServer.start();
+ https2TestServer.start();
+ }
+
+ @AfterTest
+ public void teardown() throws Exception {
+ httpTestServer.stop(0);
+ httpsTestServer.stop(0);
+ http2TestServer.stop();
+ https2TestServer.stop();
+ }
+
+ static void printBytes(PrintStream out, String prefix, byte[] bytes) {
+ int padding = 4 + 4 - (bytes.length % 4);
+ padding = padding > 4 ? padding - 4 : 4;
+ byte[] bigbytes = new byte[bytes.length + padding];
+ System.arraycopy(bytes, 0, bigbytes, padding, bytes.length);
+ out.println(prefix + bytes.length + " "
+ + new BigInteger(bigbytes).toString(16));
+ }
+
+ static class Http1EchoHandler implements HttpHandler {
+ @Override
+ public void handle(HttpExchange t) throws IOException {
+ try (InputStream is = t.getRequestBody();
+ OutputStream os = t.getResponseBody()) {
+ byte[] bytes = is.readAllBytes();
+ printBytes(System.out,"Bytes: ", bytes);
+ if (t.getRequestHeaders().containsKey("Content-type")) {
+ t.getResponseHeaders().add("Content-type",
+ t.getRequestHeaders().getFirst("Content-type"));
+ }
+ t.sendResponseHeaders(200, bytes.length);
+ os.write(bytes);
+ }
+ }
+ }
+
+ static class Http2EchoHandler implements Http2Handler {
+ @Override
+ public void handle(Http2TestExchange t) throws IOException {
+ try (InputStream is = t.getRequestBody();
+ OutputStream os = t.getResponseBody()) {
+ byte[] bytes = is.readAllBytes();
+ printBytes(System.out,"Bytes: ", bytes);
+ if (t.getRequestHeaders().firstValue("Content-type").isPresent()) {
+ t.getResponseHeaders().addHeader("Content-type",
+ t.getRequestHeaders().firstValue("Content-type").get());
+ }
+ t.sendResponseHeaders(200, bytes.length);
+ os.write(bytes);
+ }
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineStreamsAndSurrogatesTest.java Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,319 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.MalformedInputException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.SubmissionPublisher;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.charset.StandardCharsets.UTF_16;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+/*
+ * @test
+ * @summary tests for BodySubscribers returned by asLines.
+ * In particular tests that surrogate characters are handled
+ * correctly.
+ * @modules jdk.incubator.httpclient java.logging
+ * @run testng/othervm LineStreamsAndSurrogatesTest
+ */
+
+public class LineStreamsAndSurrogatesTest {
+
+
+ static final Class<NullPointerException> NPE = NullPointerException.class;
+
+ private static final List<String> lines(String text) {
+ return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList());
+ }
+
+ @Test
+ void testUncomplete() throws Exception {
+ // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r les\n\n" +
+ " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\ud801\udc00";
+ Charset charset = UTF_8;
+
+ BodySubscriber<Stream<String>> bodySubscriber =
+ BodySubscriber.asLines(charset);
+ AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ Runnable run = () -> {
+ try {
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] sbytes = text.getBytes(charset);
+ byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i = 0; i < bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ } catch(Throwable t) {
+ errorRef.set(t);
+ }
+ };
+ Thread thread = new Thread(run,"Publishing");
+ thread.start();
+ try {
+ Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+ List<String> list = stream.collect(Collectors.toList());
+ String resp = list.stream().collect(Collectors.joining(""));
+ System.out.println("***** Got: " + resp);
+
+ byte[] sbytes = text.getBytes(UTF_8);
+ byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1);
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(bais, charset));
+ String resp2 = reader.lines().collect(Collectors.joining(""));
+ System.out.println("***** Got2: " + resp2);
+
+ assertEquals(resp, resp2);
+ assertEquals(list, List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ " les",
+ "",
+ " fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres\ufffd"));
+ } catch (ExecutionException x) {
+ Throwable cause = x.getCause();
+ if (cause instanceof MalformedInputException) {
+ throw new RuntimeException("Unexpected MalformedInputException", cause);
+ }
+ throw x;
+ }
+ if (errorRef.get() != null) {
+ throw new RuntimeException("Unexpected exception", errorRef.get());
+ }
+ }
+
+ @Test
+ void testStream1() throws Exception {
+ // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r les\n\n" +
+ " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+ Charset charset = UTF_8;
+
+ BodySubscriber<Stream<String>> bodySubscriber =
+ BodySubscriber.asLines(charset);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(charset);
+ AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ Runnable run = () -> {
+ try {
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i = 0; i < bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ } catch(Throwable t) {
+ errorRef.set(t);
+ }
+ };
+
+ Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+ Thread thread = new Thread(run,"Publishing");
+ thread.start();
+ List<String> list = stream.collect(Collectors.toList());
+ String resp = list.stream().collect(Collectors.joining("|"));
+ System.out.println("***** Got: " + resp);
+ assertEquals(resp, text.replace("\r\n", "|")
+ .replace("\n","|")
+ .replace("\r","|"));
+ assertEquals(list, List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ "",
+ " les",
+ "",
+ " fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres"));
+ assertEquals(list, lines(text));
+ if (errorRef.get() != null) {
+ throw new RuntimeException("Unexpected exception", errorRef.get());
+ }
+ }
+
+
+ @Test
+ void testStream2() throws Exception {
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
+ " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+ Charset charset = UTF_8;
+
+ BodySubscriber<Stream<String>> bodySubscriber = BodySubscriber.asLines(charset);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(charset);
+ AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ Runnable run = () -> {
+ try {
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i = 0; i < bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ } catch(Throwable t) {
+ errorRef.set(t);
+ }
+ };
+
+ Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+ Thread thread = new Thread(run,"Publishing");
+ thread.start();
+ List<String> list = stream.collect(Collectors.toList());
+ String resp = list.stream().collect(Collectors.joining(""));
+ System.out.println("***** Got: " + resp);
+ String expected = Stream.of(text.split("\r\n|\r|\n"))
+ .collect(Collectors.joining(""));
+ assertEquals(resp, expected);
+ assertEquals(list, List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ "",
+ " les fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres",
+ ""));
+ assertEquals(list, lines(text));
+ if (errorRef.get() != null) {
+ throw new RuntimeException("Unexpected exception", errorRef.get());
+ }
+ }
+
+ @Test
+ void testStream3_UTF16() throws Exception {
+ // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
+ " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+ Charset charset = UTF_16;
+
+ BodySubscriber<Stream<String>> bodySubscriber =
+ BodySubscriber.asLines(charset);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(charset);
+ AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ Runnable run = () -> {
+ try {
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i = 0; i < bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ } catch(Throwable t) {
+ errorRef.set(t);
+ }
+ };
+
+ Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+ Thread thread = new Thread(run,"Publishing");
+ thread.start();
+ List<String> list = stream.collect(Collectors.toList());
+ String resp = list.stream().collect(Collectors.joining(""));
+ System.out.println("***** Got: " + resp);
+ assertEquals(resp, text.replace("\n","").replace("\r",""));
+ assertEquals(list, List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ "",
+ " les",
+ "",
+ " fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres"));
+ assertEquals(list, lines(text));
+ if (errorRef.get() != null) {
+ throw new RuntimeException("Unexpected exception", errorRef.get());
+ }
+ }
+
+
+ @Test
+ void testStream4_UTF16() throws Exception {
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
+ " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+ Charset charset = UTF_16;
+
+ BodySubscriber<Stream<String>> bodySubscriber =
+ BodySubscriber.asLines(charset);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(charset);
+ AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ Runnable run = () -> {
+ try {
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i = 0; i < bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ } catch(Throwable t) {
+ errorRef.set(t);
+ }
+ };
+
+ Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
+ Thread thread = new Thread(run,"Publishing");
+ thread.start();
+ List<String> list = stream.collect(Collectors.toList());
+ String resp = list.stream().collect(Collectors.joining(""));
+ System.out.println("***** Got: " + resp);
+ String expected = Stream.of(text.split("\r\n|\r|\n"))
+ .collect(Collectors.joining(""));
+ assertEquals(resp, expected);
+ assertEquals(list, List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ "",
+ " les fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres",
+ ""));
+ assertEquals(list, lines(text));
+ if (errorRef.get() != null) {
+ throw new RuntimeException("Unexpected exception", errorRef.get());
+ }
+ }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/LineSubscribersAndSurrogatesTest.java Fri Jan 12 15:36:28 2018 +0000
@@ -0,0 +1,384 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.MalformedInputException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow;
+import java.util.concurrent.SubmissionPublisher;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.charset.StandardCharsets.UTF_16;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+/*
+ * @test
+ * @summary tests for BodySubscribers returned by fromLineSubscriber.
+ * In particular tests that surrogate characters are handled
+ * correctly.
+ * @modules jdk.incubator.httpclient java.logging
+ * @run testng/othervm LineSubscribersAndSurrogatesTest
+ */
+
+public class LineSubscribersAndSurrogatesTest {
+
+
+ static final Class<NullPointerException> NPE = NullPointerException.class;
+
+ private static final List<String> lines(String text, String eol) {
+ if (eol == null) {
+ return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList());
+ } else {
+ String replaced = text.replace(eol, "|");
+ int i=0;
+ while(replaced.endsWith("||")) {
+ replaced = replaced.substring(0,replaced.length()-1);
+ i++;
+ }
+ List<String> res = List.of(replaced.split("\\|"));
+ if (i > 0) {
+ res = new ArrayList<>(res);
+ for (int j=0; j<i; j++) res.add("");
+ }
+ return res;
+ }
+ }
+
+ @Test
+ void testIncomplete() throws Exception {
+ // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+ " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\ud801\udc00";
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+ subscriber, Supplier::get, UTF_8, null);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] sbytes = text.getBytes(UTF_8);
+ byte[] bytes = Arrays.copyOfRange(sbytes,0, sbytes.length - 1);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i=0; i<bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ try {
+ String resp = bodySubscriber.getBody().toCompletableFuture().get();
+ System.out.println("***** Got: " + resp);
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(bais, UTF_8));
+ String resp2 = reader.lines().collect(Collectors.joining(""));
+ assertEquals(resp, resp2);
+ assertEquals(subscriber.list, List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ " les",
+ "",
+ " fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres\ufffd"));
+ } catch (ExecutionException x) {
+ Throwable cause = x.getCause();
+ if (cause instanceof MalformedInputException) {
+ throw new RuntimeException("Unexpected MalformedInputException thrown", cause);
+ }
+ throw x;
+ }
+ }
+
+
+ @Test
+ void testStringWithFinisherLF() throws Exception {
+ // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+ " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r";
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+ subscriber, Supplier::get, UTF_8, "\n");
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(UTF_8);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i=0; i<bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ String resp = bodySubscriber.getBody().toCompletableFuture().get();
+ System.out.println("***** Got: " + resp);
+ List<String> expected = List.of("Bient\u00f4t\r",
+ " nous plongerons\r",
+ " dans\r les",
+ "",
+ " fr\u00f4\ud801\udc00des\r",
+ " t\u00e9n\u00e8bres\r");
+ assertEquals(subscriber.list, expected);
+ assertEquals(resp, Stream.of(text.split("\n")).collect(Collectors.joining("")));
+ assertEquals(resp, expected.stream().collect(Collectors.joining("")));
+ assertEquals(subscriber.list, lines(text, "\n"));
+ }
+
+
+ @Test
+ void testStringWithFinisherCR() throws Exception {
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+ " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+ subscriber, Supplier::get, UTF_8, "\r");
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(UTF_8);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i=0; i<bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ String resp = bodySubscriber.getBody().toCompletableFuture().get();
+ System.out.println("***** Got: " + resp);
+ assertEquals(resp, text.replace("\r", ""));
+ assertEquals(subscriber.list, List.of("Bient\u00f4t",
+ "\n nous plongerons",
+ "\n dans",
+ " les fr\u00f4\ud801\udc00des",
+ "\n t\u00e9n\u00e8bres",
+ ""));
+ assertEquals(subscriber.list, lines(text, "\r"));
+ }
+
+ @Test
+ void testStringWithFinisherCRLF() throws Exception {
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+ " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+ subscriber, Supplier::get, UTF_8, "\r\n");
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(UTF_8);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i=0; i<bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ String resp = bodySubscriber.getBody().toCompletableFuture().get();
+ System.out.println("***** Got: " + resp);
+ assertEquals(resp, text.replace("\r\n",""));
+ assertEquals(subscriber.list, List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans\r les fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres"));
+ assertEquals(subscriber.list, lines(text, "\r\n"));
+ }
+
+
+ @Test
+ void testStringWithFinisherBR() throws Exception {
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+ " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+ subscriber, Supplier::get, UTF_8, null);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(UTF_8);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i=0; i<bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ String resp = bodySubscriber.getBody().toCompletableFuture().get();
+ System.out.println("***** Got: " + resp);
+ List<String> expected = List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ " les",
+ "",
+ " fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres");
+ assertEquals(subscriber.list, expected);
+ assertEquals(resp, expected.stream().collect(Collectors.joining("")));
+ assertEquals(subscriber.list, lines(text, null));
+ }
+
+ @Test
+ void testStringWithFinisherBR_UTF_16() throws Exception {
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+ " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ BodySubscriber<String> bodySubscriber = BodySubscriber.fromLineSubscriber(
+ subscriber, Supplier::get, UTF_16, null);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(UTF_16);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i=0; i<bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ String resp = bodySubscriber.getBody().toCompletableFuture().get();
+ System.out.println("***** Got: " + resp);
+ List<String> expected = List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ " les",
+ "",
+ " fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres",
+ "");
+ assertEquals(resp, expected.stream().collect(Collectors.joining("")));
+ assertEquals(subscriber.list, expected);
+ assertEquals(subscriber.list, lines(text, null));
+ }
+
+ void testStringWithoutFinisherBR() throws Exception {
+ String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" +
+ " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
+ ObjectSubscriber subscriber = new ObjectSubscriber();
+ BodySubscriber<Void> bodySubscriber = BodySubscriber.fromLineSubscriber(subscriber);
+ SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
+ byte[] bytes = text.getBytes(UTF_8);
+ publisher.subscribe(bodySubscriber);
+ System.out.println("Publishing " + bytes.length + " bytes");
+ for (int i = 0; i < bytes.length; i++) {
+ // ensure that surrogates are split over several buffers.
+ publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
+ }
+ publisher.close();
+ Void resp = bodySubscriber.getBody().toCompletableFuture().get();
+ System.out.println("***** Got: " + resp);
+ List<String> expected = List.of("Bient\u00f4t",
+ " nous plongerons",
+ " dans",
+ " les",
+ "",
+ " fr\u00f4\ud801\udc00des",
+ " t\u00e9n\u00e8bres");
+ assertEquals(subscriber.text, expected.stream().collect(Collectors.joining("")));
+ assertEquals(subscriber.list, expected);
+ assertEquals(subscriber.list, lines(text, null));
+ }
+
+
+ /** An abstract Subscriber that converts all received data into a String. */
+ static abstract class AbstractSubscriber implements Supplier<String> {
+ protected final List<Object> list = new CopyOnWriteArrayList<>();
+ protected volatile Flow.Subscription subscription;
+ protected final StringBuilder baos = new StringBuilder();
+ protected volatile String text;
+ protected volatile RuntimeException error;
+
+ public void onSubscribe(Flow.Subscription subscription) {
+ this.subscription = subscription;
+ subscription.request(Long.MAX_VALUE);
+ }
+ public void onError(Throwable throwable) {
+ System.out.println(this + " onError: " + throwable);
+ error = new RuntimeException(throwable);
+ }
+ public void onComplete() {
+ System.out.println(this + " onComplete");
+ text = baos.toString();
+ }
+ @Override public String get() {
+ if (error != null) throw error;
+ return text;
+ }
+ public final List<?> list() {
+ return list;
+ }
+ }
+
+ static class StringSubscriber extends AbstractSubscriber
+ implements Flow.Subscriber<String>, Supplier<String>
+ {
+ @Override public void onNext(String item) {
+ System.out.println(this + " onNext: \""
+ + item.replace("\n","\\n")
+ .replace("\r", "\\r")
+ + "\"");
+ baos.append(item);
+ list.add(item);
+ }
+ }
+
+ static class CharSequenceSubscriber extends AbstractSubscriber
+ implements Flow.Subscriber<CharSequence>, Supplier<String>
+ {
+ @Override public void onNext(CharSequence item) {
+ System.out.println(this + " onNext: \""
+ + item.toString().replace("\n","\\n")
+ .replace("\r", "\\r")
+ + "\"");
+ baos.append(item);
+ list.add(item);
+ }
+ }
+
+ static class ObjectSubscriber extends AbstractSubscriber
+ implements Flow.Subscriber<Object>, Supplier<String>
+ {
+ @Override public void onNext(Object item) {
+ System.out.println(this + " onNext: \""
+ + item.toString().replace("\n","\\n")
+ .replace("\r", "\\r")
+ + "\"");
+ baos.append(item);
+ list.add(item);
+ }
+ }
+
+
+ static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {
+ try {
+ baos.write(ba);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Fri Jan 05 14:11:48 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -122,7 +122,7 @@
if (c == -1) {
return -1;
}
- return one[0];
+ return one[0] & 0xFF;
}
@Override
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Jan 05 14:11:48 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -269,9 +269,9 @@
}
}
- String doUpgrade() throws IOException {
- String upgrade = readHttp1Request();
- String h2c = getHeader(upgrade, "Upgrade");
+ Http1InitialRequest doUpgrade() throws IOException {
+ Http1InitialRequest upgrade = readHttp1Request();
+ String h2c = getHeader(upgrade.headers, "Upgrade");
if (h2c == null || !h2c.equals("h2c")) {
System.err.println("Server:HEADERS: " + upgrade);
throw new IOException("Bad upgrade 1 " + h2c);
@@ -283,7 +283,7 @@
sendSettingsFrame();
readPreface();
- String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
+ String clientSettingsString = getHeader(upgrade.headers, "HTTP2-Settings");
clientSettings = getSettingsFromString(clientSettingsString);
return upgrade;
@@ -313,7 +313,7 @@
}
void run() throws Exception {
- String upgrade = null;
+ Http1InitialRequest upgrade = null;
if (!secure) {
upgrade = doUpgrade();
} else {
@@ -462,9 +462,9 @@
// First stream (1) comes from a plaintext HTTP/1.1 request
@SuppressWarnings({"rawtypes","unchecked"})
- void createPrimordialStream(String request) throws IOException {
+ void createPrimordialStream(Http1InitialRequest request) throws IOException {
HttpHeadersImpl headers = new HttpHeadersImpl();
- String requestLine = getRequestLine(request);
+ String requestLine = getRequestLine(request.headers);
String[] tokens = requestLine.split(" ");
if (!tokens[2].equals("HTTP/1.1")) {
throw new IOException("bad request line");
@@ -475,7 +475,7 @@
} catch (URISyntaxException e) {
throw new IOException(e);
}
- String host = getHeader(request, "Host");
+ String host = getHeader(request.headers, "Host");
if (host == null) {
throw new IOException("missing Host");
}
@@ -485,9 +485,9 @@
headers.setHeader(":authority", host);
headers.setHeader(":path", uri.getPath());
Queue q = new Queue(sentinel);
- String body = getRequestBody(request);
- addHeaders(getHeaders(request), headers);
- headers.setHeader("Content-length", Integer.toString(body.length()));
+ byte[] body = getRequestBody(request);
+ addHeaders(getHeaders(request.headers), headers);
+ headers.setHeader("Content-length", Integer.toString(body.length));
addRequestBodyToQueue(body, q);
streams.put(1, q);
@@ -885,7 +885,16 @@
final static String CRLF = "\r\n";
final static String CRLFCRLF = "\r\n\r\n";
- String readHttp1Request() throws IOException {
+ static class Http1InitialRequest {
+ final String headers;
+ final byte[] body;
+ Http1InitialRequest(String headers, byte[] body) {
+ this.headers = headers;
+ this.body = body.clone();
+ }
+ }
+
+ Http1InitialRequest readHttp1Request() throws IOException {
String headers = readUntil(CRLF + CRLF);
int clen = getContentLength(headers);
String te = getHeader(headers, "Transfer-encoding");
@@ -899,8 +908,7 @@
// HTTP/1.1 chunked data, read it
buf = readChunkedInputStream(is);
}
- String body = new String(buf, StandardCharsets.US_ASCII);
- return headers + body;
+ return new Http1InitialRequest(headers, buf);
} catch (IOException e) {
System.err.println("TestServer: headers read: [ " + headers + " ]");
throw e;
@@ -943,19 +951,13 @@
// wrapper around a BlockingQueue that throws an exception when it's closed
// Each stream has one of these
- String getRequestBody(String request) {
- int bodystart = request.indexOf(CRLF+CRLF);
- String body;
- if (bodystart == -1)
- body = "";
- else
- body = request.substring(bodystart+4);
- return body;
+ byte[] getRequestBody(Http1InitialRequest request) {
+ return request.body;
}
@SuppressWarnings({"rawtypes","unchecked"})
- void addRequestBodyToQueue(String body, Queue q) throws IOException {
- ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
+ void addRequestBodyToQueue(byte[] body, Queue q) throws IOException {
+ ByteBuffer buf = ByteBuffer.wrap(body);
DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
// only used for primordial stream
q.put(df);