jdk/test/java/net/httpclient/HttpInputStreamTest.java
changeset 42460 7133f144981a
equal deleted inserted replaced
42459:1ad58e0cbf16 42460:7133f144981a
       
     1 /*
       
     2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 import java.io.IOException;
       
    25 import java.io.InputStream;
       
    26 import java.io.InputStreamReader;
       
    27 import java.io.Reader;
       
    28 import java.net.URI;
       
    29 import jdk.incubator.http.HttpClient;
       
    30 import jdk.incubator.http.HttpHeaders;
       
    31 import jdk.incubator.http.HttpRequest;
       
    32 import jdk.incubator.http.HttpResponse;
       
    33 import java.nio.ByteBuffer;
       
    34 import java.nio.charset.Charset;
       
    35 import java.util.Locale;
       
    36 import java.util.Optional;
       
    37 import java.util.concurrent.ArrayBlockingQueue;
       
    38 import java.util.concurrent.BlockingQueue;
       
    39 import java.util.concurrent.CompletableFuture;
       
    40 import java.util.concurrent.CompletionStage;
       
    41 import java.util.concurrent.Flow;
       
    42 import java.util.stream.Stream;
       
    43 
       
    44 /*
       
    45  * @test
       
    46  * @summary An example on how to read a response body with InputStream...
       
    47  * @run main/othervm HttpInputStreamTest
       
    48  * @author daniel fuchs
       
    49  */
       
    50 public class HttpInputStreamTest {
       
    51 
       
    52     public static boolean DEBUG = Boolean.getBoolean("test.debug");
       
    53 
       
    54     /**
       
    55      * A simple HttpResponse.BodyHandler that creates a live
       
    56      * InputStream to read the response body from the underlying ByteBuffer
       
    57      * Flow.
       
    58      * The InputStream is made immediately available for consumption, before
       
    59      * the response body is fully received.
       
    60      */
       
    61     public static class HttpInputStreamHandler
       
    62         implements HttpResponse.BodyHandler<InputStream>    {
       
    63 
       
    64         public static final int MAX_BUFFERS_IN_QUEUE = 1;
       
    65 
       
    66         private final int maxBuffers;
       
    67 
       
    68         public HttpInputStreamHandler() {
       
    69             this(MAX_BUFFERS_IN_QUEUE);
       
    70         }
       
    71 
       
    72         public HttpInputStreamHandler(int maxBuffers) {
       
    73             this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers;
       
    74         }
       
    75 
       
    76         @Override
       
    77         public synchronized HttpResponse.BodyProcessor<InputStream>
       
    78                 apply(int i, HttpHeaders hh) {
       
    79             return new HttpResponseInputStream(maxBuffers);
       
    80         }
       
    81 
       
    82         /**
       
    83          * An InputStream built on top of the Flow API.
       
    84          */
       
    85         private static class HttpResponseInputStream extends InputStream
       
    86                     implements HttpResponse.BodyProcessor<InputStream> {
       
    87 
       
    88             // An immutable ByteBuffer sentinel to mark that the last byte was received.
       
    89             private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]);
       
    90 
       
    91             // A queue of yet unprocessed ByteBuffers received from the flow API.
       
    92             private final BlockingQueue<ByteBuffer> buffers;
       
    93             private volatile Flow.Subscription subscription;
       
    94             private volatile boolean closed;
       
    95             private volatile Throwable failed;
       
    96             private volatile ByteBuffer current;
       
    97 
       
    98             HttpResponseInputStream() {
       
    99                 this(MAX_BUFFERS_IN_QUEUE);
       
   100             }
       
   101 
       
   102             HttpResponseInputStream(int maxBuffers) {
       
   103                 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers;
       
   104                 this.buffers = new ArrayBlockingQueue<>(capacity);
       
   105             }
       
   106 
       
   107             @Override
       
   108             public CompletionStage<InputStream> getBody() {
       
   109                 // Return the stream immediately, before the
       
   110                 // response body is received.
       
   111                 // This makes it possible for senAsync().get().body()
       
   112                 // to complete before the response body is received.
       
   113                 return CompletableFuture.completedStage(this);
       
   114             }
       
   115 
       
   116             // Returns the current byte buffer to read from.
       
   117             // If the current buffer has no remaining data, will take the
       
   118             // next buffer from the buffers queue, possibly blocking until
       
   119             // a new buffer is made available through the Flow API, or the
       
   120             // end of the flow is reached.
       
   121             private ByteBuffer current() throws IOException {
       
   122                 while (current == null || !current.hasRemaining()) {
       
   123                     // Check whether the stream is claused or exhausted
       
   124                     if (closed || failed != null) {
       
   125                         throw new IOException("closed", failed);
       
   126                     }
       
   127                     if (current == LAST) break;
       
   128 
       
   129                     try {
       
   130                         // Take a new buffer from the queue, blocking
       
   131                         // if none is available yet...
       
   132                         if (DEBUG) System.err.println("Taking Buffer");
       
   133                         current = buffers.take();
       
   134                         if (DEBUG) System.err.println("Buffer Taken");
       
   135 
       
   136                         // Check whether some exception was encountered
       
   137                         // upstream
       
   138                         if (closed || failed != null) {
       
   139                             throw new IOException("closed", failed);
       
   140                         }
       
   141 
       
   142                         // Check whether we're done.
       
   143                         if (current == LAST) break;
       
   144 
       
   145                         // Inform the producer that it can start sending
       
   146                         // us a new buffer
       
   147                         Flow.Subscription s = subscription;
       
   148                         if (s != null) s.request(1);
       
   149 
       
   150                     } catch (InterruptedException ex) {
       
   151                         // continue
       
   152                     }
       
   153                 }
       
   154                 assert current == LAST || current.hasRemaining();
       
   155                 return current;
       
   156             }
       
   157 
       
   158             @Override
       
   159             public int read(byte[] bytes, int off, int len) throws IOException {
       
   160                 // get the buffer to read from, possibly blocking if
       
   161                 // none is available
       
   162                 ByteBuffer buffer;
       
   163                 if ((buffer = current()) == LAST) return -1;
       
   164 
       
   165                 // don't attempt to read more than what is available
       
   166                 // in the current buffer.
       
   167                 int read = Math.min(buffer.remaining(), len);
       
   168                 assert read > 0 && read <= buffer.remaining();
       
   169 
       
   170                 // buffer.get() will do the boundary check for us.
       
   171                 buffer.get(bytes, off, read);
       
   172                 return read;
       
   173             }
       
   174 
       
   175             @Override
       
   176             public int read() throws IOException {
       
   177                 ByteBuffer buffer;
       
   178                 if ((buffer = current()) == LAST) return -1;
       
   179                 return buffer.get() & 0xFF;
       
   180             }
       
   181 
       
   182             @Override
       
   183             public void onSubscribe(Flow.Subscription s) {
       
   184                 this.subscription = s;
       
   185                 s.request(Math.max(2, buffers.remainingCapacity() + 1));
       
   186             }
       
   187 
       
   188             @Override
       
   189             public synchronized void onNext(ByteBuffer t) {
       
   190                 try {
       
   191                     if (DEBUG) System.err.println("next buffer received");
       
   192                     buffers.put(t);
       
   193                     if (DEBUG) System.err.println("buffered offered");
       
   194                 } catch (Exception ex) {
       
   195                     failed = ex;
       
   196                     try {
       
   197                         close();
       
   198                     } catch (IOException ex1) {
       
   199                         // OK
       
   200                     }
       
   201                 }
       
   202             }
       
   203 
       
   204             @Override
       
   205             public void onError(Throwable thrwbl) {
       
   206                 failed = thrwbl;
       
   207             }
       
   208 
       
   209             @Override
       
   210             public synchronized void onComplete() {
       
   211                 subscription = null;
       
   212                 onNext(LAST);
       
   213             }
       
   214 
       
   215             @Override
       
   216             public void close() throws IOException {
       
   217                 synchronized (this) {
       
   218                     closed = true;
       
   219                     Flow.Subscription s = subscription;
       
   220                     if (s != null) {
       
   221                         s.cancel();
       
   222                     }
       
   223                     subscription = null;
       
   224                 }
       
   225                 super.close();
       
   226             }
       
   227 
       
   228         }
       
   229     }
       
   230 
       
   231     /**
       
   232      * Examine the response headers to figure out the charset used to
       
   233      * encode the body content.
       
   234      * If the content type is not textual, returns an empty Optional.
       
   235      * Otherwise, returns the body content's charset, defaulting to
       
   236      * ISO-8859-1 if none is explicitly specified.
       
   237      * @param headers The response headers.
       
   238      * @return The charset to use for decoding the response body, if
       
   239      *         the response body content is text/...
       
   240      */
       
   241     public static Optional<Charset> getCharset(HttpHeaders headers) {
       
   242         Optional<String> contentType = headers.firstValue("Content-Type");
       
   243         Optional<Charset> charset = Optional.empty();
       
   244         if (contentType.isPresent()) {
       
   245             final String[] values = contentType.get().split(";");
       
   246             if (values[0].startsWith("text/")) {
       
   247                 charset = Optional.of(Stream.of(values)
       
   248                     .map(x -> x.toLowerCase(Locale.ROOT))
       
   249                     .map(String::trim)
       
   250                     .filter(x -> x.startsWith("charset="))
       
   251                     .map(x -> x.substring("charset=".length()))
       
   252                     .findFirst()
       
   253                     .orElse("ISO-8859-1"))
       
   254                     .map(Charset::forName);
       
   255             }
       
   256         }
       
   257         return charset;
       
   258     }
       
   259 
       
   260     public static void main(String[] args) throws Exception {
       
   261         HttpClient client = HttpClient.newHttpClient();
       
   262         HttpRequest request = HttpRequest
       
   263             .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/"))
       
   264             .GET()
       
   265             .build();
       
   266 
       
   267         // This example shows how to return an InputStream that can be used to
       
   268         // start reading the response body before the response is fully received.
       
   269         // In comparison, the snipet below (which uses
       
   270         // HttpResponse.BodyHandler.asString()) obviously will not return before the
       
   271         // response body is fully read:
       
   272         //
       
   273         // System.out.println(
       
   274         //    client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body());
       
   275 
       
   276         CompletableFuture<HttpResponse<InputStream>> handle =
       
   277             client.sendAsync(request, new HttpInputStreamHandler());
       
   278         if (DEBUG) System.err.println("Request sent");
       
   279 
       
   280         HttpResponse<InputStream> pending = handle.get();
       
   281 
       
   282         // At this point, the response headers have been received, but the
       
   283         // response body may not have arrived yet. This comes from
       
   284         // the implementation of HttpResponseInputStream::getBody above,
       
   285         // which returns an already completed completion stage, without
       
   286         // waiting for any data.
       
   287         // We can therefore access the headers - and the body, which
       
   288         // is our live InputStream, without waiting...
       
   289         HttpHeaders responseHeaders = pending.headers();
       
   290 
       
   291         // Get the charset declared in the response headers.
       
   292         // The optional will be empty if the content type is not
       
   293         // of type text/...
       
   294         Optional<Charset> charset = getCharset(responseHeaders);
       
   295 
       
   296         try (InputStream is = pending.body();
       
   297             // We assume a textual content type. Construct an InputStream
       
   298             // Reader with the appropriate Charset.
       
   299             // charset.get() will throw NPE if the content is not textual.
       
   300             Reader r = new InputStreamReader(is, charset.get())) {
       
   301 
       
   302             char[] buff = new char[32];
       
   303             int off=0, n=0;
       
   304             if (DEBUG) System.err.println("Start receiving response body");
       
   305             if (DEBUG) System.err.println("Charset: " + charset.get());
       
   306 
       
   307             // Start consuming the InputStream as the data arrives.
       
   308             // Will block until there is something to read...
       
   309             while ((n = r.read(buff, off, buff.length - off)) > 0) {
       
   310                 assert (buff.length - off) > 0;
       
   311                 assert n <= (buff.length - off);
       
   312                 if (n == (buff.length - off)) {
       
   313                     System.out.print(buff);
       
   314                     off = 0;
       
   315                 } else {
       
   316                     off += n;
       
   317                 }
       
   318                 assert off < buff.length;
       
   319             }
       
   320 
       
   321             // last call to read may not have filled 'buff' completely.
       
   322             // flush out the remaining characters.
       
   323             assert off >= 0 && off < buff.length;
       
   324             for (int i=0; i < off; i++) {
       
   325                 System.out.print(buff[i]);
       
   326             }
       
   327 
       
   328             // We're done!
       
   329             System.out.println("Done!");
       
   330         }
       
   331     }
       
   332 
       
   333 }