|
1 /* |
|
2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. |
|
8 * |
|
9 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
12 * version 2 for more details (a copy is included in the LICENSE file that |
|
13 * accompanied this code). |
|
14 * |
|
15 * You should have received a copy of the GNU General Public License version |
|
16 * 2 along with this work; if not, write to the Free Software Foundation, |
|
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
18 * |
|
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
20 * or visit www.oracle.com if you need additional information or have any |
|
21 * questions. |
|
22 */ |
|
23 |
|
24 import java.io.IOException; |
|
25 import java.io.InputStream; |
|
26 import java.io.InputStreamReader; |
|
27 import java.io.Reader; |
|
28 import java.net.URI; |
|
29 import jdk.incubator.http.HttpClient; |
|
30 import jdk.incubator.http.HttpHeaders; |
|
31 import jdk.incubator.http.HttpRequest; |
|
32 import jdk.incubator.http.HttpResponse; |
|
33 import java.nio.ByteBuffer; |
|
34 import java.nio.charset.Charset; |
|
35 import java.util.Locale; |
|
36 import java.util.Optional; |
|
37 import java.util.concurrent.ArrayBlockingQueue; |
|
38 import java.util.concurrent.BlockingQueue; |
|
39 import java.util.concurrent.CompletableFuture; |
|
40 import java.util.concurrent.CompletionStage; |
|
41 import java.util.concurrent.Flow; |
|
42 import java.util.stream.Stream; |
|
43 |
|
44 /* |
|
45 * @test |
|
46 * @summary An example on how to read a response body with InputStream... |
|
47 * @run main/othervm HttpInputStreamTest |
|
48 * @author daniel fuchs |
|
49 */ |
|
50 public class HttpInputStreamTest { |
|
51 |
|
52 public static boolean DEBUG = Boolean.getBoolean("test.debug"); |
|
53 |
|
54 /** |
|
55 * A simple HttpResponse.BodyHandler that creates a live |
|
56 * InputStream to read the response body from the underlying ByteBuffer |
|
57 * Flow. |
|
58 * The InputStream is made immediately available for consumption, before |
|
59 * the response body is fully received. |
|
60 */ |
|
61 public static class HttpInputStreamHandler |
|
62 implements HttpResponse.BodyHandler<InputStream> { |
|
63 |
|
64 public static final int MAX_BUFFERS_IN_QUEUE = 1; |
|
65 |
|
66 private final int maxBuffers; |
|
67 |
|
68 public HttpInputStreamHandler() { |
|
69 this(MAX_BUFFERS_IN_QUEUE); |
|
70 } |
|
71 |
|
72 public HttpInputStreamHandler(int maxBuffers) { |
|
73 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; |
|
74 } |
|
75 |
|
76 @Override |
|
77 public synchronized HttpResponse.BodyProcessor<InputStream> |
|
78 apply(int i, HttpHeaders hh) { |
|
79 return new HttpResponseInputStream(maxBuffers); |
|
80 } |
|
81 |
|
82 /** |
|
83 * An InputStream built on top of the Flow API. |
|
84 */ |
|
85 private static class HttpResponseInputStream extends InputStream |
|
86 implements HttpResponse.BodyProcessor<InputStream> { |
|
87 |
|
88 // An immutable ByteBuffer sentinel to mark that the last byte was received. |
|
89 private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); |
|
90 |
|
91 // A queue of yet unprocessed ByteBuffers received from the flow API. |
|
92 private final BlockingQueue<ByteBuffer> buffers; |
|
93 private volatile Flow.Subscription subscription; |
|
94 private volatile boolean closed; |
|
95 private volatile Throwable failed; |
|
96 private volatile ByteBuffer current; |
|
97 |
|
98 HttpResponseInputStream() { |
|
99 this(MAX_BUFFERS_IN_QUEUE); |
|
100 } |
|
101 |
|
102 HttpResponseInputStream(int maxBuffers) { |
|
103 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; |
|
104 this.buffers = new ArrayBlockingQueue<>(capacity); |
|
105 } |
|
106 |
|
107 @Override |
|
108 public CompletionStage<InputStream> getBody() { |
|
109 // Return the stream immediately, before the |
|
110 // response body is received. |
|
111 // This makes it possible for senAsync().get().body() |
|
112 // to complete before the response body is received. |
|
113 return CompletableFuture.completedStage(this); |
|
114 } |
|
115 |
|
116 // Returns the current byte buffer to read from. |
|
117 // If the current buffer has no remaining data, will take the |
|
118 // next buffer from the buffers queue, possibly blocking until |
|
119 // a new buffer is made available through the Flow API, or the |
|
120 // end of the flow is reached. |
|
121 private ByteBuffer current() throws IOException { |
|
122 while (current == null || !current.hasRemaining()) { |
|
123 // Check whether the stream is claused or exhausted |
|
124 if (closed || failed != null) { |
|
125 throw new IOException("closed", failed); |
|
126 } |
|
127 if (current == LAST) break; |
|
128 |
|
129 try { |
|
130 // Take a new buffer from the queue, blocking |
|
131 // if none is available yet... |
|
132 if (DEBUG) System.err.println("Taking Buffer"); |
|
133 current = buffers.take(); |
|
134 if (DEBUG) System.err.println("Buffer Taken"); |
|
135 |
|
136 // Check whether some exception was encountered |
|
137 // upstream |
|
138 if (closed || failed != null) { |
|
139 throw new IOException("closed", failed); |
|
140 } |
|
141 |
|
142 // Check whether we're done. |
|
143 if (current == LAST) break; |
|
144 |
|
145 // Inform the producer that it can start sending |
|
146 // us a new buffer |
|
147 Flow.Subscription s = subscription; |
|
148 if (s != null) s.request(1); |
|
149 |
|
150 } catch (InterruptedException ex) { |
|
151 // continue |
|
152 } |
|
153 } |
|
154 assert current == LAST || current.hasRemaining(); |
|
155 return current; |
|
156 } |
|
157 |
|
158 @Override |
|
159 public int read(byte[] bytes, int off, int len) throws IOException { |
|
160 // get the buffer to read from, possibly blocking if |
|
161 // none is available |
|
162 ByteBuffer buffer; |
|
163 if ((buffer = current()) == LAST) return -1; |
|
164 |
|
165 // don't attempt to read more than what is available |
|
166 // in the current buffer. |
|
167 int read = Math.min(buffer.remaining(), len); |
|
168 assert read > 0 && read <= buffer.remaining(); |
|
169 |
|
170 // buffer.get() will do the boundary check for us. |
|
171 buffer.get(bytes, off, read); |
|
172 return read; |
|
173 } |
|
174 |
|
175 @Override |
|
176 public int read() throws IOException { |
|
177 ByteBuffer buffer; |
|
178 if ((buffer = current()) == LAST) return -1; |
|
179 return buffer.get() & 0xFF; |
|
180 } |
|
181 |
|
182 @Override |
|
183 public void onSubscribe(Flow.Subscription s) { |
|
184 this.subscription = s; |
|
185 s.request(Math.max(2, buffers.remainingCapacity() + 1)); |
|
186 } |
|
187 |
|
188 @Override |
|
189 public synchronized void onNext(ByteBuffer t) { |
|
190 try { |
|
191 if (DEBUG) System.err.println("next buffer received"); |
|
192 buffers.put(t); |
|
193 if (DEBUG) System.err.println("buffered offered"); |
|
194 } catch (Exception ex) { |
|
195 failed = ex; |
|
196 try { |
|
197 close(); |
|
198 } catch (IOException ex1) { |
|
199 // OK |
|
200 } |
|
201 } |
|
202 } |
|
203 |
|
204 @Override |
|
205 public void onError(Throwable thrwbl) { |
|
206 failed = thrwbl; |
|
207 } |
|
208 |
|
209 @Override |
|
210 public synchronized void onComplete() { |
|
211 subscription = null; |
|
212 onNext(LAST); |
|
213 } |
|
214 |
|
215 @Override |
|
216 public void close() throws IOException { |
|
217 synchronized (this) { |
|
218 closed = true; |
|
219 Flow.Subscription s = subscription; |
|
220 if (s != null) { |
|
221 s.cancel(); |
|
222 } |
|
223 subscription = null; |
|
224 } |
|
225 super.close(); |
|
226 } |
|
227 |
|
228 } |
|
229 } |
|
230 |
|
231 /** |
|
232 * Examine the response headers to figure out the charset used to |
|
233 * encode the body content. |
|
234 * If the content type is not textual, returns an empty Optional. |
|
235 * Otherwise, returns the body content's charset, defaulting to |
|
236 * ISO-8859-1 if none is explicitly specified. |
|
237 * @param headers The response headers. |
|
238 * @return The charset to use for decoding the response body, if |
|
239 * the response body content is text/... |
|
240 */ |
|
241 public static Optional<Charset> getCharset(HttpHeaders headers) { |
|
242 Optional<String> contentType = headers.firstValue("Content-Type"); |
|
243 Optional<Charset> charset = Optional.empty(); |
|
244 if (contentType.isPresent()) { |
|
245 final String[] values = contentType.get().split(";"); |
|
246 if (values[0].startsWith("text/")) { |
|
247 charset = Optional.of(Stream.of(values) |
|
248 .map(x -> x.toLowerCase(Locale.ROOT)) |
|
249 .map(String::trim) |
|
250 .filter(x -> x.startsWith("charset=")) |
|
251 .map(x -> x.substring("charset=".length())) |
|
252 .findFirst() |
|
253 .orElse("ISO-8859-1")) |
|
254 .map(Charset::forName); |
|
255 } |
|
256 } |
|
257 return charset; |
|
258 } |
|
259 |
|
260 public static void main(String[] args) throws Exception { |
|
261 HttpClient client = HttpClient.newHttpClient(); |
|
262 HttpRequest request = HttpRequest |
|
263 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) |
|
264 .GET() |
|
265 .build(); |
|
266 |
|
267 // This example shows how to return an InputStream that can be used to |
|
268 // start reading the response body before the response is fully received. |
|
269 // In comparison, the snipet below (which uses |
|
270 // HttpResponse.BodyHandler.asString()) obviously will not return before the |
|
271 // response body is fully read: |
|
272 // |
|
273 // System.out.println( |
|
274 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); |
|
275 |
|
276 CompletableFuture<HttpResponse<InputStream>> handle = |
|
277 client.sendAsync(request, new HttpInputStreamHandler()); |
|
278 if (DEBUG) System.err.println("Request sent"); |
|
279 |
|
280 HttpResponse<InputStream> pending = handle.get(); |
|
281 |
|
282 // At this point, the response headers have been received, but the |
|
283 // response body may not have arrived yet. This comes from |
|
284 // the implementation of HttpResponseInputStream::getBody above, |
|
285 // which returns an already completed completion stage, without |
|
286 // waiting for any data. |
|
287 // We can therefore access the headers - and the body, which |
|
288 // is our live InputStream, without waiting... |
|
289 HttpHeaders responseHeaders = pending.headers(); |
|
290 |
|
291 // Get the charset declared in the response headers. |
|
292 // The optional will be empty if the content type is not |
|
293 // of type text/... |
|
294 Optional<Charset> charset = getCharset(responseHeaders); |
|
295 |
|
296 try (InputStream is = pending.body(); |
|
297 // We assume a textual content type. Construct an InputStream |
|
298 // Reader with the appropriate Charset. |
|
299 // charset.get() will throw NPE if the content is not textual. |
|
300 Reader r = new InputStreamReader(is, charset.get())) { |
|
301 |
|
302 char[] buff = new char[32]; |
|
303 int off=0, n=0; |
|
304 if (DEBUG) System.err.println("Start receiving response body"); |
|
305 if (DEBUG) System.err.println("Charset: " + charset.get()); |
|
306 |
|
307 // Start consuming the InputStream as the data arrives. |
|
308 // Will block until there is something to read... |
|
309 while ((n = r.read(buff, off, buff.length - off)) > 0) { |
|
310 assert (buff.length - off) > 0; |
|
311 assert n <= (buff.length - off); |
|
312 if (n == (buff.length - off)) { |
|
313 System.out.print(buff); |
|
314 off = 0; |
|
315 } else { |
|
316 off += n; |
|
317 } |
|
318 assert off < buff.length; |
|
319 } |
|
320 |
|
321 // last call to read may not have filled 'buff' completely. |
|
322 // flush out the remaining characters. |
|
323 assert off >= 0 && off < buff.length; |
|
324 for (int i=0; i < off; i++) { |
|
325 System.out.print(buff[i]); |
|
326 } |
|
327 |
|
328 // We're done! |
|
329 System.out.println("Done!"); |
|
330 } |
|
331 } |
|
332 |
|
333 } |