226 * Each of this connection's Streams MUST use this controller. |
226 * Each of this connection's Streams MUST use this controller. |
227 */ |
227 */ |
228 private final WindowController windowController = new WindowController(); |
228 private final WindowController windowController = new WindowController(); |
229 private final FramesController framesController = new FramesController(); |
229 private final FramesController framesController = new FramesController(); |
230 private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); |
230 private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); |
231 final WindowUpdateSender windowUpdater; |
231 final ConnectionWindowUpdateSender windowUpdater; |
232 private volatile Throwable cause; |
232 private volatile Throwable cause; |
233 private volatile Supplier<ByteBuffer> initial; |
233 private volatile Supplier<ByteBuffer> initial; |
234 |
234 |
235 static final int DEFAULT_FRAME_SIZE = 16 * 1024; |
235 static final int DEFAULT_FRAME_SIZE = 16 * 1024; |
236 |
236 |
245 this.connection = connection; |
245 this.connection = connection; |
246 this.client2 = client2; |
246 this.client2 = client2; |
247 this.nextstreamid = nextstreamid; |
247 this.nextstreamid = nextstreamid; |
248 this.key = key; |
248 this.key = key; |
249 this.clientSettings = this.client2.getClientSettings(); |
249 this.clientSettings = this.client2.getClientSettings(); |
250 this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); |
250 this.framesDecoder = new FramesDecoder(this::processFrame, |
|
251 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); |
251 // serverSettings will be updated by server |
252 // serverSettings will be updated by server |
252 this.serverSettings = SettingsFrame.getDefaultSettings(); |
253 this.serverSettings = SettingsFrame.getDefaultSettings(); |
253 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); |
254 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); |
254 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); |
255 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); |
255 debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); |
256 debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); |
256 debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); |
257 debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); |
257 debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); |
258 debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); |
258 this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize()); |
259 this.windowUpdater = new ConnectionWindowUpdateSender(this, |
|
260 client2.getConnectionWindowSize(clientSettings)); |
259 } |
261 } |
260 |
262 |
261 /** |
263 /** |
262 * Case 1) Create from upgraded HTTP/1.1 connection. |
264 * Case 1) Create from upgraded HTTP/1.1 connection. |
263 * Is ready to use. Can be SSL. exchange is the Exchange |
265 * Is ready to use. Can be SSL. exchange is the Exchange |
772 */ |
774 */ |
773 private void sendConnectionPreface() throws IOException { |
775 private void sendConnectionPreface() throws IOException { |
774 Log.logTrace("{0}: start sending connection preface to {1}", |
776 Log.logTrace("{0}: start sending connection preface to {1}", |
775 connection.channel().getLocalAddress(), |
777 connection.channel().getLocalAddress(), |
776 connection.address()); |
778 connection.address()); |
777 SettingsFrame sf = client2.getClientSettings(); |
779 SettingsFrame sf = new SettingsFrame(clientSettings); |
|
780 int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE); |
778 ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); |
781 ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); |
779 Log.logFrames(sf, "OUT"); |
782 Log.logFrames(sf, "OUT"); |
780 // send preface bytes and SettingsFrame together |
783 // send preface bytes and SettingsFrame together |
781 HttpPublisher publisher = publisher(); |
784 HttpPublisher publisher = publisher(); |
782 publisher.enqueue(List.of(buf)); |
785 publisher.enqueue(List.of(buf)); |
786 Log.logTrace("PREFACE_BYTES sent"); |
789 Log.logTrace("PREFACE_BYTES sent"); |
787 Log.logTrace("Settings Frame sent"); |
790 Log.logTrace("Settings Frame sent"); |
788 |
791 |
789 // send a Window update for the receive buffer we are using |
792 // send a Window update for the receive buffer we are using |
790 // minus the initial 64 K specified in protocol |
793 // minus the initial 64 K specified in protocol |
791 final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1); |
794 final int len = windowUpdater.initialWindowSize - initialWindowSize; |
792 windowUpdater.sendWindowUpdate(len); |
795 if (len > 0) { |
|
796 windowUpdater.sendWindowUpdate(len); |
|
797 } |
793 // there will be an ACK to the windows update - which should |
798 // there will be an ACK to the windows update - which should |
794 // cause any pending data stored before the preface was sent to be |
799 // cause any pending data stored before the preface was sent to be |
795 // flushed (see PrefaceController). |
800 // flushed (see PrefaceController). |
796 Log.logTrace("finished sending connection preface"); |
801 Log.logTrace("finished sending connection preface"); |
797 debug.log(Level.DEBUG, "Triggering processing of buffered data" |
802 debug.log(Level.DEBUG, "Triggering processing of buffered data" |
1200 } |
1205 } |
1201 } |
1206 } |
1202 |
1207 |
1203 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { |
1208 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { |
1204 |
1209 |
|
1210 final int initialWindowSize; |
1205 public ConnectionWindowUpdateSender(Http2Connection connection, |
1211 public ConnectionWindowUpdateSender(Http2Connection connection, |
1206 int initialWindowSize) { |
1212 int initialWindowSize) { |
1207 super(connection, initialWindowSize); |
1213 super(connection, initialWindowSize); |
|
1214 this.initialWindowSize = initialWindowSize; |
1208 } |
1215 } |
1209 |
1216 |
1210 @Override |
1217 @Override |
1211 int getStreamId() { |
1218 int getStreamId() { |
1212 return 0; |
1219 return 0; |