226 * Each of this connection's Streams MUST use this controller. |
226 * Each of this connection's Streams MUST use this controller. |
227 */ |
227 */ |
228 private final WindowController windowController = new WindowController(); |
228 private final WindowController windowController = new WindowController(); |
229 private final FramesController framesController = new FramesController(); |
229 private final FramesController framesController = new FramesController(); |
230 private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); |
230 private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); |
231 final WindowUpdateSender windowUpdater; |
231 final ConnectionWindowUpdateSender windowUpdater; |
232 private volatile Throwable cause; |
232 private volatile Throwable cause; |
233 private volatile Supplier<ByteBuffer> initial; |
233 private volatile Supplier<ByteBuffer> initial; |
234 |
234 |
235 static final int DEFAULT_FRAME_SIZE = 16 * 1024; |
235 static final int DEFAULT_FRAME_SIZE = 16 * 1024; |
236 |
236 |
245 this.connection = connection; |
245 this.connection = connection; |
246 this.client2 = client2; |
246 this.client2 = client2; |
247 this.nextstreamid = nextstreamid; |
247 this.nextstreamid = nextstreamid; |
248 this.key = key; |
248 this.key = key; |
249 this.clientSettings = this.client2.getClientSettings(); |
249 this.clientSettings = this.client2.getClientSettings(); |
250 this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); |
250 this.framesDecoder = new FramesDecoder(this::processFrame, |
|
251 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); |
251 // serverSettings will be updated by server |
252 // serverSettings will be updated by server |
252 this.serverSettings = SettingsFrame.getDefaultSettings(); |
253 this.serverSettings = SettingsFrame.getDefaultSettings(); |
253 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); |
254 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); |
254 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); |
255 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); |
255 debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); |
256 debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); |
256 debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); |
257 debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); |
257 debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); |
258 debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); |
258 this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize()); |
259 this.windowUpdater = new ConnectionWindowUpdateSender(this, |
|
260 client2.getConnectionWindowSize(clientSettings)); |
259 } |
261 } |
260 |
262 |
261 /** |
263 /** |
262 * Case 1) Create from upgraded HTTP/1.1 connection. |
264 * Case 1) Create from upgraded HTTP/1.1 connection. |
263 * Is ready to use. Can be SSL. exchange is the Exchange |
265 * Is ready to use. Can be SSL. exchange is the Exchange |
776 */ |
778 */ |
777 private void sendConnectionPreface() throws IOException { |
779 private void sendConnectionPreface() throws IOException { |
778 Log.logTrace("{0}: start sending connection preface to {1}", |
780 Log.logTrace("{0}: start sending connection preface to {1}", |
779 connection.channel().getLocalAddress(), |
781 connection.channel().getLocalAddress(), |
780 connection.address()); |
782 connection.address()); |
781 SettingsFrame sf = client2.getClientSettings(); |
783 SettingsFrame sf = new SettingsFrame(clientSettings); |
|
784 int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE); |
782 ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); |
785 ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); |
783 Log.logFrames(sf, "OUT"); |
786 Log.logFrames(sf, "OUT"); |
784 // send preface bytes and SettingsFrame together |
787 // send preface bytes and SettingsFrame together |
785 HttpPublisher publisher = publisher(); |
788 HttpPublisher publisher = publisher(); |
786 publisher.enqueue(List.of(buf)); |
789 publisher.enqueue(List.of(buf)); |
790 Log.logTrace("PREFACE_BYTES sent"); |
793 Log.logTrace("PREFACE_BYTES sent"); |
791 Log.logTrace("Settings Frame sent"); |
794 Log.logTrace("Settings Frame sent"); |
792 |
795 |
793 // send a Window update for the receive buffer we are using |
796 // send a Window update for the receive buffer we are using |
794 // minus the initial 64 K specified in protocol |
797 // minus the initial 64 K specified in protocol |
795 final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1); |
798 final int len = windowUpdater.initialWindowSize - initialWindowSize; |
796 windowUpdater.sendWindowUpdate(len); |
799 if (len > 0) { |
|
800 windowUpdater.sendWindowUpdate(len); |
|
801 } |
797 // there will be an ACK to the windows update - which should |
802 // there will be an ACK to the windows update - which should |
798 // cause any pending data stored before the preface was sent to be |
803 // cause any pending data stored before the preface was sent to be |
799 // flushed (see PrefaceController). |
804 // flushed (see PrefaceController). |
800 Log.logTrace("finished sending connection preface"); |
805 Log.logTrace("finished sending connection preface"); |
801 debug.log(Level.DEBUG, "Triggering processing of buffered data" |
806 debug.log(Level.DEBUG, "Triggering processing of buffered data" |
1204 } |
1209 } |
1205 } |
1210 } |
1206 |
1211 |
1207 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { |
1212 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { |
1208 |
1213 |
|
1214 final int initialWindowSize; |
1209 public ConnectionWindowUpdateSender(Http2Connection connection, |
1215 public ConnectionWindowUpdateSender(Http2Connection connection, |
1210 int initialWindowSize) { |
1216 int initialWindowSize) { |
1211 super(connection, initialWindowSize); |
1217 super(connection, initialWindowSize); |
|
1218 this.initialWindowSize = initialWindowSize; |
1212 } |
1219 } |
1213 |
1220 |
1214 @Override |
1221 @Override |
1215 int getStreamId() { |
1222 int getStreamId() { |
1216 return 0; |
1223 return 0; |