diff -r 1ad58e0cbf16 -r 7133f144981a jdk/test/java/net/httpclient/HttpInputStreamTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/net/httpclient/HttpInputStreamTest.java Fri Dec 09 11:35:02 2016 +0000 @@ -0,0 +1,333 @@ +/* + * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URI; +import jdk.incubator.http.HttpClient; +import jdk.incubator.http.HttpHeaders; +import jdk.incubator.http.HttpRequest; +import jdk.incubator.http.HttpResponse; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Locale; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.stream.Stream; + +/* + * @test + * @summary An example on how to read a response body with InputStream... + * @run main/othervm HttpInputStreamTest + * @author daniel fuchs + */ +public class HttpInputStreamTest { + + public static boolean DEBUG = Boolean.getBoolean("test.debug"); + + /** + * A simple HttpResponse.BodyHandler that creates a live + * InputStream to read the response body from the underlying ByteBuffer + * Flow. + * The InputStream is made immediately available for consumption, before + * the response body is fully received. + */ + public static class HttpInputStreamHandler + implements HttpResponse.BodyHandler { + + public static final int MAX_BUFFERS_IN_QUEUE = 1; + + private final int maxBuffers; + + public HttpInputStreamHandler() { + this(MAX_BUFFERS_IN_QUEUE); + } + + public HttpInputStreamHandler(int maxBuffers) { + this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; + } + + @Override + public synchronized HttpResponse.BodyProcessor + apply(int i, HttpHeaders hh) { + return new HttpResponseInputStream(maxBuffers); + } + + /** + * An InputStream built on top of the Flow API. + */ + private static class HttpResponseInputStream extends InputStream + implements HttpResponse.BodyProcessor { + + // An immutable ByteBuffer sentinel to mark that the last byte was received. + private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); + + // A queue of yet unprocessed ByteBuffers received from the flow API. + private final BlockingQueue buffers; + private volatile Flow.Subscription subscription; + private volatile boolean closed; + private volatile Throwable failed; + private volatile ByteBuffer current; + + HttpResponseInputStream() { + this(MAX_BUFFERS_IN_QUEUE); + } + + HttpResponseInputStream(int maxBuffers) { + int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; + this.buffers = new ArrayBlockingQueue<>(capacity); + } + + @Override + public CompletionStage getBody() { + // Return the stream immediately, before the + // response body is received. + // This makes it possible for senAsync().get().body() + // to complete before the response body is received. + return CompletableFuture.completedStage(this); + } + + // Returns the current byte buffer to read from. + // If the current buffer has no remaining data, will take the + // next buffer from the buffers queue, possibly blocking until + // a new buffer is made available through the Flow API, or the + // end of the flow is reached. + private ByteBuffer current() throws IOException { + while (current == null || !current.hasRemaining()) { + // Check whether the stream is claused or exhausted + if (closed || failed != null) { + throw new IOException("closed", failed); + } + if (current == LAST) break; + + try { + // Take a new buffer from the queue, blocking + // if none is available yet... + if (DEBUG) System.err.println("Taking Buffer"); + current = buffers.take(); + if (DEBUG) System.err.println("Buffer Taken"); + + // Check whether some exception was encountered + // upstream + if (closed || failed != null) { + throw new IOException("closed", failed); + } + + // Check whether we're done. + if (current == LAST) break; + + // Inform the producer that it can start sending + // us a new buffer + Flow.Subscription s = subscription; + if (s != null) s.request(1); + + } catch (InterruptedException ex) { + // continue + } + } + assert current == LAST || current.hasRemaining(); + return current; + } + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + // get the buffer to read from, possibly blocking if + // none is available + ByteBuffer buffer; + if ((buffer = current()) == LAST) return -1; + + // don't attempt to read more than what is available + // in the current buffer. + int read = Math.min(buffer.remaining(), len); + assert read > 0 && read <= buffer.remaining(); + + // buffer.get() will do the boundary check for us. + buffer.get(bytes, off, read); + return read; + } + + @Override + public int read() throws IOException { + ByteBuffer buffer; + if ((buffer = current()) == LAST) return -1; + return buffer.get() & 0xFF; + } + + @Override + public void onSubscribe(Flow.Subscription s) { + this.subscription = s; + s.request(Math.max(2, buffers.remainingCapacity() + 1)); + } + + @Override + public synchronized void onNext(ByteBuffer t) { + try { + if (DEBUG) System.err.println("next buffer received"); + buffers.put(t); + if (DEBUG) System.err.println("buffered offered"); + } catch (Exception ex) { + failed = ex; + try { + close(); + } catch (IOException ex1) { + // OK + } + } + } + + @Override + public void onError(Throwable thrwbl) { + failed = thrwbl; + } + + @Override + public synchronized void onComplete() { + subscription = null; + onNext(LAST); + } + + @Override + public void close() throws IOException { + synchronized (this) { + closed = true; + Flow.Subscription s = subscription; + if (s != null) { + s.cancel(); + } + subscription = null; + } + super.close(); + } + + } + } + + /** + * Examine the response headers to figure out the charset used to + * encode the body content. + * If the content type is not textual, returns an empty Optional. + * Otherwise, returns the body content's charset, defaulting to + * ISO-8859-1 if none is explicitly specified. + * @param headers The response headers. + * @return The charset to use for decoding the response body, if + * the response body content is text/... + */ + public static Optional getCharset(HttpHeaders headers) { + Optional contentType = headers.firstValue("Content-Type"); + Optional charset = Optional.empty(); + if (contentType.isPresent()) { + final String[] values = contentType.get().split(";"); + if (values[0].startsWith("text/")) { + charset = Optional.of(Stream.of(values) + .map(x -> x.toLowerCase(Locale.ROOT)) + .map(String::trim) + .filter(x -> x.startsWith("charset=")) + .map(x -> x.substring("charset=".length())) + .findFirst() + .orElse("ISO-8859-1")) + .map(Charset::forName); + } + } + return charset; + } + + public static void main(String[] args) throws Exception { + HttpClient client = HttpClient.newHttpClient(); + HttpRequest request = HttpRequest + .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) + .GET() + .build(); + + // This example shows how to return an InputStream that can be used to + // start reading the response body before the response is fully received. + // In comparison, the snipet below (which uses + // HttpResponse.BodyHandler.asString()) obviously will not return before the + // response body is fully read: + // + // System.out.println( + // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); + + CompletableFuture> handle = + client.sendAsync(request, new HttpInputStreamHandler()); + if (DEBUG) System.err.println("Request sent"); + + HttpResponse pending = handle.get(); + + // At this point, the response headers have been received, but the + // response body may not have arrived yet. This comes from + // the implementation of HttpResponseInputStream::getBody above, + // which returns an already completed completion stage, without + // waiting for any data. + // We can therefore access the headers - and the body, which + // is our live InputStream, without waiting... + HttpHeaders responseHeaders = pending.headers(); + + // Get the charset declared in the response headers. + // The optional will be empty if the content type is not + // of type text/... + Optional charset = getCharset(responseHeaders); + + try (InputStream is = pending.body(); + // We assume a textual content type. Construct an InputStream + // Reader with the appropriate Charset. + // charset.get() will throw NPE if the content is not textual. + Reader r = new InputStreamReader(is, charset.get())) { + + char[] buff = new char[32]; + int off=0, n=0; + if (DEBUG) System.err.println("Start receiving response body"); + if (DEBUG) System.err.println("Charset: " + charset.get()); + + // Start consuming the InputStream as the data arrives. + // Will block until there is something to read... + while ((n = r.read(buff, off, buff.length - off)) > 0) { + assert (buff.length - off) > 0; + assert n <= (buff.length - off); + if (n == (buff.length - off)) { + System.out.print(buff); + off = 0; + } else { + off += n; + } + assert off < buff.length; + } + + // last call to read may not have filled 'buff' completely. + // flush out the remaining characters. + assert off >= 0 && off < buff.length; + for (int i=0; i < off; i++) { + System.out.print(buff[i]); + } + + // We're done! + System.out.println("Done!"); + } + } + +}