|
1 /* |
|
2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package java.net.http.internal; |
|
27 |
|
28 import java.io.EOFException; |
|
29 import java.io.IOException; |
|
30 import java.lang.System.Logger.Level; |
|
31 import java.net.InetSocketAddress; |
|
32 import java.net.URI; |
|
33 import java.nio.ByteBuffer; |
|
34 import java.nio.charset.StandardCharsets; |
|
35 import java.util.Iterator; |
|
36 import java.util.LinkedList; |
|
37 import java.util.List; |
|
38 import java.util.Map; |
|
39 import java.util.concurrent.CompletableFuture; |
|
40 import java.util.ArrayList; |
|
41 import java.util.Objects; |
|
42 import java.util.concurrent.ConcurrentHashMap; |
|
43 import java.util.concurrent.ConcurrentLinkedQueue; |
|
44 import java.util.concurrent.Flow; |
|
45 import java.util.function.Function; |
|
46 import java.util.function.Supplier; |
|
47 import javax.net.ssl.SSLEngine; |
|
48 import javax.net.ssl.SSLException; |
|
49 import java.net.http.HttpClient; |
|
50 import java.net.http.HttpHeaders; |
|
51 import java.net.http.internal.HttpConnection.HttpPublisher; |
|
52 import java.net.http.internal.common.FlowTube; |
|
53 import java.net.http.internal.common.FlowTube.TubeSubscriber; |
|
54 import java.net.http.internal.common.HttpHeadersImpl; |
|
55 import java.net.http.internal.common.Log; |
|
56 import java.net.http.internal.common.MinimalFuture; |
|
57 import java.net.http.internal.common.SequentialScheduler; |
|
58 import java.net.http.internal.common.Utils; |
|
59 import java.net.http.internal.frame.ContinuationFrame; |
|
60 import java.net.http.internal.frame.DataFrame; |
|
61 import java.net.http.internal.frame.ErrorFrame; |
|
62 import java.net.http.internal.frame.FramesDecoder; |
|
63 import java.net.http.internal.frame.FramesEncoder; |
|
64 import java.net.http.internal.frame.GoAwayFrame; |
|
65 import java.net.http.internal.frame.HeaderFrame; |
|
66 import java.net.http.internal.frame.HeadersFrame; |
|
67 import java.net.http.internal.frame.Http2Frame; |
|
68 import java.net.http.internal.frame.MalformedFrame; |
|
69 import java.net.http.internal.frame.OutgoingHeaders; |
|
70 import java.net.http.internal.frame.PingFrame; |
|
71 import java.net.http.internal.frame.PushPromiseFrame; |
|
72 import java.net.http.internal.frame.ResetFrame; |
|
73 import java.net.http.internal.frame.SettingsFrame; |
|
74 import java.net.http.internal.frame.WindowUpdateFrame; |
|
75 import java.net.http.internal.hpack.Encoder; |
|
76 import java.net.http.internal.hpack.Decoder; |
|
77 import java.net.http.internal.hpack.DecodingCallback; |
|
78 import static java.nio.charset.StandardCharsets.UTF_8; |
|
79 import static java.net.http.internal.frame.SettingsFrame.*; |
|
80 |
|
81 |
|
/**
 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
 *
 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
 * to a HttpClientImpl.
 *
 * Creation cases:
 * 1) upgraded HTTP/1.1 plain tcp connection
 * 2) prior knowledge directly created plain tcp connection
 * 3) directly created HTTP/2 SSL connection which uses ALPN.
 *
 * Sending is done by writing directly to the underlying HttpConnection object,
 * which is operating in async mode. No flow control applies on output at this
 * level, and all writes are just executed as puts to an output queue belonging
 * to the HttpConnection. Flow control is implemented by the HTTP/2 protocol itself.
 *
 * Hpack header compression
 * and outgoing stream creation is also done here, because these operations
 * must be synchronized at the socket level. Stream objects send frames simply
 * by placing them on the connection's output queue. sendFrame() is called
 * from a higher level (Stream) thread.
 *
 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
 * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
 * or handles them directly itself. This thread performs hpack decompression
 * and incoming stream creation (Server push). Incoming frames destined for a
 * stream are provided by calling Stream.incoming().
 */
|
111 class Http2Connection { |
|
112 |
|
113 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
|
114 static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag. |
|
115 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
|
116 final static System.Logger DEBUG_LOGGER = |
|
117 Utils.getDebugLogger("Http2Connection"::toString, DEBUG); |
|
118 private final System.Logger debugHpack = |
|
119 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); |
|
120 static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0); |
|
121 |
|
122 private boolean singleStream; // used only for stream 1, then closed |
|
123 |
|
/*
 * ByteBuffer pooling strategy for HTTP/2 protocol:
 *
 * In general there are 4 points where ByteBuffers are used:
 *  - incoming/outgoing frames from/to ByteBuffers, plus incoming/outgoing
 *    encrypted data in the case of an SSL connection.
 *
 * 1. Outgoing frames encoded to ByteBuffers.
 *    Outgoing ByteBuffers are created with the required size and are
 *    frequently small (except DataFrames, etc).
 *    No pools are used here at all. All outgoing buffers should be collected by GC.
 *
 * 2. Incoming ByteBuffers (decoded to frames).
 *    Here, total elimination of the BB pool is not a good idea.
 *    We don't know how many bytes we will receive through the network,
 *    so here we allocate a buffer of reasonable size. The subsequent life of the BB:
 *    - If all frames decoded from the BB are other than DataFrame and HeaderFrame
 *      (and HeaderFrame subclasses), the BB is returned to the pool.
 *    - If we decoded a DataFrame from the BB, the DataFrame refers to a sub-buffer
 *      obtained by the slice() method. Such a BB is never returned to the pool
 *      and will be GCed.
 *    - If we decoded a HeadersFrame from the BB, header decoding is performed inside
 *      the processFrame method and the buffer can then be released to the pool.
 *
 * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers
 *    are taken from/returned to the pool, because we can't predict the size of
 *    encrypted packets.
 *
 */
