equal
deleted
inserted
replaced
257 // to be sent. The following two fields are updated as soon as a stream |
257 // to be sent. The following two fields are updated as soon as a stream |
258 // is created and assigned to a connection. They are checked before |
258 // is created and assigned to a connection. They are checked before |
259 // assigning a stream to a connection. |
259 // assigning a stream to a connection. |
260 private int lastReservedClientStreamid = 1; |
260 private int lastReservedClientStreamid = 1; |
261 private int lastReservedServerStreamid = 0; |
261 private int lastReservedServerStreamid = 0; |
|
262 private int numReservedClientStreams = 0; // count of current streams |
|
263 private int numReservedServerStreams = 0; // count of current streams |
262 private final Encoder hpackOut; |
264 private final Encoder hpackOut; |
263 private final Decoder hpackIn; |
265 private final Decoder hpackIn; |
264 final SettingsFrame clientSettings; |
266 final SettingsFrame clientSettings; |
265 private volatile SettingsFrame serverSettings; |
267 private volatile SettingsFrame serverSettings; |
266 private final String key; // for HttpClientImpl.connections map |
268 private final String key; // for HttpClientImpl.connections map |
309 client2.getConnectionWindowSize(clientSettings)); |
311 client2.getConnectionWindowSize(clientSettings)); |
310 } |
312 } |
311 |
313 |
312 /** |
314 /** |
313 * Case 1) Create from upgraded HTTP/1.1 connection. |
315 * Case 1) Create from upgraded HTTP/1.1 connection. |
314 * Is ready to use. Can be SSL. exchange is the Exchange |
316 * Is ready to use. Can't be SSL. exchange is the Exchange |
315 * that initiated the connection, whose response will be delivered |
317 * that initiated the connection, whose response will be delivered |
316 * on a Stream. |
318 * on a Stream. |
317 */ |
319 */ |
318 private Http2Connection(HttpConnection connection, |
320 private Http2Connection(HttpConnection connection, |
319 Http2ClientImpl client2, |
321 Http2ClientImpl client2, |
323 { |
325 { |
324 this(connection, |
326 this(connection, |
325 client2, |
327 client2, |
326 3, // stream 1 is registered during the upgrade |
328 3, // stream 1 is registered during the upgrade |
327 keyFor(connection)); |
329 keyFor(connection)); |
|
330 reserveStream(true); |
328 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); |
331 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); |
329 |
332 |
330 Stream<?> initialStream = createStream(exchange); |
333 Stream<?> initialStream = createStream(exchange); |
331 initialStream.registerStream(1); |
334 initialStream.registerStream(1); |
332 windowController.registerStream(1, getInitialSendWindowSize()); |
335 windowController.registerStream(1, getInitialSendWindowSize()); |
406 } |
409 } |
407 |
410 |
408 // call these before assigning a request/stream to a connection |
411 // call these before assigning a request/stream to a connection |
409 // if false returned then a new Http2Connection is required |
412 // if false returned then a new Http2Connection is required |
410 // if true, the stream may be assigned to this connection |
413 // if true, the stream may be assigned to this connection |
411 synchronized boolean reserveStream(boolean clientInitiated) { |
414 // for server push, if false returned, then the stream should be cancelled |
|
415 synchronized boolean reserveStream(boolean clientInitiated) throws IOException { |
412 if (finalStream) { |
416 if (finalStream) { |
413 return false; |
417 return false; |
414 } |
418 } |
415 if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) { |
419 if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) { |
416 setFinalStream(); |
420 setFinalStream(); |
423 } |
427 } |
424 if (clientInitiated) |
428 if (clientInitiated) |
425 lastReservedClientStreamid+=2; |
429 lastReservedClientStreamid+=2; |
426 else |
430 else |
427 lastReservedServerStreamid+=2; |
431 lastReservedServerStreamid+=2; |
|
432 |
|
433 assert numReservedClientStreams >= 0; |
|
434 assert numReservedServerStreams >= 0; |
|
435 if (clientInitiated && numReservedClientStreams >= getMaxConcurrentClientStreams()) { |
|
436 throw new IOException("too many concurrent streams"); |
|
437 } else if (clientInitiated) { |
|
438 numReservedClientStreams++; |
|
439 } |
|
440 if (!clientInitiated && numReservedServerStreams >= getMaxConcurrentServerStreams()) { |
|
441 return false; |
|
442 } else if (!clientInitiated) { |
|
443 numReservedServerStreams++; |
|
444 } |
428 return true; |
445 return true; |
429 } |
446 } |
430 |
447 |
431 /** |
448 /** |
432 * Throws an IOException if h2 was not negotiated |
449 * Throws an IOException if h2 was not negotiated |
561 } |
578 } |
562 } |
579 } |
563 |
580 |
564 final int getInitialSendWindowSize() { |
581 final int getInitialSendWindowSize() { |
565 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); |
582 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); |
|
583 } |
|
584 |
|
585 final int getMaxConcurrentClientStreams() { |
|
586 return serverSettings.getParameter(MAX_CONCURRENT_STREAMS); |
|
587 } |
|
588 |
|
589 final int getMaxConcurrentServerStreams() { |
|
590 return clientSettings.getParameter(MAX_CONCURRENT_STREAMS); |
566 } |
591 } |
567 |
592 |
568 void close() { |
593 void close() { |
569 Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address()); |
594 Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address()); |
570 GoAwayFrame f = new GoAwayFrame(0, |
595 GoAwayFrame f = new GoAwayFrame(0, |
816 } |
841 } |
817 } |
842 } |
818 |
843 |
819 void closeStream(int streamid) { |
844 void closeStream(int streamid) { |
820 if (debug.on()) debug.log("Closed stream %d", streamid); |
845 if (debug.on()) debug.log("Closed stream %d", streamid); |
|
846 boolean isClient = (streamid % 2) == 1; |
821 Stream<?> s = streams.remove(streamid); |
847 Stream<?> s = streams.remove(streamid); |
822 if (s != null) { |
848 if (s != null) { |
|
849 synchronized (this) { |
|
850 if (isClient) |
|
851 numReservedClientStreams--; |
|
852 else |
|
853 numReservedServerStreams--; |
|
854 } |
|
855 assert numReservedClientStreams >= 0; |
|
856 assert numReservedServerStreams >= 0; |
823 // decrement the reference count on the HttpClientImpl |
857 // decrement the reference count on the HttpClientImpl |
824 // to allow the SelectorManager thread to exit if no |
858 // to allow the SelectorManager thread to exit if no |
825 // other operation is pending and the facade is no |
859 // other operation is pending and the facade is no |
826 // longer referenced. |
860 // longer referenced. |
827 client().streamUnreference(); |
861 client().streamUnreference(); |
1166 private final ConcurrentLinkedQueue<ByteBuffer> queue |
1200 private final ConcurrentLinkedQueue<ByteBuffer> queue |
1167 = new ConcurrentLinkedQueue<>(); |
1201 = new ConcurrentLinkedQueue<>(); |
1168 private final SequentialScheduler scheduler = |
1202 private final SequentialScheduler scheduler = |
1169 SequentialScheduler.synchronizedScheduler(this::processQueue); |
1203 SequentialScheduler.synchronizedScheduler(this::processQueue); |
1170 private final HttpClientImpl client; |
1204 private final HttpClientImpl client; |
1171 |
1205 |
1172 Http2TubeSubscriber(HttpClientImpl client) { |
1206 Http2TubeSubscriber(HttpClientImpl client) { |
1173 this.client = Objects.requireNonNull(client); |
1207 this.client = Objects.requireNonNull(client); |
1174 } |
1208 } |
1175 |
1209 |
1176 final void processQueue() { |
1210 final void processQueue() { |