30 import jdk.incubator.http.HttpHeaders; |
30 import jdk.incubator.http.HttpHeaders; |
31 import jdk.incubator.http.HttpRequest; |
31 import jdk.incubator.http.HttpRequest; |
32 import jdk.incubator.http.HttpResponse; |
32 import jdk.incubator.http.HttpResponse; |
33 import java.nio.ByteBuffer; |
33 import java.nio.ByteBuffer; |
34 import java.nio.charset.Charset; |
34 import java.nio.charset.Charset; |
|
35 import java.util.Iterator; |
|
36 import java.util.List; |
35 import java.util.Locale; |
37 import java.util.Locale; |
36 import java.util.Optional; |
38 import java.util.Optional; |
37 import java.util.concurrent.ArrayBlockingQueue; |
39 import java.util.concurrent.ArrayBlockingQueue; |
38 import java.util.concurrent.BlockingQueue; |
40 import java.util.concurrent.BlockingQueue; |
39 import java.util.concurrent.CompletableFuture; |
41 import java.util.concurrent.CompletableFuture; |
40 import java.util.concurrent.CompletionStage; |
42 import java.util.concurrent.CompletionStage; |
41 import java.util.concurrent.Flow; |
43 import java.util.concurrent.Flow; |
42 import java.util.stream.Stream; |
44 import java.util.stream.Stream; |
|
45 import static java.lang.System.err; |
43 |
46 |
44 /* |
47 /* |
45 * @test |
48 * @test |
46 * @summary An example on how to read a response body with InputStream... |
49 * @summary An example on how to read a response body with InputStream... |
47 * @run main/othervm HttpInputStreamTest |
50 * @run main/othervm -Dtest.debug=true HttpInputStreamTest |
48 * @author daniel fuchs |
51 * @author daniel fuchs |
49 */ |
52 */ |
50 public class HttpInputStreamTest { |
53 public class HttpInputStreamTest { |
51 |
54 |
52 public static boolean DEBUG = Boolean.getBoolean("test.debug"); |
55 public static boolean DEBUG = Boolean.getBoolean("test.debug"); |
72 public HttpInputStreamHandler(int maxBuffers) { |
75 public HttpInputStreamHandler(int maxBuffers) { |
73 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; |
76 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; |
74 } |
77 } |
75 |
78 |
76 @Override |
79 @Override |
77 public synchronized HttpResponse.BodyProcessor<InputStream> |
80 public HttpResponse.BodySubscriber<InputStream> |
78 apply(int i, HttpHeaders hh) { |
81 apply(int i, HttpHeaders hh) { |
79 return new HttpResponseInputStream(maxBuffers); |
82 return new HttpResponseInputStream(maxBuffers); |
80 } |
83 } |
81 |
84 |
82 /** |
85 /** |
83 * An InputStream built on top of the Flow API. |
86 * An InputStream built on top of the Flow API. |
84 */ |
87 */ |
85 private static class HttpResponseInputStream extends InputStream |
88 private static class HttpResponseInputStream extends InputStream |
86 implements HttpResponse.BodyProcessor<InputStream> { |
89 implements HttpResponse.BodySubscriber<InputStream> { |
87 |
90 |
88 // An immutable ByteBuffer sentinel to mark that the last byte was received. |
91 // An immutable ByteBuffer sentinel to mark that the last byte was received. |
89 private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); |
92 private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); |
|
93 private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER); |
90 |
94 |
91 // A queue of yet unprocessed ByteBuffers received from the flow API. |
95 // A queue of yet unprocessed ByteBuffers received from the flow API. |
92 private final BlockingQueue<ByteBuffer> buffers; |
96 private final BlockingQueue<List<ByteBuffer>> buffers; |
93 private volatile Flow.Subscription subscription; |
97 private volatile Flow.Subscription subscription; |
94 private volatile boolean closed; |
98 private volatile boolean closed; |
95 private volatile Throwable failed; |
99 private volatile Throwable failed; |
96 private volatile ByteBuffer current; |
100 private volatile Iterator<ByteBuffer> currentListItr; |
|
101 private volatile ByteBuffer currentBuffer; |
97 |
102 |
98 HttpResponseInputStream() { |
103 HttpResponseInputStream() { |
99 this(MAX_BUFFERS_IN_QUEUE); |
104 this(MAX_BUFFERS_IN_QUEUE); |
100 } |
105 } |
101 |
106 |
102 HttpResponseInputStream(int maxBuffers) { |
107 HttpResponseInputStream(int maxBuffers) { |
103 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; |
108 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; |
104 this.buffers = new ArrayBlockingQueue<>(capacity); |
109 // 1 additional slot for LAST_LIST added by onComplete |
|
110 this.buffers = new ArrayBlockingQueue<>(capacity + 1); |
105 } |
111 } |
106 |
112 |
107 @Override |
113 @Override |
108 public CompletionStage<InputStream> getBody() { |
114 public CompletionStage<InputStream> getBody() { |
109 // Return the stream immediately, before the |
115 // Return the stream immediately, before the |
117 // If the current buffer has no remaining data, will take the |
123 // If the current buffer has no remaining data, will take the |
118 // next buffer from the buffers queue, possibly blocking until |
124 // next buffer from the buffers queue, possibly blocking until |
119 // a new buffer is made available through the Flow API, or the |
125 // a new buffer is made available through the Flow API, or the |
120 // end of the flow is reached. |
126 // end of the flow is reached. |
121 private ByteBuffer current() throws IOException { |
127 private ByteBuffer current() throws IOException { |
122 while (current == null || !current.hasRemaining()) { |
128 while (currentBuffer == null || !currentBuffer.hasRemaining()) { |
123 // Check whether the stream is claused or exhausted |
129 // Check whether the stream is closed or exhausted |
124 if (closed || failed != null) { |
130 if (closed || failed != null) { |
125 throw new IOException("closed", failed); |
131 throw new IOException("closed", failed); |
126 } |
132 } |
127 if (current == LAST) break; |
133 if (currentBuffer == LAST_BUFFER) break; |
128 |
134 |
129 try { |
135 try { |
130 // Take a new buffer from the queue, blocking |
136 if (currentListItr == null || !currentListItr.hasNext()) { |
131 // if none is available yet... |
137 // Take a new list of buffers from the queue, blocking |
132 if (DEBUG) System.err.println("Taking Buffer"); |
138 // if none is available yet... |
133 current = buffers.take(); |
139 |
134 if (DEBUG) System.err.println("Buffer Taken"); |
140 if (DEBUG) err.println("Taking list of Buffers"); |
135 |
141 List<ByteBuffer> lb = buffers.take(); |
136 // Check whether some exception was encountered |
142 currentListItr = lb.iterator(); |
137 // upstream |
143 if (DEBUG) err.println("List of Buffers Taken"); |
138 if (closed || failed != null) { |
144 |
139 throw new IOException("closed", failed); |
145 // Check whether an exception was encountered upstream |
|
146 if (closed || failed != null) |
|
147 throw new IOException("closed", failed); |
|
148 |
|
149 // Check whether we're done. |
|
150 if (lb == LAST_LIST) { |
|
151 currentListItr = null; |
|
152 currentBuffer = LAST_BUFFER; |
|
153 break; |
|
154 } |
|
155 |
|
156 // Request another upstream item ( list of buffers ) |
|
157 Flow.Subscription s = subscription; |
|
158 if (s != null) |
|
159 s.request(1); |
140 } |
160 } |
141 |
161 assert currentListItr != null; |
142 // Check whether we're done. |
162 assert currentListItr.hasNext(); |
143 if (current == LAST) break; |
163 if (DEBUG) err.println("Next Buffer"); |
144 |
164 currentBuffer = currentListItr.next(); |
145 // Inform the producer that it can start sending |
|
146 // us a new buffer |
|
147 Flow.Subscription s = subscription; |
|
148 if (s != null) s.request(1); |
|
149 |
|
150 } catch (InterruptedException ex) { |
165 } catch (InterruptedException ex) { |
151 // continue |
166 // continue |
152 } |
167 } |
153 } |
168 } |
154 assert current == LAST || current.hasRemaining(); |
169 assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); |
155 return current; |
170 return currentBuffer; |
156 } |
171 } |
157 |
172 |
158 @Override |
173 @Override |
159 public int read(byte[] bytes, int off, int len) throws IOException { |
174 public int read(byte[] bytes, int off, int len) throws IOException { |
160 // get the buffer to read from, possibly blocking if |
175 // get the buffer to read from, possibly blocking if |
161 // none is available |
176 // none is available |
162 ByteBuffer buffer; |
177 ByteBuffer buffer; |
163 if ((buffer = current()) == LAST) return -1; |
178 if ((buffer = current()) == LAST_BUFFER) return -1; |
164 |
179 |
165 // don't attempt to read more than what is available |
180 // don't attempt to read more than what is available |
166 // in the current buffer. |
181 // in the current buffer. |
167 int read = Math.min(buffer.remaining(), len); |
182 int read = Math.min(buffer.remaining(), len); |
168 assert read > 0 && read <= buffer.remaining(); |
183 assert read > 0 && read <= buffer.remaining(); |
173 } |
188 } |
174 |
189 |
175 @Override |
190 @Override |
176 public int read() throws IOException { |
191 public int read() throws IOException { |
177 ByteBuffer buffer; |
192 ByteBuffer buffer; |
178 if ((buffer = current()) == LAST) return -1; |
193 if ((buffer = current()) == LAST_BUFFER) return -1; |
179 return buffer.get() & 0xFF; |
194 return buffer.get() & 0xFF; |
180 } |
195 } |
181 |
196 |
182 @Override |
197 @Override |
183 public void onSubscribe(Flow.Subscription s) { |
198 public void onSubscribe(Flow.Subscription s) { |
|
199 if (this.subscription != null) { |
|
200 s.cancel(); |
|
201 return; |
|
202 } |
184 this.subscription = s; |
203 this.subscription = s; |
185 s.request(Math.max(2, buffers.remainingCapacity() + 1)); |
204 assert buffers.remainingCapacity() > 1; // should at least be 2 |
186 } |
205 if (DEBUG) err.println("onSubscribe: requesting " |
187 |
206 + Math.max(1, buffers.remainingCapacity() - 1)); |
188 @Override |
207 s.request(Math.max(1, buffers.remainingCapacity() - 1)); |
189 public synchronized void onNext(ByteBuffer t) { |
208 } |
|
209 |
|
210 @Override |
|
211 public void onNext(List<ByteBuffer> t) { |
190 try { |
212 try { |
191 if (DEBUG) System.err.println("next buffer received"); |
213 if (DEBUG) err.println("next item received"); |
192 buffers.put(t); |
214 if (!buffers.offer(t)) { |
193 if (DEBUG) System.err.println("buffered offered"); |
215 throw new IllegalStateException("queue is full"); |
|
216 } |
|
217 if (DEBUG) err.println("item offered"); |
194 } catch (Exception ex) { |
218 } catch (Exception ex) { |
195 failed = ex; |
219 failed = ex; |
196 try { |
220 try { |
197 close(); |
221 close(); |
198 } catch (IOException ex1) { |
222 } catch (IOException ex1) { |
272 // |
298 // |
273 // System.out.println( |
299 // System.out.println( |
274 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); |
300 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); |
275 |
301 |
276 CompletableFuture<HttpResponse<InputStream>> handle = |
302 CompletableFuture<HttpResponse<InputStream>> handle = |
277 client.sendAsync(request, new HttpInputStreamHandler()); |
303 client.sendAsync(request, new HttpInputStreamHandler(3)); |
278 if (DEBUG) System.err.println("Request sent"); |
304 if (DEBUG) err.println("Request sent"); |
279 |
305 |
280 HttpResponse<InputStream> pending = handle.get(); |
306 HttpResponse<InputStream> pending = handle.get(); |
281 |
307 |
282 // At this point, the response headers have been received, but the |
308 // At this point, the response headers have been received, but the |
283 // response body may not have arrived yet. This comes from |
309 // response body may not have arrived yet. This comes from |
299 // charset.get() will throw NPE if the content is not textual. |
325 // charset.get() will throw NPE if the content is not textual. |
300 Reader r = new InputStreamReader(is, charset.get())) { |
326 Reader r = new InputStreamReader(is, charset.get())) { |
301 |
327 |
302 char[] buff = new char[32]; |
328 char[] buff = new char[32]; |
303 int off=0, n=0; |
329 int off=0, n=0; |
304 if (DEBUG) System.err.println("Start receiving response body"); |
330 if (DEBUG) err.println("Start receiving response body"); |
305 if (DEBUG) System.err.println("Charset: " + charset.get()); |
331 if (DEBUG) err.println("Charset: " + charset.get()); |
306 |
332 |
307 // Start consuming the InputStream as the data arrives. |
333 // Start consuming the InputStream as the data arrives. |
308 // Will block until there is something to read... |
334 // Will block until there is something to read... |
309 while ((n = r.read(buff, off, buff.length - off)) > 0) { |
335 while ((n = r.read(buff, off, buff.length - off)) > 0) { |
310 assert (buff.length - off) > 0; |
336 assert (buff.length - off) > 0; |