|
150 |
|
151 |
|
152 // A small class that allows to control frames with respect to the state of |
|
153 // the connection preface. Any data received before the connection |
|
154 // preface is sent will be buffered. |
|
155 private final class FramesController { |
|
156 volatile boolean prefaceSent; |
|
157 volatile List<ByteBuffer> pending; |
|
158 |
|
159 boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf) |
|
160 throws IOException |
|
161 { |
|
162 // if preface is not sent, buffers data in the pending list |
|
163 if (!prefaceSent) { |
|
164 debug.log(Level.DEBUG, "Preface is not sent: buffering %d", |
|
165 buf.remaining()); |
|
166 synchronized (this) { |
|
167 if (!prefaceSent) { |
|
168 if (pending == null) pending = new ArrayList<>(); |
|
169 pending.add(buf); |
|
170 debug.log(Level.DEBUG, () -> "there are now " |
|
171 + Utils.remaining(pending) |
|
172 + " bytes buffered waiting for preface to be sent"); |
|
173 return false; |
|
174 } |
|
175 } |
|
176 } |
|
177 |
|
178 // Preface is sent. Checks for pending data and flush it. |
|
179 // We rely on this method being called from within the Http2TubeSubscriber |
|
180 // scheduler, so we know that no other thread could execute this method |
|
181 // concurrently while we're here. |
|
182 // This ensures that later incoming buffers will not |
|
183 // be processed before we have flushed the pending queue. |
|
184 // No additional synchronization is therefore necessary here. |
|
185 List<ByteBuffer> pending = this.pending; |
|
186 this.pending = null; |
|
187 if (pending != null) { |
|
188 // flush pending data |
|
189 debug.log(Level.DEBUG, () -> "Processing buffered data: " |
|
190 + Utils.remaining(pending)); |
|
191 for (ByteBuffer b : pending) { |
|
192 decoder.decode(b); |
|
193 } |
|
194 } |
|
195 // push the received buffer to the frames decoder. |
|
196 if (buf != EMPTY_TRIGGER) { |
|
197 debug.log(Level.DEBUG, "Processing %d", buf.remaining()); |
|
198 decoder.decode(buf); |
|
199 } |
|
200 return true; |
|
201 } |
|
202 |
|
203 // Mark that the connection preface is sent |
|
204 void markPrefaceSent() { |
|
205 assert !prefaceSent; |
|
206 synchronized (this) { |
|
207 prefaceSent = true; |
|
208 } |
|
209 } |
|
210 } |
|
211 |
|
212 volatile boolean closed; |
|
213 |
|
214 //------------------------------------- |
|
215 final HttpConnection connection; |
|
216 private final Http2ClientImpl client2; |
|
217 private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>(); |
|
218 private int nextstreamid; |
|
219 private int nextPushStream = 2; |
|
220 private final Encoder hpackOut; |
|
221 private final Decoder hpackIn; |
|
222 final SettingsFrame clientSettings; |
|
223 private volatile SettingsFrame serverSettings; |
|
224 private final String key; // for HttpClientImpl.connections map |
|
225 private final FramesDecoder framesDecoder; |
|
226 private final FramesEncoder framesEncoder = new FramesEncoder(); |
|
227 |
|
228 /** |
|
229 * Send Window controller for both connection and stream windows. |
|
230 * Each of this connection's Streams MUST use this controller. |
|
231 */ |
|
232 private final WindowController windowController = new WindowController(); |
|
233 private final FramesController framesController = new FramesController(); |
|
234 private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); |
|
235 final ConnectionWindowUpdateSender windowUpdater; |
|
236 private volatile Throwable cause; |
|
237 private volatile Supplier<ByteBuffer> initial; |
|
238 |
|
239 static final int DEFAULT_FRAME_SIZE = 16 * 1024; |
|
240 |
|
241 |
|
242 // TODO: need list of control frames from other threads |
|
243 // that need to be sent |
|
244 |
|
245 private Http2Connection(HttpConnection connection, |
|
246 Http2ClientImpl client2, |
|
247 int nextstreamid, |
|
248 String key) { |
|
249 this.connection = connection; |
|
250 this.client2 = client2; |
|
251 this.nextstreamid = nextstreamid; |
|
252 this.key = key; |
|
253 this.clientSettings = this.client2.getClientSettings(); |
|
254 this.framesDecoder = new FramesDecoder(this::processFrame, |
|
255 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); |
|
256 // serverSettings will be updated by server |
|
257 this.serverSettings = SettingsFrame.getDefaultSettings(); |
|
258 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); |
|
259 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); |
|
260 debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); |
|
261 debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); |
|
262 debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); |
|
263 this.windowUpdater = new ConnectionWindowUpdateSender(this, |
|
264 client2.getConnectionWindowSize(clientSettings)); |
|
265 } |
|
266 |
|
267 /** |
|
268 * Case 1) Create from upgraded HTTP/1.1 connection. |
|
269 * Is ready to use. Can be SSL. exchange is the Exchange |
|
270 * that initiated the connection, whose response will be delivered |
|
271 * on a Stream. |
|
272 */ |
|
273 private Http2Connection(HttpConnection connection, |
|
274 Http2ClientImpl client2, |
|
275 Exchange<?> exchange, |
|
276 Supplier<ByteBuffer> initial) |
|
277 throws IOException, InterruptedException |
|
278 { |
|
279 this(connection, |
|
280 client2, |
|
281 3, // stream 1 is registered during the upgrade |
|
282 keyFor(connection)); |
|
283 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); |
|
284 |
|
285 Stream<?> initialStream = createStream(exchange); |
|
286 initialStream.registerStream(1); |
|
287 windowController.registerStream(1, getInitialSendWindowSize()); |
|
288 initialStream.requestSent(); |
|
289 // Upgrading: |
|
290 // set callbacks before sending preface - makes sure anything that |
|
291 // might be sent by the server will come our way. |
|
292 this.initial = initial; |
|
293 connectFlows(connection); |
|
294 sendConnectionPreface(); |
|
295 } |
|
296 |
|
297 // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving |
|
298 // agreement from the server. Async style but completes immediately, because |
|
299 // the connection is already connected. |
|
300 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, |
|
301 Http2ClientImpl client2, |
|
302 Exchange<?> exchange, |
|
303 Supplier<ByteBuffer> initial) |
|
304 { |
|
305 return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); |
|
306 } |
|
307 |
|
308 // Requires TLS handshake. So, is really async |
|
309 static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request, |
|
310 Http2ClientImpl h2client) { |
|
311 assert request.secure(); |
|
312 AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection) |
|
313 HttpConnection.getConnection(request.getAddress(), |
|
314 h2client.client(), |
|
315 request, |
|
316 HttpClient.Version.HTTP_2); |
|
317 |
|
318 return connection.connectAsync() |
|
319 .thenCompose(unused -> checkSSLConfig(connection)) |
|
320 .thenCompose(notused-> { |
|
321 CompletableFuture<Http2Connection> cf = new MinimalFuture<>(); |
|
322 try { |
|
323 Http2Connection hc = new Http2Connection(request, h2client, connection); |
|
324 cf.complete(hc); |
|
325 } catch (IOException e) { |
|
326 cf.completeExceptionally(e); |
|
327 } |
|
328 return cf; } ); |
|
329 } |
|
330 |
|
/**
 * Cases 2) and 3): a directly created connection. Stream numbering starts
 * at 1.
 *
 * @param request    the request to be sent; its URI and proxy determine
 *                   the connection key
 * @param h2client   the owning Http2ClientImpl
 * @param connection the underlying connection
 */
