|
1 /* |
|
2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package java.net.http.internal; |
|
27 |
|
28 import java.io.IOException; |
|
29 import java.lang.System.Logger.Level; |
|
30 import java.net.URI; |
|
31 import java.nio.ByteBuffer; |
|
32 import java.util.ArrayList; |
|
33 import java.util.Collections; |
|
34 import java.util.List; |
|
35 import java.util.concurrent.CompletableFuture; |
|
36 import java.util.concurrent.ConcurrentLinkedDeque; |
|
37 import java.util.concurrent.ConcurrentLinkedQueue; |
|
38 import java.util.concurrent.Executor; |
|
39 import java.util.concurrent.Flow; |
|
40 import java.util.concurrent.Flow.Subscription; |
|
41 import java.util.concurrent.atomic.AtomicReference; |
|
42 import java.util.function.BiPredicate; |
|
43 import java.net.http.HttpClient; |
|
44 import java.net.http.HttpHeaders; |
|
45 import java.net.http.HttpRequest; |
|
46 import java.net.http.HttpResponse; |
|
47 import java.net.http.HttpResponse.BodySubscriber; |
|
48 import java.net.http.internal.common.*; |
|
49 import java.net.http.internal.frame.*; |
|
50 import java.net.http.internal.hpack.DecodingCallback; |
|
51 |
|
52 /** |
|
53 * Http/2 Stream handling. |
|
54 * |
|
55 * REQUESTS |
|
56 * |
|
57 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q |
|
58 * |
|
59 * sendRequest() -- sendHeadersOnly() + sendBody() |
|
60 * |
|
61 * sendBodyAsync() -- calls sendBody() in an executor thread. |
|
62 * |
|
63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block |
|
64 * |
|
65 * sendRequestAsync() -- calls sendRequest() in an executor thread |
|
66 * |
|
67 * RESPONSES |
|
68 * |
|
69 * Multiple responses can be received per request. Responses are queued up on |
|
70 * a LinkedList of CF<HttpResponse> and the first one on the list is completed |
|
71 * with the next response |
|
72 * |
|
73 * getResponseAsync() -- queries list of response CFs and returns first one |
|
74 * if one exists. Otherwise, creates one and adds it to list |
|
75 * and returns it. Completion is achieved through the |
|
76 * incoming() upcall from connection reader thread. |
|
77 * |
|
78 * getResponse() -- calls getResponseAsync() and waits for CF to complete |
|
79 * |
|
80 * responseBodyAsync() -- calls responseBody() in an executor thread. |
|
81 * |
|
82 * incoming() -- entry point called from connection reader thread. Frames are |
|
83 * either handled immediately without blocking or for data frames |
|
84 * placed on the stream's inputQ which is consumed by the stream's |
|
85 * reader thread. |
|
86 * |
|
87 * PushedStream sub class |
|
88 * ====================== |
|
89 * Sending side methods are not used because the request comes from a PUSH_PROMISE |
|
90 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream |
|
91 * is created. PushedStream does not use responseCF list as there can be only |
|
92 * one response. The CF is created when the object created and when the response |
|
93 * HEADERS frame is received the object is completed. |
|
94 */ |
|
95 class Stream<T> extends ExchangeImpl<T> { |
|
96 |
|
97 final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag |
|
98 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
|
99 |
|
100 final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>(); |
|
101 final SequentialScheduler sched = |
|
102 SequentialScheduler.synchronizedScheduler(this::schedule); |
|
103 final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel); |
|
104 |
|
105 /** |
|
106 * This stream's identifier. Assigned lazily by the HTTP2Connection before |
|
107 * the stream's first frame is sent. |
|
108 */ |
|
109 protected volatile int streamid; |
|
110 |
|
111 long requestContentLen; |
|
112 |
|
113 final Http2Connection connection; |
|
114 final HttpRequestImpl request; |
|
115 final DecodingCallback rspHeadersConsumer; |
|
116 HttpHeadersImpl responseHeaders; |
|
117 final HttpHeadersImpl requestPseudoHeaders; |
|
118 volatile HttpResponse.BodySubscriber<T> responseSubscriber; |
|
119 final HttpRequest.BodyPublisher requestPublisher; |
|
120 volatile RequestSubscriber requestSubscriber; |
|
121 volatile int responseCode; |
|
122 volatile Response response; |
|
123 volatile Throwable failed; // The exception with which this stream was canceled. |
|
124 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>(); |
|
125 volatile CompletableFuture<T> responseBodyCF; |
|
126 |
|
127 /** True if END_STREAM has been seen in a frame received on this stream. */ |
|
128 private volatile boolean remotelyClosed; |
|
129 private volatile boolean closed; |
|
130 private volatile boolean endStreamSent; |
|
131 |
|
132 // state flags |
|
133 private boolean requestSent, responseReceived; |
|
134 |
|
135 /** |
|
136 * A reference to this Stream's connection Send Window controller. The |
|
137 * stream MUST acquire the appropriate amount of Send Window before |
|
138 * sending any data. Will be null for PushStreams, as they cannot send data. |
|
139 */ |
|
140 private final WindowController windowController; |
|
141 private final WindowUpdateSender windowUpdater; |
|
142 |
|
143 @Override |
|
144 HttpConnection connection() { |
|
145 return connection.connection; |
|
146 } |
|
147 |
|
148 /** |
|
149 * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() } |
|
150 * of after user subscription window has re-opened, from SubscriptionBase.request() |
|
151 */ |
|
152 private void schedule() { |
|
153 if (responseSubscriber == null) |
|
154 // can't process anything yet |
|
155 return; |
|
156 |
|
157 try { |
|
158 while (!inputQ.isEmpty()) { |
|
159 Http2Frame frame = inputQ.peek(); |
|
160 if (frame instanceof ResetFrame) { |
|
161 inputQ.remove(); |
|
162 handleReset((ResetFrame)frame); |
|
163 return; |
|
164 } |
|
165 DataFrame df = (DataFrame)frame; |
|
166 boolean finished = df.getFlag(DataFrame.END_STREAM); |
|
167 |
|
168 List<ByteBuffer> buffers = df.getData(); |
|
169 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers); |
|
170 int size = Utils.remaining(dsts, Integer.MAX_VALUE); |
|
171 if (size == 0 && finished) { |
|
172 inputQ.remove(); |
|
173 Log.logTrace("responseSubscriber.onComplete"); |
|
174 debug.log(Level.DEBUG, "incoming: onComplete"); |
|
175 sched.stop(); |
|
176 responseSubscriber.onComplete(); |
|
177 setEndStreamReceived(); |
|
178 return; |
|
179 } else if (userSubscription.tryDecrement()) { |
|
180 inputQ.remove(); |
|
181 Log.logTrace("responseSubscriber.onNext {0}", size); |
|
182 debug.log(Level.DEBUG, "incoming: onNext(%d)", size); |
|
183 responseSubscriber.onNext(dsts); |
|
184 if (consumed(df)) { |
|
185 Log.logTrace("responseSubscriber.onComplete"); |
|
186 debug.log(Level.DEBUG, "incoming: onComplete"); |
|
187 sched.stop(); |
|
188 responseSubscriber.onComplete(); |
|
189 setEndStreamReceived(); |
|
190 return; |
|
191 } |
|
192 } else { |
|
193 return; |
|
194 } |
|
195 } |
|
196 } catch (Throwable throwable) { |
|
197 failed = throwable; |
|
198 } |
|
199 |
|
200 Throwable t = failed; |
|
201 if (t != null) { |
|
202 sched.stop(); |
|
203 responseSubscriber.onError(t); |
|
204 close(); |
|
205 } |
|
206 } |
|
207 |
|
208 // Callback invoked after the Response BodySubscriber has consumed the |
|
209 // buffers contained in a DataFrame. |
|
210 // Returns true if END_STREAM is reached, false otherwise. |
|
211 private boolean consumed(DataFrame df) { |
|
212 // RFC 7540 6.1: |
|
213 // The entire DATA frame payload is included in flow control, |
|
214 // including the Pad Length and Padding fields if present |
|
215 int len = df.payloadLength(); |
|
216 connection.windowUpdater.update(len); |
|
217 |
|
218 if (!df.getFlag(DataFrame.END_STREAM)) { |
|
219 // Don't send window update on a stream which is |
|
220 // closed or half closed. |
|
221 windowUpdater.update(len); |
|
222 return false; // more data coming |
|
223 } |
|
224 return true; // end of stream |
|
225 } |
|
226 |
|
227 @Override |
|
228 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
|
229 boolean returnConnectionToPool, |
|
230 Executor executor) |
|
231 { |
|
232 Log.logTrace("Reading body on stream {0}", streamid); |
|
233 BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders); |
|
234 CompletableFuture<T> cf = receiveData(bodySubscriber, executor); |
|
235 |
|
236 PushGroup<?> pg = exchange.getPushGroup(); |
|
237 if (pg != null) { |
|
238 // if an error occurs make sure it is recorded in the PushGroup |
|
239 cf = cf.whenComplete((t,e) -> pg.pushError(e)); |
|
240 } |
|
241 return cf; |
|
242 } |
|
243 |
|
244 @Override |
|
245 public String toString() { |
|
246 StringBuilder sb = new StringBuilder(); |
|
247 sb.append("streamid: ") |
|
248 .append(streamid); |
|
249 return sb.toString(); |
|
250 } |
|
251 |
|
252 private void receiveDataFrame(DataFrame df) { |
|
253 inputQ.add(df); |
|
254 sched.runOrSchedule(); |
|
255 } |
|
256 |
|
257 /** Handles a RESET frame. RESET is always handled inline in the queue. */ |
|
258 private void receiveResetFrame(ResetFrame frame) { |
|
259 inputQ.add(frame); |
|
260 sched.runOrSchedule(); |
|
261 } |
|
262 |
|
263 // pushes entire response body into response subscriber |
|
264 // blocking when required by local or remote flow control |
|
265 CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) { |
|
266 responseBodyCF = new MinimalFuture<>(); |
|
267 // We want to allow the subscriber's getBody() method to block so it |
|
268 // can work with InputStreams. So, we offload execution. |
|
269 executor.execute(() -> { |
|
270 bodySubscriber.getBody().whenComplete((T body, Throwable t) -> { |
|
271 if (t == null) |
|
272 responseBodyCF.complete(body); |
|
273 else |
|
274 responseBodyCF.completeExceptionally(t); |
|
275 }); |
|
276 }); |
|
277 |
|
278 if (isCanceled()) { |
|
279 Throwable t = getCancelCause(); |
|
280 responseBodyCF.completeExceptionally(t); |
|
281 } else { |
|
282 bodySubscriber.onSubscribe(userSubscription); |
|
283 } |
|
284 // Set the responseSubscriber field now that onSubscribe has been called. |
|
285 // This effectively allows the scheduler to start invoking the callbacks. |
|
286 responseSubscriber = bodySubscriber; |
|
287 sched.runOrSchedule(); // in case data waiting already to be processed |
|
288 return responseBodyCF; |
|
289 } |
|
290 |
|
291 @Override |
|
292 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
|
293 return sendBodyImpl().thenApply( v -> this); |
|
294 } |
|
295 |
|
/**
 * Creates a stream on {@code connection} for the request of exchange
 * {@code e}.
 *
 * @param connection       the HTTP/2 connection carrying this stream
 * @param e                the exchange this stream belongs to
 * @param windowController the send window controller used when sending data
 */
