# HG changeset patch # User dfuchs # Date 1515771388 0 # Node ID cf8792f51dee9de3cea05c66e1b676dac612243c # Parent bbd688c6fbbb135518c4894683dacf2f84ff7008 http-client-branch: Adds some convenience methods to parse body responses as lines diff -r bbd688c6fbbb -r cf8792f51dee src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Jan 05 14:11:48 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Jan 12 15:36:28 2018 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,6 +25,7 @@ package jdk.incubator.http; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -34,12 +35,12 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.security.AccessControlContext; -import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -49,6 +50,7 @@ import java.util.concurrent.Flow.Subscriber; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; import javax.net.ssl.SSLParameters; /** @@ -399,6 +401,91 @@ } /** + * Returns a response body handler that returns a {@link BodySubscriber + * BodySubscriber}{@code } obtained from {@link + * BodySubscriber#fromLineSubscriber(Subscriber, Function, Charset, String) + * BodySubscriber.fromLineSubscriber(subscriber, s -> null, charset, null)}, + * with the given {@code subscriber}. + * The {@link Charset charset} used to decode the response body bytes is + * obtained from the HTTP response headers as specified by {@link #asString()}, + * and lines are delimited in the manner of {@link BufferedReader#readLine()}. + * + *

The response body is not available through this, or the {@code + * HttpResponse} API, but instead all response body is forwarded to the + * given {@code subscriber}, which should make it available, if + * appropriate, through some other mechanism, e.g. an entry in a + * database, etc. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + *

For example: + *

 {@code
+         *  TextSubscriber subscriber = new TextSubscriber();
+         *  HttpResponse response = client.sendAsync(request,
+         *      BodyHandler.fromLineSubscriber(subscriber, "\n")).join();
+         *  System.out.println(response.statusCode());
+         * }
+ * + * @param subscriber the subscriber + * @return a response body handler + */ + public static BodyHandler + fromLineSubscriber(Subscriber subscriber) { + Objects.requireNonNull(subscriber); + return (status, headers) + -> BodySubscriber.fromLineSubscriber(subscriber, s -> null, + charsetFrom(headers), null); + } + + /** + * Returns a response body handler that returns a {@link BodySubscriber + * BodySubscriber}{@code } obtained from {@link + * BodySubscriber#fromLineSubscriber(Subscriber, Function, Charset, String) + * BodySubscriber.fromLineSubscriber(subscriber, finisher, charset, lineSeparator)}, + * with the given {@code subscriber}, {@code finisher} function, and line separator. + * The {@link Charset charset} used to decode the response body bytes is + * obtained from the HTTP response headers as specified by {@link #asString()}. + * + *

The given {@code finisher} function is applied after the given + * subscriber's {@code onComplete} has been invoked. The {@code finisher} + * function is invoked with the given subscriber, and returns a value + * that is set as the response's body. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + *

For example: + *

 {@code
+         * TextSubscriber subscriber = ...;  // accumulates bytes and transforms them into a String
+         * HttpResponse response = client.sendAsync(request,
+         *     BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult, "\n")).join();
+         * String text = response.body();
+         * }
+ * + * @param the type of the Subscriber + * @param the type of the response body + * @param subscriber the subscriber + * @param finisher a function to be applied after the subscriber has completed + * @param lineSeparator an optional line separator: can be {@code null}, + * in which case lines will be delimited in the manner of + * {@link BufferedReader#readLine()}. + * @return a response body handler + * @throws IllegalArgumentException if the supplied {@code lineSeparator} is the empty string. + */ + public static ,T> BodyHandler + fromLineSubscriber(S subscriber, Function finisher, String lineSeparator) { + Objects.requireNonNull(subscriber); + Objects.requireNonNull(finisher); + // implicit null check + if (lineSeparator != null && lineSeparator.isEmpty()) + throw new IllegalArgumentException("empty line separator"); + return (status, headers) -> + BodySubscriber.fromLineSubscriber(subscriber, finisher, + charsetFrom(headers), lineSeparator); + } + + /** * Returns a response body handler which discards the response body and * uses the given value as a replacement for it. * @@ -542,6 +629,25 @@ } /** + * Returns a {@code BodyHandler>} that returns a + * {@link BodySubscriber BodySubscriber}{@code >} obtained from + * {@link BodySubscriber#asLines(Charset)} + * BodySubscriber.asLines(charset)}. + * The {@link Charset charset} used to decode the response body bytes is + * obtained from the HTTP response headers as specified by {@link #asString()}, + * and lines are delimited in the manner of {@link BufferedReader#readLine()}. + * + *

When the {@code HttpResponse} object is returned, the body may + * not have been completely received. + * + * @return a response body handler + */ + public static BodyHandler> asLines() { + return (status, headers) -> + BodySubscriber.asLines(charsetFrom(headers)); + } + + /** * Returns a {@code BodyHandler} that returns a * {@link BodySubscriber BodySubscriber}{@code } obtained from * {@link BodySubscriber#asByteArrayConsumer(Consumer) @@ -578,7 +684,7 @@ * {@link BodySubscriber#asString(java.nio.charset.Charset) * BodySubscriber.asString(Charset)}. The body is * decoded using the character set specified in - * the {@code Content-encoding} response header. If there is no such + * the {@code Content-type} response header. If there is no such * header, or the character set is not supported, then * {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} is used. * @@ -712,6 +818,71 @@ } /** + * Returns a body subscriber that forwards all response body to the + * given {@code Flow.Subscriber}, lines by lines. + * The {@linkplain #getBody()} completion + * stage} of the returned body subscriber completes after one of the + * given subscribers {@code onComplete} or {@code onError} has been + * invoked. + * Bytes are decoded using the {@linkplain StandardCharsets#UTF_8 + * UTF-8} charset, and lines are delimited in the manner of + * {@link BufferedReader#readLine()}. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + * @implNote This is equivalent to calling

{@code
+         *      fromLineSubscriber(subscriber, s -> null, StandardCharsets.UTF_8, null)
+         * }
+ * + * @param the type of the Subscriber + * @param subscriber the subscriber + * @return a body subscriber + */ + public static > BodySubscriber + fromLineSubscriber(S subscriber) { + return fromLineSubscriber(subscriber, s -> null, + StandardCharsets.UTF_8, null); + } + + /** + * Returns a body subscriber that forwards all response body to the + * given {@code Flow.Subscriber}, lines by lines. + * The {@linkplain #getBody()} completion + * stage} of the returned body subscriber completes after one of the + * given subscribers {@code onComplete} or {@code onError} has been + * invoked. + * + *

The given {@code finisher} function is applied after the given + * subscriber's {@code onComplete} has been invoked. The {@code finisher} + * function is invoked with the given subscriber, and returns a value + * that is set as the response's body. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + * @param the type of the Subscriber + * @param the type of the response body + * @param subscriber the subscriber + * @param finisher a function to be applied after the subscriber has + * completed + * @param charset a {@link Charset} to decode the bytes + * @param lineSeparator an optional line separator: can be {@code null}, + * in which case lines will be delimited in the manner of + * {@link BufferedReader#readLine()}. + * @return a body subscriber + * @throws IllegalArgumentException if the supplied {@code lineSeparator} is the empty string. + */ + public static ,T> BodySubscriber + fromLineSubscriber(S subscriber, + Function finisher, + Charset charset, + String lineSeparator) { + return LineSubscriberAdapter.create(subscriber, + finisher, charset, lineSeparator); + } + + /** * Returns a body subscriber which stores the response body as a {@code * String} converted using the given {@code Charset}. * @@ -847,6 +1018,33 @@ } /** + * Returns a {@code BodySubscriber} which streams the response body as + * a {@link Stream Stream}, where each string in the stream + * corresponds to a line as defined by {@link BufferedReader#lines()}. + * + *

The {@link HttpResponse} using this subscriber is available + * immediately after the response headers have been read, without + * requiring to wait for the entire body to be processed. The response + * body can then be read directly from the {@link Stream}. + * + * @apiNote To ensure that all resources associated with the + * corresponding exchange are properly released the caller must + * ensure to either read all lines until the stream is exhausted, + * or call {@link Stream#close} if it is unable or unwilling to do so. + * Calling {@code close} before exhausting the stream may cause + * the underlying HTTP connection to be closed and prevent it + * from being reused for subsequent operations. + * + * @return a body subscriber that streams the response body as a + * {@link Stream Stream}. + * + * @see BufferedReader#lines() + */ + public static BodySubscriber> asLines(Charset charset) { + return ResponseSubscribers.HttpLineStream.create(charset); + } + + /** * Returns a response subscriber which discards the response body. The * supplied value is the value that will be returned from * {@link HttpResponse#body()}. diff -r bbd688c6fbbb -r cf8792f51dee src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java Fri Jan 12 15:36:28 2018 +0000 @@ -0,0 +1,467 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.incubator.http; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import jdk.incubator.http.internal.common.Demand; +import jdk.incubator.http.internal.common.MinimalFuture; +import jdk.incubator.http.internal.common.SequentialScheduler; + +/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ +final class LineSubscriberAdapter,R> + implements HttpResponse.BodySubscriber { + private final CompletableFuture cf = new MinimalFuture<>(); + private final S subscriber; + private final Function finisher; + private final Charset charset; + private final String eol; + private volatile LineSubscription downstream; + + private LineSubscriberAdapter(S subscriber, + Function finisher, + Charset charset, + String eol) { + if (eol != null && eol.isEmpty()) + throw new IllegalArgumentException("empty line separator"); + this.subscriber = Objects.requireNonNull(subscriber); + this.finisher = Objects.requireNonNull(finisher); + this.charset = Objects.requireNonNull(charset); + this.eol = eol; + } + + @Override + public void onSubscribe(Subscription subscription) { + downstream = LineSubscription.create(subscription, + charset, + eol, + subscriber, + cf); + subscriber.onSubscribe(downstream); + } + + @Override + public void onNext(List item) { + try { + downstream.submit(item); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void onError(Throwable throwable) { + try { + downstream.signalError(throwable); + } finally { + cf.completeExceptionally(throwable); + } + } + + @Override + public void onComplete() { + try { + downstream.signalComplete(); + } finally { + cf.complete(finisher.apply(subscriber)); + } + } + + @Override + public CompletionStage getBody() { + return cf; + } + + static , R> LineSubscriberAdapter + create(S subscriber, Function finisher, Charset charset, String eol) + { + if (eol != null && eol.isEmpty()) + throw new IllegalArgumentException("empty line separator"); + return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber), + Objects.requireNonNull(finisher), + Objects.requireNonNull(charset), + eol); + } + + static final class LineSubscription implements Flow.Subscription { + final Flow.Subscription upstreamSubscription; + final CharsetDecoder decoder; + final String newline; + final Demand downstreamDemand; + final ConcurrentLinkedDeque queue; + final SequentialScheduler scheduler; + final Flow.Subscriber upstream; + final CompletableFuture cf; + private final AtomicReference errorRef = new AtomicReference<>(); + private final AtomicLong demanded = new AtomicLong(); + private volatile boolean completed; + private volatile boolean cancelled; + + private final char[] chars = new char[1024]; + private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]); + private final CharBuffer buffer = CharBuffer.wrap(chars); + private final StringBuilder builder = new StringBuilder(); + private int lineCount; + private String nextLine; + + private LineSubscription(Flow.Subscription s, + CharsetDecoder dec, + String separator, + Flow.Subscriber subscriber, + CompletableFuture completion) { + downstreamDemand = new Demand(); + queue = new ConcurrentLinkedDeque<>(); + upstreamSubscription = Objects.requireNonNull(s); + decoder = Objects.requireNonNull(dec); + newline = separator; + upstream = Objects.requireNonNull(subscriber); + cf = Objects.requireNonNull(completion); + scheduler = SequentialScheduler.synchronizedScheduler(this::loop); + } + + @Override + public void request(long n) { + if (cancelled) return; + if (downstreamDemand.increase(n)) { + scheduler.runOrSchedule(); + } + } + + @Override + public void cancel() { + cancelled = true; + upstreamSubscription.cancel(); + } + + public void submit(List list) { + queue.addAll(list); + demanded.decrementAndGet(); + scheduler.runOrSchedule(); + } + + public void signalComplete() { + completed = true; + scheduler.runOrSchedule(); + } + + public void signalError(Throwable error) { + if (errorRef.compareAndSet(null, + Objects.requireNonNull(error))) { + scheduler.runOrSchedule(); + } + } + + // This method looks at whether some bytes where left over (in leftover) + // from decoding the previous buffer when the previous buffer was in + // underflow. If so, it takes bytes one by one from the new buffer 'in' + // and combines them with the leftover bytes until 'in' is exhausted or a + // character was produced in 'out', resolving the previous underflow. + // Returns true if the buffer is still in underflow, false otherwise. + // However, in both situation some chars might have been produced in 'out'. + private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput) + throws CharacterCodingException { + int limit = leftover.position(); + if (limit == 0) { + // no leftover + return false; + } else { + CoderResult res = null; + while (in.hasRemaining()) { + leftover.position(limit); + leftover.limit(++limit); + leftover.put(in.get()); + leftover.position(0); + res = decoder.decode(leftover, out, + endOfInput && !in.hasRemaining()); + int remaining = leftover.remaining(); + if (remaining > 0) { + assert leftover.position() == 0; + leftover.position(remaining); + } else { + leftover.position(0); + } + leftover.limit(leftover.capacity()); + if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) { + continue; + } + if (res.isError()) { + res.throwException(); + } + assert !res.isOverflow(); + return false; + } + return !endOfInput; + } + } + + // extract characters from start to end and remove them from + // the StringBuilder + private static String take(StringBuilder b, int start, int end) { + assert start == 0; + String line; + if (end == start) return ""; + line = b.substring(start, end); + b.delete(start, end); + return line; + } + + // finds end of line, returns -1 if not found, or the position after + // the line delimiter if found, removing the delimiter in the process. + private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) { + int len = b.length(); + if (eol != null) { // delimiter explicitly specified + int i = b.indexOf(eol); + if (i >= 0) { + // remove the delimiter and returns the position + // of the char after it. + b.delete(i, i + eol.length()); + return i; + } + } else { // no delimiter specified, behaves as BufferedReader::readLine + boolean crfound = false; + for (int i = 0; i < len; i++) { + char c = b.charAt(i); + if (c == '\n') { + // '\n' or '\r\n' found. + // remove the delimiter and returns the position + // of the char after it. + b.delete(crfound ? i - 1 : i, i + 1); + return crfound ? i - 1 : i; + } else if (crfound) { + // previous char was '\r', c != '\n' + assert i != 0; + // remove the delimiter and returns the position + // of the char after it. + b.delete(i - 1, i); + return i - 1; + } + crfound = c == '\r'; + } + if (crfound && endOfInput) { + // remove the delimiter and returns the position + // of the char after it. + b.delete(len - 1, len); + return len - 1; + } + } + return endOfInput && len > 0 ? len : -1; + } + + // Looks at whether the StringBuilder contains a line. + // Returns null if more character are needed. + private static String nextLine(StringBuilder b, String eol, boolean endOfInput) { + int next = endOfLine(b, eol, endOfInput); + return (next > -1) ? take(b, 0, next) : null; + } + + // Attempts to read the next line. Returns the next line if + // the delimiter was found, null otherwise. The delimiters are + // consumed. + private String nextLine() + throws CharacterCodingException { + assert nextLine == null; + LINES: + while (nextLine == null) { + boolean endOfInput = completed && queue.isEmpty(); + nextLine = nextLine(builder, newline, + endOfInput && leftover.position() == 0); + if (nextLine != null) return nextLine; + ByteBuffer b; + BUFFERS: + while ((b = queue.peek()) != null) { + if (!b.hasRemaining()) { + queue.poll(); + continue BUFFERS; + } + BYTES: + while (b.hasRemaining()) { + buffer.position(0); + buffer.limit(buffer.capacity()); + boolean endofInput = completed && queue.size() <= 1; + if (isUnderFlow(b, buffer, endofInput)) { + assert !b.hasRemaining(); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + continue BUFFERS; + } + CoderResult res = decoder.decode(b, buffer, endofInput); + if (res.isError()) res.throwException(); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + continue LINES; + } + if (res.isUnderflow() && b.hasRemaining()) { + //System.out.println("underflow: adding " + b.remaining() + " bytes"); + leftover.put(b); + assert !b.hasRemaining(); + continue BUFFERS; + } + } + } + + assert queue.isEmpty(); + if (endOfInput) { + // Time to cleanup: there may be some undecoded leftover bytes + // We need to flush them out. + // The decoder has been configured to replace malformed/unmappable + // chars with some replacement, in order to behave like + // InputStreamReader. + leftover.flip(); + buffer.position(0); + buffer.limit(buffer.capacity()); + + // decode() must be called just before flush, even if there + // is nothing to decode. We must do this even if leftover + // has no remaining bytes. + CoderResult res = decoder.decode(leftover, buffer, endOfInput); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + if (res.isError()) res.throwException(); + + // Now call decoder.flush() + buffer.position(0); + buffer.limit(buffer.capacity()); + res = decoder.flush(buffer); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + if (res.isError()) res.throwException(); + + // It's possible that we reach here twice - just for the + // purpose of checking that no bytes were left over, so + // we reset leftover/decoder to make the function reentrant. + leftover.position(0); + leftover.limit(leftover.capacity()); + decoder.reset(); + + // if some chars were produced then this call will + // return them. + return nextLine = nextLine(builder, newline, endOfInput); + } + return null; + } + return null; + } + + // The main sequential scheduler loop. + private void loop() { + try { + while (!cancelled) { + Throwable error = errorRef.get(); + if (error != null) { + cancelled = true; + scheduler.stop(); + upstream.onError(error); + cf.completeExceptionally(error); + return; + } + if (nextLine == null) nextLine = nextLine(); + if (nextLine == null) { + if (completed) { + scheduler.stop(); + if (leftover.position() != 0) { + // Underflow: not all bytes could be + // decoded, but no more bytes will be coming. + // This should not happen as we should already + // have got a MalformedInputException, or + // replaced the unmappable chars. + errorRef.compareAndSet(null, + new IllegalStateException( + "premature end of input (" + + leftover.position() + + " undecoded bytes)")); + continue; + } else { + upstream.onComplete(); + } + return; + } else if (demanded.get() == 0 + && !downstreamDemand.isFulfilled()) { + long incr = Math.max(1, downstreamDemand.get()); + demanded.addAndGet(incr); + upstreamSubscription.request(incr); + continue; + } else return; + } + assert nextLine != null; + assert newline != null && !nextLine.endsWith(newline) + || !nextLine.endsWith("\n") || !nextLine.endsWith("\r"); + if (downstreamDemand.tryDecrement()) { + String forward = nextLine; + nextLine = null; + upstream.onNext(forward); + } else return; // no demand: come back later + } + } catch (Throwable t) { + try { + upstreamSubscription.cancel(); + } finally { + signalError(t); + } + } + } + + static LineSubscription create(Flow.Subscription s, + Charset charset, + String lineSeparator, + Flow.Subscriber upstream, + CompletableFuture cf) { + return new LineSubscription(Objects.requireNonNull(s), + Objects.requireNonNull(charset).newDecoder() + // use the same decoder configuration than + // java.io.InputStreamReader + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE), + lineSeparator, + Objects.requireNonNull(upstream), + Objects.requireNonNull(cf)); + } + } +} + diff -r bbd688c6fbbb -r cf8792f51dee src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Jan 05 14:11:48 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Jan 12 15:36:28 2018 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,11 +25,15 @@ package jdk.incubator.http; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.OpenOption; import java.nio.file.Path; import java.security.AccessControlContext; @@ -52,6 +56,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.Utils; @@ -461,6 +466,58 @@ } + /** + * A {@code Stream} built on top of the Flow API. + */ + static final class HttpLineStream implements HttpResponse.BodySubscriber> { + + private final HttpResponseInputStream responseInputStream; + private final Charset charset; + private HttpLineStream(Charset charset) { + this.charset = Objects.requireNonNull(charset); + responseInputStream = new HttpResponseInputStream(); + } + + @Override + public CompletionStage> getBody() { + return responseInputStream.getBody().thenApply((is) -> + new BufferedReader(new InputStreamReader(is, charset)) + .lines().onClose(this::close)); + } + + @Override + public void onSubscribe(Subscription subscription) { + responseInputStream.onSubscribe(subscription); + } + + @Override + public void onNext(List item) { + responseInputStream.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + responseInputStream.onError(throwable); + } + + @Override + public void onComplete() { + responseInputStream.onComplete(); + } + + void close() { + try { + responseInputStream.close(); + } catch (IOException x) { + // ignore + } + } + + static HttpLineStream create(Charset charset) { + return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8)); + } + } + static class MultiSubscriberImpl implements HttpResponse.MultiSubscriber,V> { diff -r bbd688c6fbbb -r cf8792f51dee src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java Fri Jan 05 14:11:48 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Utils.java Fri Jan 12 15:36:28 2018 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -28,6 +28,7 @@ import jdk.incubator.http.HttpHeaders; import sun.net.NetProperties; import sun.net.util.IPAddressUtil; +import sun.net.www.HeaderParser; import javax.net.ssl.SSLParameters; import java.io.ByteArrayOutputStream; @@ -476,11 +477,17 @@ * UTF_8 */ public static Charset charsetFrom(HttpHeaders headers) { - String encoding = headers.firstValue("Content-encoding") - .orElse("UTF_8"); + String type = headers.firstValue("Content-type") + .orElse("text/html; charset=utf-8"); + int i = type.indexOf(";"); + if (i >= 0) type = type.substring(i+1); try { - return Charset.forName(encoding); - } catch (IllegalArgumentException e) { + HeaderParser parser = new HeaderParser(type); + String value = parser.findValue("charset"); + if (value == null) return StandardCharsets.UTF_8; + return Charset.forName(value); + } catch (Throwable x) { + Log.logTrace("Can't find charset in \"{0}\" ({1})", type, x); return StandardCharsets.UTF_8; } } diff -r bbd688c6fbbb -r cf8792f51dee test/jdk/java/net/httpclient/LineAdaptersCompileOnly.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/LineAdaptersCompileOnly.java Fri Jan 12 15:36:28 2018 +0000 @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Flow; +import java.util.function.Function; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.BodySubscriber; + +/* + * @test + * @summary Basic test for Flow adapters with generic type parameters + * @compile LineAdaptersCompileOnly.java + */ + +public class LineAdaptersCompileOnly { + + public static void main(String[] args) { + makesSureDifferentGenericSignaturesCompile(); + } + + static void makesSureDifferentGenericSignaturesCompile() { + + BodyHandler.fromLineSubscriber(new StringSubscriber()); + BodyHandler.fromLineSubscriber(new CharSequenceSubscriber()); + BodyHandler.fromLineSubscriber(new ObjectSubscriber()); + + + BodySubscriber.fromLineSubscriber(new StringSubscriber()); + BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber()); + BodySubscriber.fromLineSubscriber(new ObjectSubscriber()); + + + BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), "\n"); + BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(), "\r\n"); + BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), "\n"); + BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), null); + BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(), null); + BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), null); + + BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(), + StandardCharsets.UTF_8, "\n"); + BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(), + StandardCharsets.UTF_16, "\r\n"); + BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), + StandardCharsets.US_ASCII, "\n"); + BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(), + StandardCharsets.UTF_8, null); + BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(), + StandardCharsets.UTF_16, null); + BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), + StandardCharsets.US_ASCII, null); + } + + static class StringSubscriber implements Flow.Subscriber { + @Override public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onNext(String item) { } + @Override public void onError(Throwable throwable) { } + @Override public void onComplete() { } + } + + static class CharSequenceSubscriber implements Flow.Subscriber { + @Override public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onNext(CharSequence item) { } + @Override public void onError(Throwable throwable) { } + @Override public void onComplete() { } + } + + static class ObjectSubscriber implements Flow.Subscriber { + @Override public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onNext(Object item) { } + @Override public void onError(Throwable throwable) { } + @Override public void onComplete() { } + } +} diff -r bbd688c6fbbb -r cf8792f51dee test/jdk/java/net/httpclient/LineBodyHandlerTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/LineBodyHandlerTest.java Fri Jan 12 15:36:28 2018 +0000 @@ -0,0 +1,702 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.StringReader; +import java.io.UncheckedIOException; +import java.math.BigInteger; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import com.sun.net.httpserver.HttpsConfigurator; +import com.sun.net.httpserver.HttpsServer; +import jdk.incubator.http.HttpClient; +import jdk.incubator.http.HttpRequest; +import jdk.incubator.http.HttpResponse; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.BodySubscriber; +import jdk.testlibrary.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import javax.net.ssl.SSLContext; + +import static java.nio.charset.StandardCharsets.UTF_16; +import static java.nio.charset.StandardCharsets.UTF_8; +import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertNotNull; + +/* + * @test + * @summary Basic tests for line adapter subscribers as created by + * the BodyHandlers returned by BodyHandler::fromLineSubscriber + * and BodyHandler::asLines + * @modules java.base/sun.net.www.http + * jdk.incubator.httpclient/jdk.incubator.http.internal.common + * jdk.incubator.httpclient/jdk.incubator.http.internal.frame + * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack + * java.logging + * jdk.httpserver + * @library /lib/testlibrary http2/server + * @build Http2TestServer + * @build jdk.testlibrary.SimpleSSLContext + * @run testng/othervm LineBodyHandlerTest + */ + +public class LineBodyHandlerTest { + + SSLContext sslContext; + HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] + HttpsServer httpsTestServer; // HTTPS/1.1 + Http2TestServer http2TestServer; // HTTP/2 ( h2c ) + Http2TestServer https2TestServer; // HTTP/2 ( h2 ) + String httpURI; + String httpsURI; + String http2URI; + String https2URI; + + @DataProvider(name = "uris") + public Object[][] variants() { + return new Object[][]{ + { httpURI }, + { httpsURI }, + { http2URI }, + { https2URI }, + }; + } + + static final Class NPE = NullPointerException.class; + static final Class IAE = IllegalArgumentException.class; + + @Test + public void testNull() { + assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null)); + assertNotNull(BodyHandler.fromLineSubscriber(new StringSubscriber())); + assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, Function.identity(), "\n")); + assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), null, "\n")); + assertNotNull(BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(), null)); + assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, null, "\n")); + assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(null, Function.identity(), null)); + assertThrows(NPE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), null, null)); + + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(), + Charset.defaultCharset(), System.lineSeparator())); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null, + Charset.defaultCharset(), System.lineSeparator())); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(), + null, System.lineSeparator())); + assertNotNull(BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(), + Charset.defaultCharset(), null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null, + Charset.defaultCharset(), System.lineSeparator())); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(), + null, System.lineSeparator())); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(), + Charset.defaultCharset(), null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null, + null, System.lineSeparator())); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null, + Charset.defaultCharset(), null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(), + null, null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), null, null, null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, Function.identity(), + null, null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null, + Charset.defaultCharset(), null)); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null, + null, System.lineSeparator())); + assertThrows(NPE, () -> BodySubscriber.fromLineSubscriber(null, null, null, null)); + } + + @Test + public void testIAE() { + assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new StringSubscriber(), Function.identity(),"")); + assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(),"")); + assertThrows(IAE, () -> BodyHandler.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), "")); + assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new StringSubscriber(), Function.identity(), + StandardCharsets.UTF_8, "")); + assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new CharSequenceSubscriber(), Function.identity(), + StandardCharsets.UTF_16, "")); + assertThrows(IAE, () -> BodySubscriber.fromLineSubscriber(new ObjectSubscriber(), Function.identity(), + StandardCharsets.US_ASCII, "")); + } + + private static final List lines(String text, String eol) { + if (eol == null) { + return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList()); + } else { + String replaced = text.replace(eol, "|"); + int i=0; + while(replaced.endsWith("||")) { + replaced = replaced.substring(0,replaced.length()-1); + i++; + } + List res = List.of(replaced.split("\\|")); + if (i > 0) { + res = new ArrayList<>(res); + for (int j=0; j response = client.sendAsync(request, + BodyHandler.fromLineSubscriber(subscriber, Supplier::get,"\n")) + .join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, body); + assertEquals(subscriber.list, lines(body, "\n")); + } + + @Test(dataProvider = "uris") + void testAsStream(String url) { + String body = "May the luck of the Irish be with you!"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext) + .build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + HttpResponse> response = client.sendAsync(request, + BodyHandler.asLines()).join(); + Stream stream = response.body(); + List list = stream.collect(Collectors.toList()); + String text = list.stream().collect(Collectors.joining("|")); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, body); + assertEquals(list, List.of(body)); + assertEquals(list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testStringWithFinisher2(String url) { + String body = "May the luck\r\n\r\n of the Irish be with you!"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + StringSubscriber subscriber = new StringSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromLineSubscriber(subscriber)).join(); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, body.replace("\r\n", "\n")); + assertEquals(subscriber.list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testAsStreamWithCRLF(String url) { + String body = "May the luck\r\n\r\n of the Irish be with you!"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + HttpResponse> response = client.sendAsync(request, + BodyHandler.asLines()).join(); + Stream stream = response.body(); + List list = stream.collect(Collectors.toList()); + String text = list.stream().collect(Collectors.joining("|")); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "May the luck|| of the Irish be with you!"); + assertEquals(list, List.of("May the luck", + "", + " of the Irish be with you!")); + assertEquals(list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testStringWithFinisherBlocking(String url) throws Exception { + String body = "May the luck of the Irish be with you!"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)).build(); + + StringSubscriber subscriber = new StringSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromLineSubscriber(subscriber, Supplier::get, "\n")); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "May the luck of the Irish be with you!"); + assertEquals(subscriber.list, lines(body, "\n")); + } + + @Test(dataProvider = "uris") + void testStringWithoutFinisherBlocking(String url) throws Exception { + String body = "May the luck of the Irish be with you!"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)).build(); + + StringSubscriber subscriber = new StringSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromLineSubscriber(subscriber)); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "May the luck of the Irish be with you!"); + assertEquals(subscriber.list, lines(body, null)); + } + + // Subscriber + + @Test(dataProvider = "uris") + void testAsStreamWithMixedCRLF(String url) { + String body = "May\r\n the wind\r\n always be\rat your back.\r\r"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + HttpResponse> response = client.sendAsync(request, + BodyHandler.asLines()).join(); + Stream stream = response.body(); + List list = stream.collect(Collectors.toList()); + String text = list.stream().collect(Collectors.joining("|")); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May| the wind| always be|at your back.|"); + assertEquals(list, List.of("May", + " the wind", + " always be", + "at your back.", + "")); + assertEquals(list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testAsStreamWithMixedCRLF_UTF8(String url) { + String body = "May\r\n the wind\r\n always be\rat your back.\r\r"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .header("Content-type", "text/text; charset=UTF-8") + .POST(fromString(body, UTF_8)).build(); + + HttpResponse> response = client.sendAsync(request, + BodyHandler.asLines()).join(); + Stream stream = response.body(); + List list = stream.collect(Collectors.toList()); + String text = list.stream().collect(Collectors.joining("|")); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May| the wind| always be|at your back.|"); + assertEquals(list, List.of("May", + " the wind", + " always be", + "at your back.", "")); + assertEquals(list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testAsStreamWithMixedCRLF_UTF16(String url) { + String body = "May\r\n the wind\r\n always be\rat your back.\r\r"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .header("Content-type", "text/text; charset=UTF-16") + .POST(fromString(body, UTF_16)).build(); + + HttpResponse> response = client.sendAsync(request, + BodyHandler.asLines()).join(); + Stream stream = response.body(); + List list = stream.collect(Collectors.toList()); + String text = list.stream().collect(Collectors.joining("|")); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May| the wind| always be|at your back.|"); + assertEquals(list, List.of("May", + " the wind", + " always be", + "at your back.", + "")); + assertEquals(list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testObjectWithFinisher(String url) { + String body = "May\r\n the wind\r\n always be\rat your back."; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromLineSubscriber(subscriber, ObjectSubscriber::get, "\r\n")) + .join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May\n the wind\n always be\rat your back."); + assertEquals(subscriber.list, List.of("May", + " the wind", + " always be\rat your back.")); + assertEquals(subscriber.list, lines(body, "\r\n")); + } + + @Test(dataProvider = "uris") + void testObjectWithFinisher_UTF16(String url) { + String body = "May\r\n the wind\r\n always be\rat your back.\r\r"; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .header("Content-type", "text/text; charset=UTF-16") + .POST(fromString(body, UTF_16)).build(); + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromLineSubscriber(subscriber, + ObjectSubscriber::get, + null)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May\n the wind\n always be\nat your back.\n"); + assertEquals(subscriber.list, List.of("May", + " the wind", + " always be", + "at your back.", + "")); + assertEquals(subscriber.list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testObjectWithoutFinisher(String url) { + String body = "May\r\n the wind\r\n always be\rat your back."; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext) + .build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromLineSubscriber(subscriber)).join(); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May\n the wind\n always be\nat your back."); + assertEquals(subscriber.list, List.of("May", + " the wind", + " always be", + "at your back.")); + assertEquals(subscriber.list, lines(body, null)); + } + + @Test(dataProvider = "uris") + void testObjectWithFinisherBlocking(String url) throws Exception { + String body = "May\r\n the wind\r\n always be\nat your back."; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext) + .build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromLineSubscriber(subscriber, + ObjectSubscriber::get, + "\r\n")); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May\n the wind\n always be\nat your back."); + assertEquals(subscriber.list, List.of("May", + " the wind", + " always be\nat your back.")); + assertEquals(subscriber.list, lines(body, "\r\n")); + } + + @Test(dataProvider = "uris") + void testObjectWithoutFinisherBlocking(String url) throws Exception { + String body = "May\r\n the wind\r\n always be\nat your back."; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext) + .build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(body)) + .build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromLineSubscriber(subscriber)); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + assertEquals(text, "May\n the wind\n always be\nat your back."); + assertEquals(subscriber.list, List.of("May", + " the wind", + " always be", + "at your back.")); + assertEquals(subscriber.list, lines(body, null)); + } + + static private final String LINE = "Bient\u00f4t nous plongerons dans les" + + " fr\u00f4\ud801\udc00des t\u00e9n\u00e8bres, "; + + static private final String bigtext() { + StringBuilder res = new StringBuilder((LINE.length() + 1) * 50); + for (int i = 0; i<50; i++) { + res.append(LINE); + if (i%2 == 0) res.append("\r\n"); + } + return res.toString(); + } + + @Test(dataProvider = "uris") + void testBigTextFromLineSubscriber(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext) + .build(); + String bigtext = bigtext(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(bigtext)) + .build(); + + StringSubscriber subscriber = new StringSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromLineSubscriber(subscriber, Supplier::get,"\r\n")) + .join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, bigtext.replace("\r\n", "\n")); + assertEquals(subscriber.list, lines(bigtext, "\r\n")); + } + + @Test(dataProvider = "uris") + void testBigTextAsStream(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext) + .build(); + String bigtext = bigtext(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString(bigtext)) + .build(); + + HttpResponse> response = client.sendAsync(request, + BodyHandler.asLines()).join(); + Stream stream = response.body(); + List list = stream.collect(Collectors.toList()); + String text = list.stream().collect(Collectors.joining("|")); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, bigtext.replace("\r\n", "|")); + assertEquals(list, List.of(bigtext.split("\r\n"))); + assertEquals(list, lines(bigtext, null)); + } + + /** An abstract Subscriber that converts all received data into a String. */ + static abstract class AbstractSubscriber implements Supplier { + protected volatile Flow.Subscription subscription; + protected final StringBuilder baos = new StringBuilder(); + protected volatile String text; + protected volatile RuntimeException error; + protected final List list = new CopyOnWriteArrayList<>(); + + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + public void onError(Throwable throwable) { + System.out.println(this + " onError: " + throwable); + error = new RuntimeException(throwable); + } + public void onComplete() { + System.out.println(this + " onComplete"); + text = baos.toString(); + } + @Override public String get() { + if (error != null) throw error; + return text; + } + } + + static class StringSubscriber extends AbstractSubscriber + implements Flow.Subscriber, Supplier + { + @Override public void onNext(String item) { + System.out.print(this + " onNext: \"" + item + "\""); + if (baos.length() != 0) baos.append('\n'); + baos.append(item); + list.add(item); + } + } + + static class CharSequenceSubscriber extends AbstractSubscriber + implements Flow.Subscriber, Supplier + { + @Override public void onNext(CharSequence item) { + System.out.print(this + " onNext: " + item); + if (baos.length() != 0) baos.append('\n'); + baos.append(item); + list.add(item); + } + } + + static class ObjectSubscriber extends AbstractSubscriber + implements Flow.Subscriber, Supplier + { + @Override public void onNext(Object item) { + System.out.print(this + " onNext: " + item); + if (baos.length() != 0) baos.append('\n'); + baos.append(item); + list.add(item); + } + } + + + static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) { + try { + baos.write(ba); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + InetSocketAddress sa = new InetSocketAddress("localhost", 0); + httpTestServer = HttpServer.create(sa, 0); + httpTestServer.createContext("/http1/echo", new Http1EchoHandler()); + httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo"; + + httpsTestServer = HttpsServer.create(sa, 0); + httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); + httpsTestServer.createContext("/https1/echo", new Http1EchoHandler()); + httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo"; + + http2TestServer = new Http2TestServer("127.0.0.1", false, 0); + http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo"); + int port = http2TestServer.getAddress().getPort(); + http2URI = "http://127.0.0.1:" + port + "/http2/echo"; + + https2TestServer = new Http2TestServer("127.0.0.1", true, 0); + https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo"); + port = https2TestServer.getAddress().getPort(); + https2URI = "https://127.0.0.1:" + port + "/https2/echo"; + + httpTestServer.start(); + httpsTestServer.start(); + http2TestServer.start(); + https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + httpTestServer.stop(0); + httpsTestServer.stop(0); + http2TestServer.stop(); + https2TestServer.stop(); + } + + static void printBytes(PrintStream out, String prefix, byte[] bytes) { + int padding = 4 + 4 - (bytes.length % 4); + padding = padding > 4 ? padding - 4 : 4; + byte[] bigbytes = new byte[bytes.length + padding]; + System.arraycopy(bytes, 0, bigbytes, padding, bytes.length); + out.println(prefix + bytes.length + " " + + new BigInteger(bigbytes).toString(16)); + } + + static class Http1EchoHandler implements HttpHandler { + @Override + public void handle(HttpExchange t) throws IOException { + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + byte[] bytes = is.readAllBytes(); + printBytes(System.out,"Bytes: ", bytes); + if (t.getRequestHeaders().containsKey("Content-type")) { + t.getResponseHeaders().add("Content-type", + t.getRequestHeaders().getFirst("Content-type")); + } + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } + + static class Http2EchoHandler implements Http2Handler { + @Override + public void handle(Http2TestExchange t) throws IOException { + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + byte[] bytes = is.readAllBytes(); + printBytes(System.out,"Bytes: ", bytes); + if (t.getRequestHeaders().firstValue("Content-type").isPresent()) { + t.getResponseHeaders().addHeader("Content-type", + t.getRequestHeaders().firstValue("Content-type").get()); + } + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } +} diff -r bbd688c6fbbb -r cf8792f51dee test/jdk/java/net/httpclient/LineStreamsAndSurrogatesTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/LineStreamsAndSurrogatesTest.java Fri Jan 12 15:36:28 2018 +0000 @@ -0,0 +1,319 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import jdk.incubator.http.HttpResponse.BodySubscriber; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.MalformedInputException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.charset.StandardCharsets.UTF_16; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +/* + * @test + * @summary tests for BodySubscribers returned by asLines. + * In particular tests that surrogate characters are handled + * correctly. + * @modules jdk.incubator.httpclient java.logging + * @run testng/othervm LineStreamsAndSurrogatesTest + */ + +public class LineStreamsAndSurrogatesTest { + + + static final Class NPE = NullPointerException.class; + + private static final List lines(String text) { + return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList()); + } + + @Test + void testUncomplete() throws Exception { + // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00 + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r les\n\n" + + " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\ud801\udc00"; + Charset charset = UTF_8; + + BodySubscriber> bodySubscriber = + BodySubscriber.asLines(charset); + AtomicReference errorRef = new AtomicReference<>(); + Runnable run = () -> { + try { + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] sbytes = text.getBytes(charset); + byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i = 0; i < bytes.length; i++) { + // ensure that surrogates are split over several buffers. + publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); + } + publisher.close(); + } catch(Throwable t) { + errorRef.set(t); + } + }; + Thread thread = new Thread(run,"Publishing"); + thread.start(); + try { + Stream stream = bodySubscriber.getBody().toCompletableFuture().get(); + List list = stream.collect(Collectors.toList()); + String resp = list.stream().collect(Collectors.joining("")); + System.out.println("***** Got: " + resp); + + byte[] sbytes = text.getBytes(UTF_8); + byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1); + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + BufferedReader reader = new BufferedReader(new InputStreamReader(bais, charset)); + String resp2 = reader.lines().collect(Collectors.joining("")); + System.out.println("***** Got2: " + resp2); + + assertEquals(resp, resp2); + assertEquals(list, List.of("Bient\u00f4t", + " nous plongerons", + " dans", + " les", + "", + " fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres\ufffd")); + } catch (ExecutionException x) { + Throwable cause = x.getCause(); + if (cause instanceof MalformedInputException) { + throw new RuntimeException("Unexpected MalformedInputException", cause); + } + throw x; + } + if (errorRef.get() != null) { + throw new RuntimeException("Unexpected exception", errorRef.get()); + } + } + + @Test + void testStream1() throws Exception { + // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00 + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r les\n\n" + + " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres"; + Charset charset = UTF_8; + + BodySubscriber> bodySubscriber = + BodySubscriber.asLines(charset); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(charset); + AtomicReference errorRef = new AtomicReference<>(); + Runnable run = () -> { + try { + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i = 0; i < bytes.length; i++) { + // ensure that surrogates are split over several buffers. + publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); + } + publisher.close(); + } catch(Throwable t) { + errorRef.set(t); + } + }; + + Stream stream = bodySubscriber.getBody().toCompletableFuture().get(); + Thread thread = new Thread(run,"Publishing"); + thread.start(); + List list = stream.collect(Collectors.toList()); + String resp = list.stream().collect(Collectors.joining("|")); + System.out.println("***** Got: " + resp); + assertEquals(resp, text.replace("\r\n", "|") + .replace("\n","|") + .replace("\r","|")); + assertEquals(list, List.of("Bient\u00f4t", + " nous plongerons", + " dans", + "", + " les", + "", + " fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres")); + assertEquals(list, lines(text)); + if (errorRef.get() != null) { + throw new RuntimeException("Unexpected exception", errorRef.get()); + } + } + + + @Test + void testStream2() throws Exception { + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" + + " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r"; + Charset charset = UTF_8; + + BodySubscriber> bodySubscriber = BodySubscriber.asLines(charset); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(charset); + AtomicReference errorRef = new AtomicReference<>(); + Runnable run = () -> { + try { + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i = 0; i < bytes.length; i++) { + // ensure that surrogates are split over several buffers. + publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); + } + publisher.close(); + } catch(Throwable t) { + errorRef.set(t); + } + }; + + Stream stream = bodySubscriber.getBody().toCompletableFuture().get(); + Thread thread = new Thread(run,"Publishing"); + thread.start(); + List list = stream.collect(Collectors.toList()); + String resp = list.stream().collect(Collectors.joining("")); + System.out.println("***** Got: " + resp); + String expected = Stream.of(text.split("\r\n|\r|\n")) + .collect(Collectors.joining("")); + assertEquals(resp, expected); + assertEquals(list, List.of("Bient\u00f4t", + " nous plongerons", + " dans", + "", + " les fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres", + "")); + assertEquals(list, lines(text)); + if (errorRef.get() != null) { + throw new RuntimeException("Unexpected exception", errorRef.get()); + } + } + + @Test + void testStream3_UTF16() throws Exception { + // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00 + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" + + " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres"; + Charset charset = UTF_16; + + BodySubscriber> bodySubscriber = + BodySubscriber.asLines(charset); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(charset); + AtomicReference errorRef = new AtomicReference<>(); + Runnable run = () -> { + try { + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i = 0; i < bytes.length; i++) { + // ensure that surrogates are split over several buffers. + publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); + } + publisher.close(); + } catch(Throwable t) { + errorRef.set(t); + } + }; + + Stream stream = bodySubscriber.getBody().toCompletableFuture().get(); + Thread thread = new Thread(run,"Publishing"); + thread.start(); + List list = stream.collect(Collectors.toList()); + String resp = list.stream().collect(Collectors.joining("")); + System.out.println("***** Got: " + resp); + assertEquals(resp, text.replace("\n","").replace("\r","")); + assertEquals(list, List.of("Bient\u00f4t", + " nous plongerons", + " dans", + "", + " les", + "", + " fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres")); + assertEquals(list, lines(text)); + if (errorRef.get() != null) { + throw new RuntimeException("Unexpected exception", errorRef.get()); + } + } + + + @Test + void testStream4_UTF16() throws Exception { + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" + + " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r"; + Charset charset = UTF_16; + + BodySubscriber> bodySubscriber = + BodySubscriber.asLines(charset); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(charset); + AtomicReference errorRef = new AtomicReference<>(); + Runnable run = () -> { + try { + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i = 0; i < bytes.length; i++) { + // ensure that surrogates are split over several buffers. + publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); + } + publisher.close(); + } catch(Throwable t) { + errorRef.set(t); + } + }; + + Stream stream = bodySubscriber.getBody().toCompletableFuture().get(); + Thread thread = new Thread(run,"Publishing"); + thread.start(); + List list = stream.collect(Collectors.toList()); + String resp = list.stream().collect(Collectors.joining("")); + System.out.println("***** Got: " + resp); + String expected = Stream.of(text.split("\r\n|\r|\n")) + .collect(Collectors.joining("")); + assertEquals(resp, expected); + assertEquals(list, List.of("Bient\u00f4t", + " nous plongerons", + " dans", + "", + " les fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres", + "")); + assertEquals(list, lines(text)); + if (errorRef.get() != null) { + throw new RuntimeException("Unexpected exception", errorRef.get()); + } + } + +} diff -r bbd688c6fbbb -r cf8792f51dee test/jdk/java/net/httpclient/LineSubscribersAndSurrogatesTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/LineSubscribersAndSurrogatesTest.java Fri Jan 12 15:36:28 2018 +0000 @@ -0,0 +1,384 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import jdk.incubator.http.HttpResponse.BodySubscriber; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.charset.MalformedInputException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.charset.StandardCharsets.UTF_16; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +/* + * @test + * @summary tests for BodySubscribers returned by fromLineSubscriber. + * In particular tests that surrogate characters are handled + * correctly. + * @modules jdk.incubator.httpclient java.logging + * @run testng/othervm LineSubscribersAndSurrogatesTest + */ + +public class LineSubscribersAndSurrogatesTest { + + + static final Class NPE = NullPointerException.class; + + private static final List lines(String text, String eol) { + if (eol == null) { + return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList()); + } else { + String replaced = text.replace(eol, "|"); + int i=0; + while(replaced.endsWith("||")) { + replaced = replaced.substring(0,replaced.length()-1); + i++; + } + List res = List.of(replaced.split("\\|")); + if (i > 0) { + res = new ArrayList<>(res); + for (int j=0; j bodySubscriber = BodySubscriber.fromLineSubscriber( + subscriber, Supplier::get, UTF_8, null); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] sbytes = text.getBytes(UTF_8); + byte[] bytes = Arrays.copyOfRange(sbytes,0, sbytes.length - 1); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i=0; i bodySubscriber = BodySubscriber.fromLineSubscriber( + subscriber, Supplier::get, UTF_8, "\n"); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(UTF_8); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i=0; i expected = List.of("Bient\u00f4t\r", + " nous plongerons\r", + " dans\r les", + "", + " fr\u00f4\ud801\udc00des\r", + " t\u00e9n\u00e8bres\r"); + assertEquals(subscriber.list, expected); + assertEquals(resp, Stream.of(text.split("\n")).collect(Collectors.joining(""))); + assertEquals(resp, expected.stream().collect(Collectors.joining(""))); + assertEquals(subscriber.list, lines(text, "\n")); + } + + + @Test + void testStringWithFinisherCR() throws Exception { + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" + + " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r"; + ObjectSubscriber subscriber = new ObjectSubscriber(); + BodySubscriber bodySubscriber = BodySubscriber.fromLineSubscriber( + subscriber, Supplier::get, UTF_8, "\r"); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(UTF_8); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i=0; i bodySubscriber = BodySubscriber.fromLineSubscriber( + subscriber, Supplier::get, UTF_8, "\r\n"); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(UTF_8); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i=0; i bodySubscriber = BodySubscriber.fromLineSubscriber( + subscriber, Supplier::get, UTF_8, null); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(UTF_8); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i=0; i expected = List.of("Bient\u00f4t", + " nous plongerons", + " dans", + " les", + "", + " fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres"); + assertEquals(subscriber.list, expected); + assertEquals(resp, expected.stream().collect(Collectors.joining(""))); + assertEquals(subscriber.list, lines(text, null)); + } + + @Test + void testStringWithFinisherBR_UTF_16() throws Exception { + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" + + " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r"; + ObjectSubscriber subscriber = new ObjectSubscriber(); + BodySubscriber bodySubscriber = BodySubscriber.fromLineSubscriber( + subscriber, Supplier::get, UTF_16, null); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(UTF_16); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i=0; i expected = List.of("Bient\u00f4t", + " nous plongerons", + " dans", + " les", + "", + " fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres", + ""); + assertEquals(resp, expected.stream().collect(Collectors.joining(""))); + assertEquals(subscriber.list, expected); + assertEquals(subscriber.list, lines(text, null)); + } + + void testStringWithoutFinisherBR() throws Exception { + String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r" + + " les\r\r fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres"; + ObjectSubscriber subscriber = new ObjectSubscriber(); + BodySubscriber bodySubscriber = BodySubscriber.fromLineSubscriber(subscriber); + SubmissionPublisher> publisher = new SubmissionPublisher<>(); + byte[] bytes = text.getBytes(UTF_8); + publisher.subscribe(bodySubscriber); + System.out.println("Publishing " + bytes.length + " bytes"); + for (int i = 0; i < bytes.length; i++) { + // ensure that surrogates are split over several buffers. + publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); + } + publisher.close(); + Void resp = bodySubscriber.getBody().toCompletableFuture().get(); + System.out.println("***** Got: " + resp); + List expected = List.of("Bient\u00f4t", + " nous plongerons", + " dans", + " les", + "", + " fr\u00f4\ud801\udc00des", + " t\u00e9n\u00e8bres"); + assertEquals(subscriber.text, expected.stream().collect(Collectors.joining(""))); + assertEquals(subscriber.list, expected); + assertEquals(subscriber.list, lines(text, null)); + } + + + /** An abstract Subscriber that converts all received data into a String. */ + static abstract class AbstractSubscriber implements Supplier { + protected final List list = new CopyOnWriteArrayList<>(); + protected volatile Flow.Subscription subscription; + protected final StringBuilder baos = new StringBuilder(); + protected volatile String text; + protected volatile RuntimeException error; + + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + public void onError(Throwable throwable) { + System.out.println(this + " onError: " + throwable); + error = new RuntimeException(throwable); + } + public void onComplete() { + System.out.println(this + " onComplete"); + text = baos.toString(); + } + @Override public String get() { + if (error != null) throw error; + return text; + } + public final List list() { + return list; + } + } + + static class StringSubscriber extends AbstractSubscriber + implements Flow.Subscriber, Supplier + { + @Override public void onNext(String item) { + System.out.println(this + " onNext: \"" + + item.replace("\n","\\n") + .replace("\r", "\\r") + + "\""); + baos.append(item); + list.add(item); + } + } + + static class CharSequenceSubscriber extends AbstractSubscriber + implements Flow.Subscriber, Supplier + { + @Override public void onNext(CharSequence item) { + System.out.println(this + " onNext: \"" + + item.toString().replace("\n","\\n") + .replace("\r", "\\r") + + "\""); + baos.append(item); + list.add(item); + } + } + + static class ObjectSubscriber extends AbstractSubscriber + implements Flow.Subscriber, Supplier + { + @Override public void onNext(Object item) { + System.out.println(this + " onNext: \"" + + item.toString().replace("\n","\\n") + .replace("\r", "\\r") + + "\""); + baos.append(item); + list.add(item); + } + } + + + static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) { + try { + baos.write(ba); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff -r bbd688c6fbbb -r cf8792f51dee test/jdk/java/net/httpclient/http2/server/BodyInputStream.java --- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Fri Jan 05 14:11:48 2018 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Fri Jan 12 15:36:28 2018 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -122,7 +122,7 @@ if (c == -1) { return -1; } - return one[0]; + return one[0] & 0xFF; } @Override diff -r bbd688c6fbbb -r cf8792f51dee test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Jan 05 14:11:48 2018 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Jan 12 15:36:28 2018 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -269,9 +269,9 @@ } } - String doUpgrade() throws IOException { - String upgrade = readHttp1Request(); - String h2c = getHeader(upgrade, "Upgrade"); + Http1InitialRequest doUpgrade() throws IOException { + Http1InitialRequest upgrade = readHttp1Request(); + String h2c = getHeader(upgrade.headers, "Upgrade"); if (h2c == null || !h2c.equals("h2c")) { System.err.println("Server:HEADERS: " + upgrade); throw new IOException("Bad upgrade 1 " + h2c); @@ -283,7 +283,7 @@ sendSettingsFrame(); readPreface(); - String clientSettingsString = getHeader(upgrade, "HTTP2-Settings"); + String clientSettingsString = getHeader(upgrade.headers, "HTTP2-Settings"); clientSettings = getSettingsFromString(clientSettingsString); return upgrade; @@ -313,7 +313,7 @@ } void run() throws Exception { - String upgrade = null; + Http1InitialRequest upgrade = null; if (!secure) { upgrade = doUpgrade(); } else { @@ -462,9 +462,9 @@ // First stream (1) comes from a plaintext HTTP/1.1 request @SuppressWarnings({"rawtypes","unchecked"}) - void createPrimordialStream(String request) throws IOException { + void createPrimordialStream(Http1InitialRequest request) throws IOException { HttpHeadersImpl headers = new HttpHeadersImpl(); - String requestLine = getRequestLine(request); + String requestLine = getRequestLine(request.headers); String[] tokens = requestLine.split(" "); if (!tokens[2].equals("HTTP/1.1")) { throw new IOException("bad request line"); @@ -475,7 +475,7 @@ } catch (URISyntaxException e) { throw new IOException(e); } - String host = getHeader(request, "Host"); + String host = getHeader(request.headers, "Host"); if (host == null) { throw new IOException("missing Host"); } @@ -485,9 +485,9 @@ headers.setHeader(":authority", host); headers.setHeader(":path", uri.getPath()); Queue q = new Queue(sentinel); - String body = getRequestBody(request); - addHeaders(getHeaders(request), headers); - headers.setHeader("Content-length", Integer.toString(body.length())); + byte[] body = getRequestBody(request); + addHeaders(getHeaders(request.headers), headers); + headers.setHeader("Content-length", Integer.toString(body.length)); addRequestBodyToQueue(body, q); streams.put(1, q); @@ -885,7 +885,16 @@ final static String CRLF = "\r\n"; final static String CRLFCRLF = "\r\n\r\n"; - String readHttp1Request() throws IOException { + static class Http1InitialRequest { + final String headers; + final byte[] body; + Http1InitialRequest(String headers, byte[] body) { + this.headers = headers; + this.body = body.clone(); + } + } + + Http1InitialRequest readHttp1Request() throws IOException { String headers = readUntil(CRLF + CRLF); int clen = getContentLength(headers); String te = getHeader(headers, "Transfer-encoding"); @@ -899,8 +908,7 @@ // HTTP/1.1 chunked data, read it buf = readChunkedInputStream(is); } - String body = new String(buf, StandardCharsets.US_ASCII); - return headers + body; + return new Http1InitialRequest(headers, buf); } catch (IOException e) { System.err.println("TestServer: headers read: [ " + headers + " ]"); throw e; @@ -943,19 +951,13 @@ // wrapper around a BlockingQueue that throws an exception when it's closed // Each stream has one of these - String getRequestBody(String request) { - int bodystart = request.indexOf(CRLF+CRLF); - String body; - if (bodystart == -1) - body = ""; - else - body = request.substring(bodystart+4); - return body; + byte[] getRequestBody(Http1InitialRequest request) { + return request.body; } @SuppressWarnings({"rawtypes","unchecked"}) - void addRequestBodyToQueue(String body, Queue q) throws IOException { - ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII)); + void addRequestBodyToQueue(byte[] body, Queue q) throws IOException { + ByteBuffer buf = ByteBuffer.wrap(body); DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf); // only used for primordial stream q.put(df);