private Http2Connection(HttpRequestImpl request,
                        Http2ClientImpl h2client,
                        HttpConnection connection)
    throws IOException
{
    this(connection, h2client, 1, keyFor(request.uri(), request.proxy()));

    Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

    // it is now safe to resume async reading
    connectFlows(connection);
    sendConnectionPreface();
}
|
352 |
|
353 private void connectFlows(HttpConnection connection) { |
|
354 FlowTube tube = connection.getConnectionFlow(); |
|
355 // Connect the flow to our Http2TubeSubscriber: |
|
356 tube.connectFlows(connection.publisher(), subscriber); |
|
357 } |
|
358 |
|
359 final HttpClientImpl client() { |
|
360 return client2.client(); |
|
361 } |
|
362 |
|
363 /** |
|
364 * Throws an IOException if h2 was not negotiated |
|
365 */ |
|
366 private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) { |
|
367 assert aconn.isSecure(); |
|
368 |
|
369 Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> { |
|
370 CompletableFuture<Void> cf = new MinimalFuture<>(); |
|
371 SSLEngine engine = aconn.getEngine(); |
|
372 assert Objects.equals(alpn, engine.getApplicationProtocol()); |
|
373 |
|
374 DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn ); |
|
375 |
|
376 if (alpn == null || !alpn.equals("h2")) { |
|
377 String msg; |
|
378 if (alpn == null) { |
|
379 Log.logSSL("ALPN not supported"); |
|
380 msg = "ALPN not supported"; |
|
381 } else { |
|
382 switch (alpn) { |
|
383 case "": |
|
384 Log.logSSL(msg = "No ALPN negotiated"); |
|
385 break; |
|
386 case "http/1.1": |
|
387 Log.logSSL( msg = "HTTP/1.1 ALPN returned"); |
|
388 break; |
|
389 default: |
|
390 Log.logSSL(msg = "Unexpected ALPN: " + alpn); |
|
391 cf.completeExceptionally(new IOException(msg)); |
|
392 } |
|
393 } |
|
394 cf.completeExceptionally(new ALPNException(msg, aconn)); |
|
395 return cf; |
|
396 } |
|
397 cf.complete(null); |
|
398 return cf; |
|
399 }; |
|
400 |
|
401 return aconn.getALPN() |
|
402 .whenComplete((r,t) -> { |
|
403 if (t != null && t instanceof SSLException) { |
|
404 // something went wrong during the initial handshake |
|
405 // close the connection |
|
406 aconn.close(); |
|
407 } |
|
408 }) |
|
409 .thenCompose(checkAlpnCF); |
|
410 } |
|
411 |
|
412 synchronized boolean singleStream() { |
|
413 return singleStream; |
|
414 } |
|
415 |
|
416 synchronized void setSingleStream(boolean use) { |
|
417 singleStream = use; |
|
418 } |
|
419 |
|
420 static String keyFor(HttpConnection connection) { |
|
421 boolean isProxy = connection.isProxied(); |
|
422 boolean isSecure = connection.isSecure(); |
|
423 InetSocketAddress addr = connection.address(); |
|
424 |
|
425 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); |
|
426 } |
|
427 |
|
428 static String keyFor(URI uri, InetSocketAddress proxy) { |
|
429 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); |
|
430 boolean isProxy = proxy != null; |
|
431 |
|
432 String host; |
|
433 int port; |
|
434 |
|
435 if (proxy != null) { |
|
436 host = proxy.getHostString(); |
|
437 port = proxy.getPort(); |
|
438 } else { |
|
439 host = uri.getHost(); |
|
440 port = uri.getPort(); |
|
441 } |
|
442 return keyString(isSecure, isProxy, host, port); |
|
443 } |
|
444 |
|
445 // {C,S}:{H:P}:host:port |
|
446 // C indicates clear text connection "http" |
|
447 // S indicates secure "https" |
|
448 // H indicates host (direct) connection |
|
449 // P indicates proxy |
|
450 // Eg: "S:H:foo.com:80" |
|
451 static String keyString(boolean secure, boolean proxy, String host, int port) { |
|
452 if (secure && port == -1) |
|
453 port = 443; |
|
454 else if (!secure && port == -1) |
|
455 port = 80; |
|
456 return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port; |
|
457 } |
|
458 |
|
459 String key() { |
|
460 return this.key; |
|
461 } |
|
462 |
|
463 boolean offerConnection() { |
|
464 return client2.offerConnection(this); |
|
465 } |
|
466 |
|
467 private HttpPublisher publisher() { |
|
468 return connection.publisher(); |
|
469 } |
|
470 |
|
471 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) |
|
472 throws IOException |
|
473 { |
|
474 debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder); |
|
475 |
|
476 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); |
|
477 |
|
478 List<ByteBuffer> buffers = frame.getHeaderBlock(); |
|
479 int len = buffers.size(); |
|
480 for (int i = 0; i < len; i++) { |
|
481 ByteBuffer b = buffers.get(i); |
|
482 hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder); |
|
483 } |
|
484 } |
|
485 |
|
486 final int getInitialSendWindowSize() { |
|
487 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); |
|
488 } |
|
489 |
|
490 void close() { |
|
491 Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address()); |
|
492 GoAwayFrame f = new GoAwayFrame(0, |
|
493 ErrorFrame.NO_ERROR, |
|
494 "Requested by user".getBytes(UTF_8)); |
|
495 // TODO: set last stream. For now zero ok. |
|
496 sendFrame(f); |
|
497 } |
|
498 |
|
499 long count; |
|
500 final void asyncReceive(ByteBuffer buffer) { |
|
501 // We don't need to read anything and |
|
502 // we don't want to send anything back to the server |
|
503 // until the connection preface has been sent. |
|
504 // Therefore we're going to wait if needed before reading |
|
505 // (and thus replying) to anything. |
|
506 // Starting to reply to something (e.g send an ACK to a |
|
507 // SettingsFrame sent by the server) before the connection |
|
508 // preface is fully sent might result in the server |
|
509 // sending a GOAWAY frame with 'invalid_preface'. |
|
510 // |
|
511 // Note: asyncReceive is only called from the Http2TubeSubscriber |
|
512 // sequential scheduler. |
|
513 try { |
|
514 Supplier<ByteBuffer> bs = initial; |
|
515 // ensure that we always handle the initial buffer first, |
|
516 // if any. |
|
517 if (bs != null) { |
|
518 initial = null; |
|
519 ByteBuffer b = bs.get(); |
|
520 if (b.hasRemaining()) { |
|
521 long c = ++count; |
|
522 debug.log(Level.DEBUG, () -> "H2 Receiving Initial(" |
|
523 + c +"): " + b.remaining()); |
|
524 framesController.processReceivedData(framesDecoder, b); |
|
525 } |
|
526 } |
|
527 ByteBuffer b = buffer; |
|
528 // the Http2TubeSubscriber scheduler ensures that the order of incoming |
|
529 // buffers is preserved. |
|
530 if (b == EMPTY_TRIGGER) { |
|
531 debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER"); |
|
532 boolean prefaceSent = framesController.prefaceSent; |
|
533 assert prefaceSent; |
|
534 // call framesController.processReceivedData to potentially |
|
535 // trigger the processing of all the data buffered there. |
|
536 framesController.processReceivedData(framesDecoder, buffer); |
|
537 debug.log(Level.DEBUG, "H2 processed buffered data"); |
|
538 } else { |
|
539 long c = ++count; |
|
540 debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining()); |
|
541 framesController.processReceivedData(framesDecoder, buffer); |
|
542 debug.log(Level.DEBUG, "H2 processed(%d)", c); |
|
543 } |
|
544 } catch (Throwable e) { |
|
545 String msg = Utils.stackTrace(e); |
|
546 Log.logTrace(msg); |
|
547 shutdown(e); |
|
548 } |
|
549 } |
|
550 |
|
551 Throwable getRecordedCause() { |
|
552 return cause; |
|
553 } |
|
554 |
|
555 void shutdown(Throwable t) { |
|
556 debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t); |
|
557 if (closed == true) return; |
|
558 synchronized (this) { |
|
559 if (closed == true) return; |
|
560 closed = true; |
|
561 } |
|
562 Log.logError(t); |
|
563 Throwable initialCause = this.cause; |
|
564 if (initialCause == null) this.cause = t; |
|
565 client2.deleteConnection(this); |
|
566 List<Stream<?>> c = new LinkedList<>(streams.values()); |
|
567 for (Stream<?> s : c) { |
|
568 s.cancelImpl(t); |
|
569 } |
|
570 connection.close(); |
|
571 } |
|
572 |
|
573 /** |
|
574 * Streams initiated by a client MUST use odd-numbered stream |
|
575 * identifiers; those initiated by the server MUST use even-numbered |
|
576 * stream identifiers. |
|
577 */ |
|
578 private static final boolean isSeverInitiatedStream(int streamid) { |
|
579 return (streamid & 0x1) == 0; |
|
580 } |
|
581 |
|
582 /** |
|
583 * Handles stream 0 (common) frames that apply to whole connection and passes |
|
584 * other stream specific frames to that Stream object. |
|
585 * |
|
586 * Invokes Stream.incoming() which is expected to process frame without |
|
587 * blocking. |
|
588 */ |
|
589 void processFrame(Http2Frame frame) throws IOException { |
|
590 Log.logFrames(frame, "IN"); |
|
591 int streamid = frame.streamid(); |
|
592 if (frame instanceof MalformedFrame) { |
|
593 Log.logError(((MalformedFrame) frame).getMessage()); |
|
594 if (streamid == 0) { |
|
595 framesDecoder.close("Malformed frame on stream 0"); |
|
596 protocolError(((MalformedFrame) frame).getErrorCode(), |
|
597 ((MalformedFrame) frame).getMessage()); |
|
598 } else { |
|
599 debug.log(Level.DEBUG, () -> "Reset stream: " |
|
600 + ((MalformedFrame) frame).getMessage()); |
|
601 resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); |
|
602 } |
|
603 return; |
|
604 } |
|
605 if (streamid == 0) { |
|
606 handleConnectionFrame(frame); |
|
607 } else { |
|
608 if (frame instanceof SettingsFrame) { |
|
609 // The stream identifier for a SETTINGS frame MUST be zero |
|
610 framesDecoder.close( |
|
611 "The stream identifier for a SETTINGS frame MUST be zero"); |
|
612 protocolError(GoAwayFrame.PROTOCOL_ERROR); |
|
613 return; |
|
614 } |
|
615 |
|
616 Stream<?> stream = getStream(streamid); |
|
617 if (stream == null) { |
|
618 // Should never receive a frame with unknown stream id |
|
619 |
|
620 if (frame instanceof HeaderFrame) { |
|
621 // always decode the headers as they may affect |
|
622 // connection-level HPACK decoding state |
|
623 HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); |
|
624 decodeHeaders((HeaderFrame) frame, decoder); |
|
625 } |
|
626 |
|
627 if (!(frame instanceof ResetFrame)) { |
|
628 if (isSeverInitiatedStream(streamid)) { |
|
629 if (streamid < nextPushStream) { |
|
630 // trailing data on a cancelled push promise stream, |
|
631 // reset will already have been sent, ignore |
|
632 Log.logTrace("Ignoring cancelled push promise frame " + frame); |
|
633 } else { |
|
634 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
|
635 } |
|
636 } else if (streamid >= nextstreamid) { |
|
637 // otherwise the stream has already been reset/closed |
|
638 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
|
639 } |
|
640 } |
|
641 return; |
|
642 } |
|
643 if (frame instanceof PushPromiseFrame) { |
|
644 PushPromiseFrame pp = (PushPromiseFrame)frame; |
|
645 handlePushPromise(stream, pp); |
|
646 } else if (frame instanceof HeaderFrame) { |
|
647 // decode headers (or continuation) |
|
648 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); |
|
649 stream.incoming(frame); |
|
650 } else { |
|
651 stream.incoming(frame); |
|
652 } |
|
653 } |
|
654 } |
|
655 |
|
656 private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) |
|
657 throws IOException |
|
658 { |
|
659 // always decode the headers as they may affect connection-level HPACK |
|
660 // decoding state |
|
661 HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); |
|
662 decodeHeaders(pp, decoder); |
|
663 |
|
664 HttpRequestImpl parentReq = parent.request; |
|
665 int promisedStreamid = pp.getPromisedStream(); |
|
666 if (promisedStreamid != nextPushStream) { |
|
667 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); |
|
668 return; |
|
669 } else { |
|
670 nextPushStream += 2; |
|
671 } |
|
672 |
|
673 HttpHeadersImpl headers = decoder.headers(); |
|
674 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); |
|
675 Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); |
|
676 Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch); |
|
677 pushExch.exchImpl = pushStream; |
|
678 pushStream.registerStream(promisedStreamid); |
|
679 parent.incoming_pushPromise(pushReq, pushStream); |
|
680 } |
|
681 |
|
682 private void handleConnectionFrame(Http2Frame frame) |
|
683 throws IOException |
|
684 { |
|
685 switch (frame.type()) { |
|
686 case SettingsFrame.TYPE: |
|
687 handleSettings((SettingsFrame)frame); |
|
688 break; |
|
689 case PingFrame.TYPE: |
|
690 handlePing((PingFrame)frame); |
|
691 break; |
|
692 case GoAwayFrame.TYPE: |
|
693 handleGoAway((GoAwayFrame)frame); |
|
694 break; |
|
695 case WindowUpdateFrame.TYPE: |
|
696 handleWindowUpdate((WindowUpdateFrame)frame); |
|
697 break; |
|
698 default: |
|
699 protocolError(ErrorFrame.PROTOCOL_ERROR); |
|
700 } |
|
701 } |
|
702 |
|
703 void resetStream(int streamid, int code) throws IOException { |
|
704 Log.logError( |
|
705 "Resetting stream {0,number,integer} with error code {1,number,integer}", |
|
706 streamid, code); |
|
707 ResetFrame frame = new ResetFrame(streamid, code); |
|
708 sendFrame(frame); |
|
709 closeStream(streamid); |
|
710 } |
|
711 |
|
712 void closeStream(int streamid) { |
|
713 debug.log(Level.DEBUG, "Closed stream %d", streamid); |
|
714 Stream<?> s = streams.remove(streamid); |
|
715 if (s != null) { |
|
716 // decrement the reference count on the HttpClientImpl |
|
717 // to allow the SelectorManager thread to exit if no |
|
718 // other operation is pending and the facade is no |
|
719 // longer referenced. |
|
720 client().unreference(); |
|
721 } |
|
722 // ## Remove s != null. It is a hack for delayed cancellation,reset |
|
723 if (s != null && !(s instanceof Stream.PushedStream)) { |
|
724 // Since PushStreams have no request body, then they have no |
|
725 // corresponding entry in the window controller. |
|
726 windowController.removeStream(streamid); |
|
727 } |
|
728 if (singleStream() && streams.isEmpty()) { |
|
729 // should be only 1 stream, but there might be more if server push |
|
730 close(); |
|
731 } |
|
732 } |
|
733 |
|
734 /** |
|
735 * Increments this connection's send Window by the amount in the given frame. |
|
736 */ |
|
737 private void handleWindowUpdate(WindowUpdateFrame f) |
|
738 throws IOException |
|
739 { |
|
740 int amount = f.getUpdate(); |
|
741 if (amount <= 0) { |
|
742 // ## temporarily disable to workaround a bug in Jetty where it |
|
743 // ## sends Window updates with a 0 update value. |
|
744 //protocolError(ErrorFrame.PROTOCOL_ERROR); |
|
745 } else { |
|
746 boolean success = windowController.increaseConnectionWindow(amount); |
|
747 if (!success) { |
|
748 protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow |
|
749 } |
|
750 } |
|
751 } |
|
752 |
|
753 private void protocolError(int errorCode) |
|
754 throws IOException |
|
755 { |
|
756 protocolError(errorCode, null); |
|
757 } |
|
758 |
|
759 private void protocolError(int errorCode, String msg) |
|
760 throws IOException |
|
761 { |
|
762 GoAwayFrame frame = new GoAwayFrame(0, errorCode); |
|
763 sendFrame(frame); |
|
764 shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg)))); |
|
765 } |
|
766 |
|
767 private void handleSettings(SettingsFrame frame) |
|
768 throws IOException |
|
769 { |
|
770 assert frame.streamid() == 0; |
|
771 if (!frame.getFlag(SettingsFrame.ACK)) { |
|
772 int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE); |
|
773 int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE); |
|
774 int diff = newWindowSize - oldWindowSize; |
|
775 if (diff != 0) { |
|
776 windowController.adjustActiveStreams(diff); |
|
777 } |
|
778 serverSettings = frame; |
|
779 sendFrame(new SettingsFrame(SettingsFrame.ACK)); |
|
780 } |
|
781 } |
|
782 |
|
783 private void handlePing(PingFrame frame) |
|
784 throws IOException |
|
785 { |
|
786 frame.setFlag(PingFrame.ACK); |
|
787 sendUnorderedFrame(frame); |
|
788 } |
|
789 |
|
790 private void handleGoAway(GoAwayFrame frame) |
|
791 throws IOException |
|
792 { |
|
793 shutdown(new IOException( |
|
794 String.valueOf(connection.channel().getLocalAddress()) |
|
795 +": GOAWAY received")); |
|
796 } |
|
797 |
|
798 /** |
|
799 * Max frame size we are allowed to send |
|
800 */ |
|
801 public int getMaxSendFrameSize() { |
|
802 int param = serverSettings.getParameter(MAX_FRAME_SIZE); |
|
803 if (param == -1) { |
|
804 param = DEFAULT_FRAME_SIZE; |
|
805 } |
|
806 return param; |
|
807 } |
|
808 |
|
809 /** |
|
810 * Max frame size we will receive |
|
811 */ |
|
812 public int getMaxReceiveFrameSize() { |
|
813 return clientSettings.getParameter(MAX_FRAME_SIZE); |
|
814 } |
|
815 |
|
// The client connection preface string; its bytes are sent together with
// the initial SETTINGS frame by sendConnectionPreface().
private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