@SuppressWarnings("unchecked")
Stream(Http2Connection connection,
       Exchange<T> e,
       WindowController windowController)
{
    super(e);
    this.connection = connection;
    this.windowController = windowController;
    this.request = e.request();
    this.requestPublisher = this.request.requestPublisher; // may be null
    this.responseHeaders = new HttpHeadersImpl();
    // Each decoded header pair is appended to responseHeaders.
    this.rspHeadersConsumer = (headerName, headerValue) -> {
        responseHeaders.addHeader(headerName.toString(), headerValue.toString());
        if (Log.headers() && Log.trace()) {
            Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
                         streamid, headerName, headerValue);
        }
    };
    this.requestPseudoHeaders = new HttpHeadersImpl();
    // NEW
    this.windowUpdater = new StreamWindowUpdateSender(connection);
}
|
318 |
|
319 /** |
|
320 * Entry point from Http2Connection reader thread. |
|
321 * |
|
322 * Data frames will be removed by response body thread. |
|
323 */ |
|
324 void incoming(Http2Frame frame) throws IOException { |
|
325 debug.log(Level.DEBUG, "incoming: %s", frame); |
|
326 if ((frame instanceof HeaderFrame)) { |
|
327 HeaderFrame hframe = (HeaderFrame)frame; |
|
328 if (hframe.endHeaders()) { |
|
329 Log.logTrace("handling response (streamid={0})", streamid); |
|
330 handleResponse(); |
|
331 if (hframe.getFlag(HeaderFrame.END_STREAM)) { |
|
332 receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); |
|
333 } |
|
334 } |
|
335 } else if (frame instanceof DataFrame) { |
|
336 receiveDataFrame((DataFrame)frame); |
|
337 } else { |
|
338 otherFrame(frame); |
|
339 } |
|
340 } |
|
341 |
|
342 void otherFrame(Http2Frame frame) throws IOException { |
|
343 switch (frame.type()) { |
|
344 case WindowUpdateFrame.TYPE: |
|
345 incoming_windowUpdate((WindowUpdateFrame) frame); |
|
346 break; |
|
347 case ResetFrame.TYPE: |
|
348 incoming_reset((ResetFrame) frame); |
|
349 break; |
|
350 case PriorityFrame.TYPE: |
|
351 incoming_priority((PriorityFrame) frame); |
|
352 break; |
|
353 default: |
|
354 String msg = "Unexpected frame: " + frame.toString(); |
|
355 throw new IOException(msg); |
|
356 } |
|
357 } |
|
358 |
|
359 // The Hpack decoder decodes into one of these consumers of name,value pairs |
|
360 |
|
361 DecodingCallback rspHeadersConsumer() { |
|
362 return rspHeadersConsumer; |
|
363 } |
|
364 |
|
365 protected void handleResponse() throws IOException { |
|
366 responseCode = (int)responseHeaders |
|
367 .firstValueAsLong(":status") |
|
368 .orElseThrow(() -> new IOException("no statuscode in response")); |
|
369 |
|
370 response = new Response( |
|
371 request, exchange, responseHeaders, |
|
372 responseCode, HttpClient.Version.HTTP_2); |
|
373 |
|
374 /* TODO: review if needs to be removed |
|
375 the value is not used, but in case `content-length` doesn't parse as |
|
376 long, there will be NumberFormatException. If left as is, make sure |
|
377 code up the stack handles NFE correctly. */ |
|
378 responseHeaders.firstValueAsLong("content-length"); |
|
379 |
|
380 if (Log.headers()) { |
|
381 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); |
|
382 Log.dumpHeaders(sb, " ", responseHeaders); |
|
383 Log.logHeaders(sb.toString()); |
|
384 } |
|
385 |
|
386 completeResponse(response); |
|
387 } |
|
388 |
|
389 void incoming_reset(ResetFrame frame) { |
|
390 Log.logTrace("Received RST_STREAM on stream {0}", streamid); |
|
391 if (endStreamReceived()) { |
|
392 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); |
|
393 } else if (closed) { |
|
394 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
|
395 } else { |
|
396 // put it in the input queue in order to read all |
|
397 // pending data frames first. Indeed, a server may send |
|
398 // RST_STREAM after sending END_STREAM, in which case we should |
|
399 // ignore it. However, we won't know if we have received END_STREAM |
|
400 // or not until all pending data frames are read. |
|
401 receiveResetFrame(frame); |
|
402 // RST_STREAM was pushed to the queue. It will be handled by |
|
403 // asyncReceive after all pending data frames have been |
|
404 // processed. |
|
405 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); |
|
406 } |
|
407 } |
|
408 |
|
409 void handleReset(ResetFrame frame) { |
|
410 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); |
|
411 if (!closed) { |
|
412 close(); |
|
413 int error = frame.getErrorCode(); |
|
414 completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error))); |
|
415 } else { |
|
416 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
|
417 } |
|
418 } |
|
419 |
|
420 void incoming_priority(PriorityFrame frame) { |
|
421 // TODO: implement priority |
|
422 throw new UnsupportedOperationException("Not implemented"); |
|
423 } |
|
424 |
|
425 private void incoming_windowUpdate(WindowUpdateFrame frame) |
|
426 throws IOException |
|
427 { |
|
428 int amount = frame.getUpdate(); |
|
429 if (amount <= 0) { |
|
430 Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", |
|
431 streamid, streamid, amount); |
|
432 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); |
|
433 } else { |
|
434 assert streamid != 0; |
|
435 boolean success = windowController.increaseStreamWindow(amount, streamid); |
|
436 if (!success) { // overflow |
|
437 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); |
|
438 } |
|
439 } |
|
440 } |
|
441 |
|
442 void incoming_pushPromise(HttpRequestImpl pushRequest, |
|
443 PushedStream<T> pushStream) |
|
444 throws IOException |
|
445 { |
|
446 if (Log.requests()) { |
|
447 Log.logRequest("PUSH_PROMISE: " + pushRequest.toString()); |
|
448 } |
|
449 PushGroup<T> pushGroup = exchange.getPushGroup(); |
|
450 if (pushGroup == null) { |
|
451 Log.logTrace("Rejecting push promise stream " + streamid); |
|
452 connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM); |
|
453 pushStream.close(); |
|
454 return; |
|
455 } |
|
456 |
|
457 PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest); |
|
458 |
|
459 if (!acceptor.accepted()) { |
|
460 // cancel / reject |
|
461 IOException ex = new IOException("Stream " + streamid + " cancelled by users handler"); |
|
462 if (Log.trace()) { |
|
463 Log.logTrace("No body subscriber for {0}: {1}", pushRequest, |
|
464 ex.getMessage()); |
|
465 } |
|
466 pushStream.cancelImpl(ex); |
|
467 return; |
|
468 } |
|
469 |
|
470 CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf(); |
|
471 HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler(); |
|
472 assert pushHandler != null; |
|
473 |
|
474 pushStream.requestSent(); |
|
475 pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ? |
|
476 // setup housekeeping for when the push is received |
|
477 // TODO: deal with ignoring of CF anti-pattern |
|
478 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); |
|
479 cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { |
|
480 t = Utils.getCompletionCause(t); |
|
481 if (Log.trace()) { |
|
482 Log.logTrace("Push completed on stream {0} for {1}{2}", |
|
483 pushStream.streamid, resp, |
|
484 ((t==null) ? "": " with exception " + t)); |
|
485 } |
|
486 if (t != null) { |
|
487 pushGroup.pushError(t); |
|
488 pushResponseCF.completeExceptionally(t); |
|
489 } else { |
|
490 pushResponseCF.complete(resp); |
|
491 } |
|
492 pushGroup.pushCompleted(); |
|
493 }); |
|
494 |
|
495 } |
|
496 |
|
497 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { |
|
498 HttpHeadersImpl h = request.getSystemHeaders(); |
|
499 if (contentLength > 0) { |
|
500 h.setHeader("content-length", Long.toString(contentLength)); |
|
501 } |
|
502 setPseudoHeaderFields(); |
|
503 HttpHeaders sysh = filter(h); |
|
504 HttpHeaders userh = filter(request.getUserHeaders()); |
|
505 OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this); |
|
506 if (contentLength == 0) { |
|
507 f.setFlag(HeadersFrame.END_STREAM); |
|
508 endStreamSent = true; |
|
509 } |
|
510 return f; |
|
511 } |
|
512 |
|
513 private boolean hasProxyAuthorization(HttpHeaders headers) { |
|
514 return headers.firstValue("proxy-authorization") |
|
515 .isPresent(); |
|
516 } |
|
517 |
|
518 // Determines whether we need to build a new HttpHeader object. |
|
519 // |
|
520 // Ideally we should pass the filter to OutgoingHeaders refactor the |
|
521 // code that creates the HeaderFrame to honor the filter. |
|
522 // We're not there yet - so depending on the filter we need to |
|
523 // apply and the content of the header we will try to determine |
|
524 // whether anything might need to be filtered. |
|
525 // If nothing needs filtering then we can just use the |
|
526 // original headers. |
|
527 private boolean needsFiltering(HttpHeaders headers, |
|
528 BiPredicate<String, List<String>> filter) { |
|
529 if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) { |
|
530 // we're either connecting or proxying |
|
531 // slight optimization: we only need to filter out |
|
532 // disabled schemes, so if there are none just |
|
533 // pass through. |
|
534 return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER) |
|
535 && hasProxyAuthorization(headers); |
|
536 } else { |
|
537 // we're talking to a server, either directly or through |
|
538 // a tunnel. |
|
539 // Slight optimization: we only need to filter out |
|
540 // proxy authorization headers, so if there are none just |
|
541 // pass through. |
|
542 return hasProxyAuthorization(headers); |
|
543 } |
|
544 } |
|
545 |
|
546 private HttpHeaders filter(HttpHeaders headers) { |
|
547 HttpConnection conn = connection(); |
|
548 BiPredicate<String, List<String>> filter = |
|
549 conn.headerFilter(request); |
|
550 if (needsFiltering(headers, filter)) { |
|
551 return ImmutableHeaders.of(headers.map(), filter); |
|
552 } |
|
553 return headers; |
|
554 } |
|
555 |
|
556 private void setPseudoHeaderFields() { |
|
557 HttpHeadersImpl hdrs = requestPseudoHeaders; |
|
558 String method = request.method(); |
|
559 hdrs.setHeader(":method", method); |
|
560 URI uri = request.uri(); |
|
561 hdrs.setHeader(":scheme", uri.getScheme()); |
|
562 // TODO: userinfo deprecated. Needs to be removed |
|
563 hdrs.setHeader(":authority", uri.getAuthority()); |
|
564 // TODO: ensure header names beginning with : not in user headers |
|
565 String query = uri.getQuery(); |
|
566 String path = uri.getPath(); |
|
567 if (path == null || path.isEmpty()) { |
|
568 if (method.equalsIgnoreCase("OPTIONS")) { |
|
569 path = "*"; |
|
570 } else { |
|
571 path = "/"; |
|
572 } |
|
573 } |
|
574 if (query != null) { |
|
575 path += "?" + query; |
|
576 } |
|
577 hdrs.setHeader(":path", path); |
|
578 } |
|
579 |
|
580 HttpHeadersImpl getRequestPseudoHeaders() { |
|
581 return requestPseudoHeaders; |
|
582 } |
|
583 |
|
584 /** Sets endStreamReceived. Should be called only once. */ |
|
585 void setEndStreamReceived() { |
|
586 assert remotelyClosed == false: "Unexpected endStream already set"; |
|
587 remotelyClosed = true; |
|
588 responseReceived(); |
|
589 } |
|
590 |
|
591 /** Tells whether, or not, the END_STREAM Flag has been seen in any frame |
|
592 * received on this stream. */ |
|
593 private boolean endStreamReceived() { |
|
594 return remotelyClosed; |
|
595 } |
|
596 |
|
597 @Override |
|
598 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { |
|
599 debug.log(Level.DEBUG, "sendHeadersOnly()"); |
|
600 if (Log.requests() && request != null) { |
|
601 Log.logRequest(request.toString()); |
|
602 } |
|
603 if (requestPublisher != null) { |
|
604 requestContentLen = requestPublisher.contentLength(); |
|
605 } else { |
|
606 requestContentLen = 0; |
|
607 } |
|
608 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); |
|
609 connection.sendFrame(f); |
|
610 CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>(); |
|
611 cf.complete(this); // #### good enough for now |
|
612 return cf; |
|
613 } |
|
614 |
|
615 @Override |
|
616 void released() { |
|
617 if (streamid > 0) { |
|
618 debug.log(Level.DEBUG, "Released stream %d", streamid); |
|
619 // remove this stream from the Http2Connection map. |
|
620 connection.closeStream(streamid); |
|
621 } else { |
|
622 debug.log(Level.DEBUG, "Can't release stream %d", streamid); |
|
623 } |
|
624 } |
|
625 |
|
626 @Override |
|
627 void completed() { |
|
628 // There should be nothing to do here: the stream should have |
|
629 // been already closed (or will be closed shortly after). |
|
630 } |
|
631 |
|
632 void registerStream(int id) { |
|
633 this.streamid = id; |
|
634 connection.putStream(this, streamid); |
|
635 debug.log(Level.DEBUG, "Registered stream %d", id); |
|
636 } |
|
637 |
|
638 void signalWindowUpdate() { |
|
639 RequestSubscriber subscriber = requestSubscriber; |
|
640 assert subscriber != null; |
|
641 debug.log(Level.DEBUG, "Signalling window update"); |
|
642 subscriber.sendScheduler.runOrSchedule(); |
|
643 } |
|
644 |
|
645 static final ByteBuffer COMPLETED = ByteBuffer.allocate(0); |
|
646 class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { |
|
647 // can be < 0 if the actual length is not known. |
|
648 private final long contentLength; |
|
649 private volatile long remainingContentLength; |
|
650 private volatile Subscription subscription; |
|
651 |
|
652 // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers. |
|
653 // 1) The data that was published by the request body Publisher, and |
|
654 // 2) the COMPLETED sentinel, since onComplete can be invoked without demand. |
|
655 final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>(); |
|
656 |
|
657 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
658 // A scheduler used to honor window updates. Writing must be paused |
|
659 // when the window is exhausted, and resumed when the window acquires |
|
660 // some space. The sendScheduler makes it possible to implement this |
|
661 // behaviour in an asynchronous non-blocking way. |
|
662 // See RequestSubscriber::trySend below. |
|
663 final SequentialScheduler sendScheduler; |
|
664 |
|
665 RequestSubscriber(long contentLen) { |
|
666 this.contentLength = contentLen; |
|
667 this.remainingContentLength = contentLen; |
|
668 this.sendScheduler = |
|
669 SequentialScheduler.synchronizedScheduler(this::trySend); |
|
670 } |
|
671 |
|
672 @Override |
|
673 public void onSubscribe(Flow.Subscription subscription) { |
|
674 if (this.subscription != null) { |
|
675 throw new IllegalStateException("already subscribed"); |
|
676 } |
|
677 this.subscription = subscription; |
|
678 debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1"); |
|
679 subscription.request(1); |
|
680 } |
|
681 |
|
682 @Override |
|
683 public void onNext(ByteBuffer item) { |
|
684 debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining()); |
|
685 int size = outgoing.size(); |
|
686 assert size == 0 : "non-zero size: " + size; |
|
687 onNextImpl(item); |
|
688 } |
|
689 |
|
690 private void onNextImpl(ByteBuffer item) { |
|
691 // Got some more request body bytes to send. |
|
692 if (requestBodyCF.isDone()) { |
|
693 // stream already cancelled, probably in timeout |
|
694 sendScheduler.stop(); |
|
695 subscription.cancel(); |
|
696 return; |
|
697 } |
|
698 outgoing.add(item); |
|
699 sendScheduler.runOrSchedule(); |
|
700 } |
|
701 |
|
702 @Override |
|
703 public void onError(Throwable throwable) { |
|
704 debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable); |
|
705 // ensure that errors are handled within the flow. |
|
706 if (errorRef.compareAndSet(null, throwable)) { |
|
707 sendScheduler.runOrSchedule(); |
|
708 } |
|
709 } |
|
710 |
|
711 @Override |
|
712 public void onComplete() { |
|
713 debug.log(Level.DEBUG, "RequestSubscriber: onComplete"); |
|
714 int size = outgoing.size(); |
|
715 assert size == 0 || size == 1 : "non-zero or one size: " + size; |
|
716 // last byte of request body has been obtained. |
|
717 // ensure that everything is completed within the flow. |
|
718 onNextImpl(COMPLETED); |
|
719 } |
|
720 |
|
721 // Attempts to send the data, if any. |
|
722 // Handles errors and completion state. |
|
723 // Pause writing if the send window is exhausted, resume it if the |
|
724 // send window has some bytes that can be acquired. |
|
725 void trySend() { |
|
726 try { |
|
727 // handle errors raised by onError; |
|
728 Throwable t = errorRef.get(); |
|
729 if (t != null) { |
|
730 sendScheduler.stop(); |
|
731 if (requestBodyCF.isDone()) return; |
|
732 subscription.cancel(); |
|
733 requestBodyCF.completeExceptionally(t); |
|
734 return; |
|
735 } |
|
736 |
|
737 do { |
|
738 // handle COMPLETED; |
|
739 ByteBuffer item = outgoing.peekFirst(); |
|
740 if (item == null) return; |
|
741 else if (item == COMPLETED) { |
|
742 sendScheduler.stop(); |
|
743 complete(); |
|
744 return; |
|
745 } |
|
746 |
|
747 // handle bytes to send downstream |
|
748 while (item.hasRemaining()) { |
|
749 debug.log(Level.DEBUG, "trySend: %d", item.remaining()); |
|
750 assert !endStreamSent : "internal error, send data after END_STREAM flag"; |
|
751 DataFrame df = getDataFrame(item); |
|
752 if (df == null) { |
|
753 debug.log(Level.DEBUG, "trySend: can't send yet: %d", |
|
754 item.remaining()); |
|
755 return; // the send window is exhausted: come back later |
|
756 } |
|
757 |
|
758 if (contentLength > 0) { |
|
759 remainingContentLength -= df.getDataLength(); |
|
760 if (remainingContentLength < 0) { |
|
761 String msg = connection().getConnectionFlow() |
|
762 + " stream=" + streamid + " " |
|
763 + "[" + Thread.currentThread().getName() + "] " |
|
764 + "Too many bytes in request body. Expected: " |
|
765 + contentLength + ", got: " |
|
766 + (contentLength - remainingContentLength); |
|
767 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
|
768 throw new IOException(msg); |
|
769 } else if (remainingContentLength == 0) { |
|
770 df.setFlag(DataFrame.END_STREAM); |
|
771 endStreamSent = true; |
|
772 } |
|
773 } |
|
774 debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength()); |
|
775 connection.sendDataFrame(df); |
|
776 } |
|
777 assert !item.hasRemaining(); |
|
778 ByteBuffer b = outgoing.removeFirst(); |
|
779 assert b == item; |
|
780 } while (outgoing.peekFirst() != null); |
|
781 |
|
782 debug.log(Level.DEBUG, "trySend: request 1"); |
|
783 subscription.request(1); |
|
784 } catch (Throwable ex) { |
|
785 debug.log(Level.DEBUG, "trySend: ", ex); |
|
786 sendScheduler.stop(); |
|
787 subscription.cancel(); |
|
788 requestBodyCF.completeExceptionally(ex); |
|
789 } |
|
790 } |
|
791 |
|
792 private void complete() throws IOException { |
|
793 long remaining = remainingContentLength; |
|
794 long written = contentLength - remaining; |
|
795 if (remaining > 0) { |
|
796 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
|
797 // let trySend() handle the exception |
|
798 throw new IOException(connection().getConnectionFlow() |
|
799 + " stream=" + streamid + " " |
|
800 + "[" + Thread.currentThread().getName() +"] " |
|
801 + "Too few bytes returned by the publisher (" |
|
802 + written + "/" |
|
803 + contentLength + ")"); |
|
804 } |
|
805 if (!endStreamSent) { |
|
806 endStreamSent = true; |
|
807 connection.sendDataFrame(getEmptyEndStreamDataFrame()); |
|
808 } |
|
809 requestBodyCF.complete(null); |
|
810 } |
|
811 } |
|
812 |
|
813 /** |
|
814 * Send a RESET frame to tell server to stop sending data on this stream |
|
815 */ |
|
816 @Override |
|
817 public CompletableFuture<Void> ignoreBody() { |
|
818 try { |
|
819 connection.resetStream(streamid, ResetFrame.STREAM_CLOSED); |
|
820 return MinimalFuture.completedFuture(null); |
|
821 } catch (Throwable e) { |
|
822 Log.logTrace("Error resetting stream {0}", e.toString()); |
|
823 return MinimalFuture.failedFuture(e); |
|
824 } |
|
825 } |
|
826 |
|
827 DataFrame getDataFrame(ByteBuffer buffer) { |
|
828 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); |
|
829 // blocks waiting for stream send window, if exhausted |
|
830 int actualAmount = windowController.tryAcquire(requestAmount, streamid, this); |
|
831 if (actualAmount <= 0) return null; |
|
832 ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount); |
|
833 DataFrame df = new DataFrame(streamid, 0 , outBuf); |
|
834 return df; |
|
835 } |
|
836 |
|
837 private DataFrame getEmptyEndStreamDataFrame() { |
|
838 return new DataFrame(streamid, DataFrame.END_STREAM, List.of()); |
|
839 } |
|
840 |
|
841 /** |
|
842 * A List of responses relating to this stream. Normally there is only |
|
843 * one response, but intermediate responses like 100 are allowed |
|
844 * and must be passed up to higher level before continuing. Deals with races |
|
845 * such as if responses are returned before the CFs get created by |
|
846 * getResponseAsync() |
|
847 */ |
|
848 |
|
849 final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); |
|
850 |
|
851 @Override |
|
852 CompletableFuture<Response> getResponseAsync(Executor executor) { |
|
853 CompletableFuture<Response> cf; |
|
854 // The code below deals with race condition that can be caused when |
|
855 // completeResponse() is being called before getResponseAsync() |
|
856 synchronized (response_cfs) { |
|
857 if (!response_cfs.isEmpty()) { |
|
858 // This CompletableFuture was created by completeResponse(). |
|
859 // it will be already completed. |
|
860 cf = response_cfs.remove(0); |
|
861 // if we find a cf here it should be already completed. |
|
862 // finding a non completed cf should not happen. just assert it. |
|
863 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; |
|
864 } else { |
|
865 // getResponseAsync() is called first. Create a CompletableFuture |
|
866 // that will be completed by completeResponse() when |
|
867 // completeResponse() is called. |
|
868 cf = new MinimalFuture<>(); |
|
869 response_cfs.add(cf); |
|
870 } |
|
871 } |
|
872 if (executor != null && !cf.isDone()) { |
|
873 // protect from executing later chain of CompletableFuture operations from SelectorManager thread |
|
874 cf = cf.thenApplyAsync(r -> r, executor); |
|
875 } |
|
876 Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); |
|
877 PushGroup<?> pg = exchange.getPushGroup(); |
|
878 if (pg != null) { |
|
879 // if an error occurs make sure it is recorded in the PushGroup |
|
880 cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e))); |
|
881 } |
|
882 return cf; |
|
883 } |
|
884 |
|
885 /** |
|
886 * Completes the first uncompleted CF on list, and removes it. If there is no |
|
887 * uncompleted CF then creates one (completes it) and adds to list |
|
888 */ |
|
889 void completeResponse(Response resp) { |
|
890 synchronized (response_cfs) { |
|
891 CompletableFuture<Response> cf; |
|
892 int cfs_len = response_cfs.size(); |
|
893 for (int i=0; i<cfs_len; i++) { |
|
894 cf = response_cfs.get(i); |
|
895 if (!cf.isDone()) { |
|
896 Log.logTrace("Completing response (streamid={0}): {1}", |
|
897 streamid, cf); |
|
898 cf.complete(resp); |
|
899 response_cfs.remove(cf); |
|
900 return; |
|
901 } // else we found the previous response: just leave it alone. |
|
902 } |
|
903 cf = MinimalFuture.completedFuture(resp); |
|
904 Log.logTrace("Created completed future (streamid={0}): {1}", |
|
905 streamid, cf); |
|
906 response_cfs.add(cf); |
|
907 } |
|
908 } |
|
909 |
|
910 // methods to update state and remove stream when finished |
|
911 |
|
912 synchronized void requestSent() { |
|
913 requestSent = true; |
|
914 if (responseReceived) { |
|
915 close(); |
|
916 } |
|
917 } |
|
918 |
|
919 synchronized void responseReceived() { |
|
920 responseReceived = true; |
|
921 if (requestSent) { |
|
922 close(); |
|
923 } |
|
924 } |
|
925 |
|
926 /** |
|
927 * same as above but for errors |
|
928 */ |
|
929 void completeResponseExceptionally(Throwable t) { |
|
930 synchronized (response_cfs) { |
|
931 // use index to avoid ConcurrentModificationException |
|
932 // caused by removing the CF from within the loop. |
|
933 for (int i = 0; i < response_cfs.size(); i++) { |
|
934 CompletableFuture<Response> cf = response_cfs.get(i); |
|
935 if (!cf.isDone()) { |
|
936 cf.completeExceptionally(t); |
|
937 response_cfs.remove(i); |
|
938 return; |
|
939 } |
|
940 } |
|
941 response_cfs.add(MinimalFuture.failedFuture(t)); |
|
942 } |
|
943 } |
|
944 |
|
945 CompletableFuture<Void> sendBodyImpl() { |
|
946 requestBodyCF.whenComplete((v, t) -> requestSent()); |
|
947 if (requestPublisher != null) { |
|
948 final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); |
|
949 requestPublisher.subscribe(requestSubscriber = subscriber); |
|
950 } else { |
|
951 // there is no request body, therefore the request is complete, |
|
952 // END_STREAM has already sent with outgoing headers |
|
953 requestBodyCF.complete(null); |
|
954 } |
|
955 return requestBodyCF; |
|
956 } |
|
957 |
|
958 @Override |
|
959 void cancel() { |
|
960 cancel(new IOException("Stream " + streamid + " cancelled")); |
|
961 } |
|
962 |
|
963 @Override |
|
964 void cancel(IOException cause) { |
|
965 cancelImpl(cause); |
|
966 } |
|
967 |
|
968 // This method sends a RST_STREAM frame |
|
969 void cancelImpl(Throwable e) { |
|
970 debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e); |
|
971 if (Log.trace()) { |
|
972 Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); |
|
973 } |
|
974 boolean closing; |
|
975 if (closing = !closed) { // assigning closing to !closed |
|
976 synchronized (this) { |
|
977 failed = e; |
|
978 if (closing = !closed) { // assigning closing to !closed |
|
979 closed=true; |
|
980 } |
|
981 } |
|
982 } |
|
983 if (closing) { // true if the stream has not been closed yet |
|
984 if (responseSubscriber != null) |
|
985 sched.runOrSchedule(); |
|
986 } |
|
987 completeResponseExceptionally(e); |
|
988 if (!requestBodyCF.isDone()) { |
|
989 requestBodyCF.completeExceptionally(e); // we may be sending the body.. |
|
990 } |
|
991 if (responseBodyCF != null) { |
|
992 responseBodyCF.completeExceptionally(e); |
|
993 } |
|
994 try { |
|
995 // will send a RST_STREAM frame |
|
996 if (streamid != 0) { |
|
997 connection.resetStream(streamid, ResetFrame.CANCEL); |
|
998 } |
|
999 } catch (IOException ex) { |
|
1000 Log.logError(ex); |
|
1001 } |
|
1002 } |
|
1003 |
|
1004 // This method doesn't send any frame |
|
1005 void close() { |
|
1006 if (closed) return; |
|
1007 synchronized(this) { |
|
1008 if (closed) return; |
|
1009 closed = true; |
|
1010 } |
|
1011 Log.logTrace("Closing stream {0}", streamid); |
|
1012 connection.closeStream(streamid); |
|
1013 Log.logTrace("Stream {0} closed", streamid); |
|
1014 } |
|
1015 |
|
1016 static class PushedStream<T> extends Stream<T> { |
|
1017 final PushGroup<T> pushGroup; |
|
1018 // push streams need the response CF allocated up front as it is |
|
1019 // given directly to user via the multi handler callback function. |
|
1020 final CompletableFuture<Response> pushCF; |
|
1021 CompletableFuture<HttpResponse<T>> responseCF; |
|
1022 final HttpRequestImpl pushReq; |
|
1023 HttpResponse.BodyHandler<T> pushHandler; |
|
1024 |
|
1025 PushedStream(PushGroup<T> pushGroup, |
|
1026 Http2Connection connection, |
|
1027 Exchange<T> pushReq) { |
|
1028 // ## no request body possible, null window controller |
|
1029 super(connection, pushReq, null); |
|
1030 this.pushGroup = pushGroup; |
|
1031 this.pushReq = pushReq.request(); |
|
1032 this.pushCF = new MinimalFuture<>(); |
|
1033 this.responseCF = new MinimalFuture<>(); |
|
1034 |
|
1035 } |
|
1036 |
|
1037 CompletableFuture<HttpResponse<T>> responseCF() { |
|
1038 return responseCF; |
|
1039 } |
|
1040 |
|
1041 synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) { |
|
1042 this.pushHandler = pushHandler; |
|
1043 } |
|
1044 |
|
1045 synchronized HttpResponse.BodyHandler<T> getPushHandler() { |
|
1046 // ignored parameters to function can be used as BodyHandler |
|
1047 return this.pushHandler; |
|
1048 } |
|
1049 |
|
1050 // Following methods call the super class but in case of |
|
1051 // error record it in the PushGroup. The error method is called |
|
1052 // with a null value when no error occurred (is a no-op) |
|
1053 @Override |
|
1054 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
|
1055 return super.sendBodyAsync() |
|
1056 .whenComplete((ExchangeImpl<T> v, Throwable t) |
|
1057 -> pushGroup.pushError(Utils.getCompletionCause(t))); |
|
1058 } |
|
1059 |
|
1060 @Override |
|
1061 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { |
|
1062 return super.sendHeadersAsync() |
|
1063 .whenComplete((ExchangeImpl<T> ex, Throwable t) |
|
1064 -> pushGroup.pushError(Utils.getCompletionCause(t))); |
|
1065 } |
|
1066 |
|
1067 @Override |
|
1068 CompletableFuture<Response> getResponseAsync(Executor executor) { |
|
1069 CompletableFuture<Response> cf = pushCF.whenComplete( |
|
1070 (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t))); |
|
1071 if(executor!=null && !cf.isDone()) { |
|
1072 cf = cf.thenApplyAsync( r -> r, executor); |
|
1073 } |
|
1074 return cf; |
|
1075 } |
|
1076 |
|
1077 @Override |
|
1078 CompletableFuture<T> readBodyAsync( |
|
1079 HttpResponse.BodyHandler<T> handler, |
|
1080 boolean returnConnectionToPool, |
|
1081 Executor executor) |
|
1082 { |
|
1083 return super.readBodyAsync(handler, returnConnectionToPool, executor) |
|
1084 .whenComplete((v, t) -> pushGroup.pushError(t)); |
|
1085 } |
|
1086 |
|
1087 @Override |
|
1088 void completeResponse(Response r) { |
|
1089 Log.logResponse(r::toString); |
|
1090 pushCF.complete(r); // not strictly required for push API |
|
1091 // start reading the body using the obtained BodySubscriber |
|
1092 CompletableFuture<Void> start = new MinimalFuture<>(); |
|
1093 start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) |
|
1094 .whenComplete((T body, Throwable t) -> { |
|
1095 if (t != null) { |
|
1096 responseCF.completeExceptionally(t); |
|
1097 } else { |
|
1098 HttpResponseImpl<T> resp = |
|
1099 new HttpResponseImpl<>(r.request, r, null, body, getExchange()); |
|
1100 responseCF.complete(resp); |
|
1101 } |
|
1102 }); |
|
1103 start.completeAsync(() -> null, getExchange().executor()); |
|
1104 } |
|
1105 |
|
1106 @Override |
|
1107 void completeResponseExceptionally(Throwable t) { |
|
1108 pushCF.completeExceptionally(t); |
|
1109 } |
|
1110 |
|
1111 // @Override |
|
1112 // synchronized void responseReceived() { |
|
1113 // super.responseReceived(); |
|
1114 // } |
|
1115 |
|
1116 // create and return the PushResponseImpl |
|
1117 @Override |
|
1118 protected void handleResponse() { |
|
1119 responseCode = (int)responseHeaders |
|
1120 .firstValueAsLong(":status") |
|
1121 .orElse(-1); |
|
1122 |
|
1123 if (responseCode == -1) { |
|
1124 completeResponseExceptionally(new IOException("No status code")); |
|
1125 } |
|
1126 |
|
1127 this.response = new Response( |
|
1128 pushReq, exchange, responseHeaders, |
|
1129 responseCode, HttpClient.Version.HTTP_2); |
|
1130 |
|
1131 /* TODO: review if needs to be removed |
|
1132 the value is not used, but in case `content-length` doesn't parse |
|
1133 as long, there will be NumberFormatException. If left as is, make |
|
1134 sure code up the stack handles NFE correctly. */ |
|
1135 responseHeaders.firstValueAsLong("content-length"); |
|
1136 |
|
1137 if (Log.headers()) { |
|
1138 StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); |
|
1139 sb.append(" (streamid=").append(streamid).append("): "); |
|
1140 Log.dumpHeaders(sb, " ", responseHeaders); |
|
1141 Log.logHeaders(sb.toString()); |
|
1142 } |
|
1143 |
|
1144 // different implementations for normal streams and pushed streams |
|
1145 completeResponse(response); |
|
1146 } |
|
1147 } |
|
1148 |
|
1149 final class StreamWindowUpdateSender extends WindowUpdateSender { |
|
1150 |
|
1151 StreamWindowUpdateSender(Http2Connection connection) { |
|
1152 super(connection); |
|
1153 } |
|
1154 |
|
1155 @Override |
|
1156 int getStreamId() { |
|
1157 return streamid; |
|
1158 } |
|
1159 } |
|
1160 |
|
1161 /** |
|
1162 * Returns true if this exchange was canceled. |
|
1163 * @return true if this exchange was canceled. |
|
1164 */ |
|
1165 synchronized boolean isCanceled() { |
|
1166 return failed != null; |
|
1167 } |
|
1168 |
|
1169 /** |
|
1170 * Returns the cause for which this exchange was canceled, if available. |
|
1171 * @return the cause for which this exchange was canceled, if available. |
|
1172 */ |
|
1173 synchronized Throwable getCancelCause() { |
|
1174 return failed; |
|
1175 } |
|
1176 |
|
1177 final String dbgString() { |
|
1178 return connection.dbgString() + "/Stream("+streamid+")"; |
|
1179 } |
|
1180 } |