// CLIENT_PREFACE encoded once, as ISO-8859-1 bytes.
private static final byte[] PREFACE_BYTES =
    CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
|
820 |
|
/**
 * Sends Connection preface and Settings frame with current preferred
 * values, then a connection-level window update if our receive window is
 * larger than the advertised initial window, and finally pushes an empty
 * trigger buffer to the subscriber.
 *
 * @throws IOException if one of the underlying connection/publisher calls
 *         made here fails
 */
private void sendConnectionPreface() throws IOException {
    Log.logTrace("{0}: start sending connection preface to {1}",
                 connection.channel().getLocalAddress(),
                 connection.address());
    // Build the SETTINGS frame from our current client settings.
    SettingsFrame sf = new SettingsFrame(clientSettings);
    int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
    ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
    Log.logFrames(sf, "OUT");
    // send preface bytes and SettingsFrame together
    HttpPublisher publisher = publisher();
    publisher.enqueue(List.of(buf));
    publisher.signalEnqueued();
    // mark preface sent.
    framesController.markPrefaceSent();
    Log.logTrace("PREFACE_BYTES sent");
    Log.logTrace("Settings Frame sent");

    // send a Window update for the receive buffer we are using
    // minus the initial window size announced in the SETTINGS frame above
    final int len = windowUpdater.initialWindowSize - initialWindowSize;
    if (len > 0) {
        windowUpdater.sendWindowUpdate(len);
    }
    // Push an empty trigger buffer through the subscriber so that any
    // data which was buffered before the preface was sent gets processed
    // (see PrefaceController).
    Log.logTrace("finished sending connection preface");
    debug.log(Level.DEBUG, "Triggering processing of buffered data"
              + " after sending connection preface");
    subscriber.onNext(List.of(EMPTY_TRIGGER));
}
|
856 |
|
857 /** |
|
858 * Returns an existing Stream with given id, or null if doesn't exist |
|
859 */ |
|
860 @SuppressWarnings("unchecked") |
|
861 <T> Stream<T> getStream(int streamid) { |
|
862 return (Stream<T>)streams.get(streamid); |
|
863 } |
|
864 |
|
865 /** |
|
866 * Creates Stream with given id. |
|
867 */ |
|
868 final <T> Stream<T> createStream(Exchange<T> exchange) { |
|
869 Stream<T> stream = new Stream<>(this, exchange, windowController); |
|
870 return stream; |
|
871 } |
|
872 |
|
873 <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { |
|
874 PushGroup<T> pg = parent.exchange.getPushGroup(); |
|
875 return new Stream.PushedStream<>(pg, this, pushEx); |
|
876 } |
|
877 |
|
/**
 * Registers the given stream under the given id, after taking a reference
 * on the client.
 *
 * @param stream   the stream to register
 * @param streamid the id under which the stream is stored
 */
<T> void putStream(Stream<T> stream, int streamid) {
    // increment the reference count on the HttpClientImpl
    // to prevent the SelectorManager thread from exiting until
    // the stream is closed.
    client().reference();
    streams.put(streamid, stream);
}
|
885 |
|
886 /** |
|
887 * Encode the headers into a List<ByteBuffer> and then create HEADERS |
|
888 * and CONTINUATION frames from the list and return the List<Http2Frame>. |
|
889 */ |
|
890 private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { |
|
891 List<ByteBuffer> buffers = encodeHeadersImpl( |
|
892 getMaxSendFrameSize(), |
|
893 frame.getAttachment().getRequestPseudoHeaders(), |
|
894 frame.getUserHeaders(), |
|
895 frame.getSystemHeaders()); |
|
896 |
|
897 List<HeaderFrame> frames = new ArrayList<>(buffers.size()); |
|
898 Iterator<ByteBuffer> bufIterator = buffers.iterator(); |
|
899 HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); |
|
900 frames.add(oframe); |
|
901 while(bufIterator.hasNext()) { |
|
902 oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); |
|
903 frames.add(oframe); |
|
904 } |
|
905 oframe.setFlag(HeaderFrame.END_HEADERS); |
|
906 return frames; |
|
907 } |
|
908 |
|
909 // Dedicated cache for headers encoding ByteBuffer. |
|
910 // There can be no concurrent access to this buffer as all access to this buffer |
|
911 // and its content happen within a single critical code block section protected |
|
912 // by the sendLock. / (see sendFrame()) |
|
913 // private final ByteBufferPool headerEncodingPool = new ByteBufferPool(); |
|
914 |
|
915 private ByteBuffer getHeaderBuffer(int maxFrameSize) { |
|
916 ByteBuffer buf = ByteBuffer.allocate(maxFrameSize); |
|
917 buf.limit(maxFrameSize); |
|
918 return buf; |
|
919 } |
|
920 |
|
921 /* |
|
922 * Encodes all the headers from the given HttpHeaders into the given List |
|
923 * of buffers. |
|
924 * |
|
925 * From https://tools.ietf.org/html/rfc7540#section-8.1.2 : |
|
926 * |
|
927 * ...Just as in HTTP/1.x, header field names are strings of ASCII |
|
928 * characters that are compared in a case-insensitive fashion. However, |
|
929 * header field names MUST be converted to lowercase prior to their |
|
930 * encoding in HTTP/2... |
|
931 */ |
|
932 private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { |
|
933 ByteBuffer buffer = getHeaderBuffer(maxFrameSize); |
|
934 List<ByteBuffer> buffers = new ArrayList<>(); |
|
935 for(HttpHeaders header : headers) { |
|
936 for (Map.Entry<String, List<String>> e : header.map().entrySet()) { |
|
937 String lKey = e.getKey().toLowerCase(); |
|
938 List<String> values = e.getValue(); |
|
939 for (String value : values) { |
|
940 hpackOut.header(lKey, value); |
|
941 while (!hpackOut.encode(buffer)) { |
|
942 buffer.flip(); |
|
943 buffers.add(buffer); |
|
944 buffer = getHeaderBuffer(maxFrameSize); |
|
945 } |
|
946 } |
|
947 } |
|
948 } |
|
949 buffer.flip(); |
|
950 buffers.add(buffer); |
|
951 return buffers; |
|
952 } |
|
953 |
|
954 private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { |
|
955 oh.streamid(stream.streamid); |
|
956 if (Log.headers()) { |
|
957 StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); |
|
958 sb.append(stream.streamid).append(")\n"); |
|
959 Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders()); |
|
960 Log.dumpHeaders(sb, " ", oh.getSystemHeaders()); |
|
961 Log.dumpHeaders(sb, " ", oh.getUserHeaders()); |
|
962 Log.logHeaders(sb.toString()); |
|
963 } |
|
964 List<HeaderFrame> frames = encodeHeaders(oh); |
|
965 return encodeFrames(frames); |
|
966 } |
|
967 |
|
968 private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) { |
|
969 if (Log.frames()) { |
|
970 frames.forEach(f -> Log.logFrames(f, "OUT")); |
|
971 } |
|
972 return framesEncoder.encodeFrames(frames); |
|
973 } |
|
974 |
|
975 private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { |
|
976 Stream<?> stream = oh.getAttachment(); |
|
977 int streamid = nextstreamid; |
|
978 nextstreamid += 2; |
|
979 stream.registerStream(streamid); |
|
980 // set outgoing window here. This allows thread sending |
|
981 // body to proceed. |
|
982 windowController.registerStream(streamid, getInitialSendWindowSize()); |
|
983 return stream; |
|
984 } |
|
985 |
|
// Monitor held by sendFrame() while a frame is encoded and enqueued.
private final Object sendlock = new Object();
|
987 |
|
988 void sendFrame(Http2Frame frame) { |
|
989 try { |
|
990 HttpPublisher publisher = publisher(); |
|
991 synchronized (sendlock) { |
|
992 if (frame instanceof OutgoingHeaders) { |
|
993 @SuppressWarnings("unchecked") |
|
994 OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; |
|
995 Stream<?> stream = registerNewStream(oh); |
|
996 // provide protection from inserting unordered frames between Headers and Continuation |
|
997 publisher.enqueue(encodeHeaders(oh, stream)); |
|
998 } else { |
|
999 publisher.enqueue(encodeFrame(frame)); |
|
1000 } |
|
1001 } |
|
1002 publisher.signalEnqueued(); |
|
1003 } catch (IOException e) { |
|
1004 if (!closed) { |
|
1005 Log.logError(e); |
|
1006 shutdown(e); |
|
1007 } |
|
1008 } |
|
1009 } |
|
1010 |
|
1011 private List<ByteBuffer> encodeFrame(Http2Frame frame) { |
|
1012 Log.logFrames(frame, "OUT"); |
|
1013 return framesEncoder.encodeFrame(frame); |
|
1014 } |
|
1015 |
|
1016 void sendDataFrame(DataFrame frame) { |
|
1017 try { |
|
1018 HttpPublisher publisher = publisher(); |
|
1019 publisher.enqueue(encodeFrame(frame)); |
|
1020 publisher.signalEnqueued(); |
|
1021 } catch (IOException e) { |
|
1022 if (!closed) { |
|
1023 Log.logError(e); |
|
1024 shutdown(e); |
|
1025 } |
|
1026 } |
|
1027 } |
|
1028 |
|
1029 /* |
|
1030 * Direct call of the method bypasses synchronization on "sendlock" and |
|
1031 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. |
|
1032 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. |
|
1033 */ |
|
1034 void sendUnorderedFrame(Http2Frame frame) { |
|
1035 try { |
|
1036 HttpPublisher publisher = publisher(); |
|
1037 publisher.enqueueUnordered(encodeFrame(frame)); |
|
1038 publisher.signalEnqueued(); |
|
1039 } catch (IOException e) { |
|
1040 if (!closed) { |
|
1041 Log.logError(e); |
|
1042 shutdown(e); |
|
1043 } |
|
1044 } |
|
1045 } |
|
1046 |
|
/**
 * A simple tube subscriber for reading from the connection flow.
 *
 * Buffers delivered to onNext are queued, then handed one at a time to
 * Http2Connection.asyncReceive by processQueue, which runs through a
 * SequentialScheduler. Terminal signals (onError / onComplete) are recorded
 * in the 'error' field and acted upon by processQueue, which stops the
 * scheduler and shuts the connection down.
 */
final class Http2TubeSubscriber implements TubeSubscriber {
    // Most recent subscription passed to onSubscribe.
    volatile Flow.Subscription subscription;
    // Set once onError or onComplete has been called.
    volatile boolean completed;
    // Set by dropSubscription, cleared by onSubscribe.
    volatile boolean dropped;
    // Failure (or EOF) recorded by onError, onComplete or processQueue.
    volatile Throwable error;
    // Buffers received but not yet passed to asyncReceive.
    final ConcurrentLinkedQueue<ByteBuffer> queue
            = new ConcurrentLinkedQueue<>();
    final SequentialScheduler scheduler =
            SequentialScheduler.synchronizedScheduler(this::processQueue);

    /**
     * Drains the queue into asyncReceive until it is empty or the scheduler
     * is stopped. If an error has been recorded, or is raised while
     * draining, stops the scheduler and shuts the connection down.
     */
    final void processQueue() {
        try {
            while (!queue.isEmpty() && !scheduler.isStopped()) {
                ByteBuffer buffer = queue.poll();
                debug.log(Level.DEBUG,
                          "sending %d to Http2Connection.asyncReceive",
                          buffer.remaining());
                asyncReceive(buffer);
            }
        } catch (Throwable t) {
            // Keep an already recorded error; otherwise remember this one.
            Throwable x = error;
            if (x == null) error = t;
        } finally {
            // Runs on every pass: this is also where an error recorded by
            // onError / onComplete gets acted upon.
            Throwable x = error;
            if (x != null) {
                debug.log(Level.DEBUG, "Stopping scheduler", x);
                scheduler.stop();
                Http2Connection.this.shutdown(x);
            }
        }
    }

    /**
     * Stores the subscription, clears the 'dropped' flag and, unless
     * already completed, requests Long.MAX_VALUE items.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // supports being called multiple time.
        // doesn't cancel the previous subscription, since that is
        // most probably the same as the new subscription.
        assert this.subscription == null || dropped == false;
        this.subscription = subscription;
        dropped = false;
        // TODO FIXME: request(1) should be done by the delegate.
        if (!completed) {
            debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
            subscription.request(Long.MAX_VALUE);
        } else {
            debug.log(Level.DEBUG, "onSubscribe: already completed");
        }
    }

    /**
     * Queues the received buffers and schedules processQueue on the
     * client's executor.
     */
    @Override
    public void onNext(List<ByteBuffer> item) {
        debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
                + " bytes in " + item.size() + " buffers");
        queue.addAll(item);
        scheduler.runOrSchedule(client().theExecutor());
    }

    /**
     * Records the error, marks the subscriber completed and schedules
     * processQueue so that the error is acted upon.
     */
    @Override
    public void onError(Throwable throwable) {
        debug.log(Level.DEBUG, () -> "onError: " + throwable);
        error = throwable;
        completed = true;
        scheduler.runOrSchedule(client().theExecutor());
    }

    /**
     * Treats completion as an error: records an EOFException, marks the
     * subscriber completed and schedules processQueue.
     */
    @Override
    public void onComplete() {
        debug.log(Level.DEBUG, "EOF");
        error = new EOFException("EOF reached while reading");
        completed = true;
        scheduler.runOrSchedule(client().theExecutor());
    }

    /**
     * Marks the current subscription as dropped; the subscription field
     * itself is left untouched.
     */
    @Override
    public void dropSubscription() {
        debug.log(Level.DEBUG, "dropSubscription");
        // we could probably set subscription to null here...
        // then we might not need the 'dropped' boolean?
        dropped = true;
    }
}
|
1131 |
|
1132 @Override |
|
1133 public final String toString() { |
|
1134 return dbgString(); |
|
1135 } |
|
1136 |
|
1137 final String dbgString() { |
|
1138 return "Http2Connection(" |
|
1139 + connection.getConnectionFlow() + ")"; |
|
1140 } |
|
1141 |
|
1142 final class LoggingHeaderDecoder extends HeaderDecoder { |
|
1143 |
|
1144 private final HeaderDecoder delegate; |
|
1145 private final System.Logger debugHpack = |
|
1146 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); |
|
1147 |
|
1148 LoggingHeaderDecoder(HeaderDecoder delegate) { |
|
1149 this.delegate = delegate; |
|
1150 } |
|
1151 |
|
1152 String dbgString() { |
|
1153 return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder"; |
|
1154 } |
|
1155 |
|
1156 @Override |
|
1157 public void onDecoded(CharSequence name, CharSequence value) { |
|
1158 delegate.onDecoded(name, value); |
|
1159 } |
|
1160 |
|
1161 @Override |
|
1162 public void onIndexed(int index, |
|
1163 CharSequence name, |
|
1164 CharSequence value) { |
|
1165 debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n", |
|
1166 index, name, value); |
|
1167 delegate.onIndexed(index, name, value); |
|
1168 } |
|
1169 |
|
1170 @Override |
|
1171 public void onLiteral(int index, |
|
1172 CharSequence name, |
|
1173 CharSequence value, |
|
1174 boolean valueHuffman) { |
|
1175 debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", |
|
1176 index, name, value, valueHuffman); |
|
1177 delegate.onLiteral(index, name, value, valueHuffman); |
|
1178 } |
|
1179 |
|
1180 @Override |
|
1181 public void onLiteral(CharSequence name, |
|
1182 boolean nameHuffman, |
|
1183 CharSequence value, |
|
1184 boolean valueHuffman) { |
|
1185 debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", |
|
1186 name, nameHuffman, value, valueHuffman); |
|
1187 delegate.onLiteral(name, nameHuffman, value, valueHuffman); |
|
1188 } |
|
1189 |
|
1190 @Override |
|
1191 public void onLiteralNeverIndexed(int index, |
|
1192 CharSequence name, |
|
1193 CharSequence value, |
|
1194 boolean valueHuffman) { |
|
1195 debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", |
|
1196 index, name, value, valueHuffman); |
|
1197 delegate.onLiteralNeverIndexed(index, name, value, valueHuffman); |
|
1198 } |
|
1199 |
|
1200 @Override |
|
1201 public void onLiteralNeverIndexed(CharSequence name, |
|
1202 boolean nameHuffman, |
|
1203 CharSequence value, |
|
1204 boolean valueHuffman) { |
|
1205 debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", |
|
1206 name, nameHuffman, value, valueHuffman); |
|
1207 delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman); |
|
1208 } |
|
1209 |
|
1210 @Override |
|
1211 public void onLiteralWithIndexing(int index, |
|
1212 CharSequence name, |
|
1213 CharSequence value, |
|
1214 boolean valueHuffman) { |
|
1215 debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", |
|
1216 index, name, value, valueHuffman); |
|
1217 delegate.onLiteralWithIndexing(index, name, value, valueHuffman); |
|
1218 } |
|
1219 |
|
1220 @Override |
|
1221 public void onLiteralWithIndexing(CharSequence name, |
|
1222 boolean nameHuffman, |
|
1223 CharSequence value, |
|
1224 boolean valueHuffman) { |
|
1225 debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", |
|
1226 name, nameHuffman, value, valueHuffman); |
|
1227 delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman); |
|
1228 } |
|
1229 |
|
1230 @Override |
|
1231 public void onSizeUpdate(int capacity) { |
|
1232 debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity); |
|
1233 delegate.onSizeUpdate(capacity); |
|
1234 } |
|
1235 |
|
1236 @Override |
|
1237 HttpHeadersImpl headers() { |
|
1238 return delegate.headers(); |
|
1239 } |
|
1240 } |
|
1241 |
|
1242 static class HeaderDecoder implements DecodingCallback { |
|
1243 HttpHeadersImpl headers; |
|
1244 |
|
1245 HeaderDecoder() { |
|
1246 this.headers = new HttpHeadersImpl(); |
|
1247 } |
|
1248 |
|
1249 @Override |
|
1250 public void onDecoded(CharSequence name, CharSequence value) { |
|
1251 headers.addHeader(name.toString(), value.toString()); |
|
1252 } |
|
1253 |
|
1254 HttpHeadersImpl headers() { |
|
1255 return headers; |
|
1256 } |
|
1257 } |
|
1258 |
|
1259 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { |
|
1260 |
|
1261 final int initialWindowSize; |
|
1262 public ConnectionWindowUpdateSender(Http2Connection connection, |
|
1263 int initialWindowSize) { |
|
1264 super(connection, initialWindowSize); |
|
1265 this.initialWindowSize = initialWindowSize; |
|
1266 } |
|
1267 |
|
1268 @Override |
|
1269 int getStreamId() { |
|
1270 return 0; |
|
1271 } |
|
1272 } |
|
1273 |
|
1274 /** |
|
1275 * Thrown when https handshake negotiates http/1.1 alpn instead of h2 |
|
1276 */ |
|
1277 static final class ALPNException extends IOException { |
|
1278 private static final long serialVersionUID = 0L; |
|
1279 final transient AbstractAsyncSSLConnection connection; |
|
1280 |
|
1281 ALPNException(String msg, AbstractAsyncSSLConnection connection) { |
|
1282 super(msg); |
|
1283 this.connection = connection; |
|
1284 } |
|
1285 |
|
1286 AbstractAsyncSSLConnection getConnection() { |
|
1287 return connection; |
|
1288 } |
|
1289 } |
|
1290 } |