src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
branchhttp-client-branch
changeset 56598 4c502e3991bf
parent 56531 15ff86a732ea
child 56616 5d2446adafaf
equal deleted inserted replaced
56572:c8fe5ffdfe98 56598:4c502e3991bf
   257     // to be sent. The following two fields are updated as soon as a stream
   257     // to be sent. The following two fields are updated as soon as a stream
   258     // is created and assigned to a connection. They are checked before
   258     // is created and assigned to a connection. They are checked before
   259     // assigning a stream to a connection.
   259     // assigning a stream to a connection.
   260     private int lastReservedClientStreamid = 1;
   260     private int lastReservedClientStreamid = 1;
   261     private int lastReservedServerStreamid = 0;
   261     private int lastReservedServerStreamid = 0;
       
   262     private int numReservedClientStreams = 0; // count of current streams
       
   263     private int numReservedServerStreams = 0; // count of current streams
   262     private final Encoder hpackOut;
   264     private final Encoder hpackOut;
   263     private final Decoder hpackIn;
   265     private final Decoder hpackIn;
   264     final SettingsFrame clientSettings;
   266     final SettingsFrame clientSettings;
   265     private volatile SettingsFrame serverSettings;
   267     private volatile SettingsFrame serverSettings;
   266     private final String key; // for HttpClientImpl.connections map
   268     private final String key; // for HttpClientImpl.connections map
   309                 client2.getConnectionWindowSize(clientSettings));
   311                 client2.getConnectionWindowSize(clientSettings));
   310     }
   312     }
   311 
   313 
   312     /**
   314     /**
   313      * Case 1) Create from upgraded HTTP/1.1 connection.
   315      * Case 1) Create from upgraded HTTP/1.1 connection.
   314      * Is ready to use. Can be SSL. exchange is the Exchange
   316      * Is ready to use. Can't be SSL. exchange is the Exchange
   315      * that initiated the connection, whose response will be delivered
   317      * that initiated the connection, whose response will be delivered
   316      * on a Stream.
   318      * on a Stream.
   317      */
   319      */
   318     private Http2Connection(HttpConnection connection,
   320     private Http2Connection(HttpConnection connection,
   319                     Http2ClientImpl client2,
   321                     Http2ClientImpl client2,
   323     {
   325     {
   324         this(connection,
   326         this(connection,
   325                 client2,
   327                 client2,
   326                 3, // stream 1 is registered during the upgrade
   328                 3, // stream 1 is registered during the upgrade
   327                 keyFor(connection));
   329                 keyFor(connection));
       
   330         reserveStream(true);
   328         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
   331         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
   329 
   332 
   330         Stream<?> initialStream = createStream(exchange);
   333         Stream<?> initialStream = createStream(exchange);
   331         initialStream.registerStream(1);
   334         initialStream.registerStream(1);
   332         windowController.registerStream(1, getInitialSendWindowSize());
   335         windowController.registerStream(1, getInitialSendWindowSize());
   406     }
   409     }
   407 
   410 
   408     // call these before assigning a request/stream to a connection
   411     // call these before assigning a request/stream to a connection
   409     // if false returned then a new Http2Connection is required
   412     // if false returned then a new Http2Connection is required
   410     // if true, the the stream may be assigned to this connection
   413     // if true, the the stream may be assigned to this connection
   411     synchronized boolean reserveStream(boolean clientInitiated) {
   414     // for server push, if false returned, then the stream should be cancelled
       
   415     synchronized boolean reserveStream(boolean clientInitiated) throws IOException {
   412         if (finalStream) {
   416         if (finalStream) {
   413             return false;
   417             return false;
   414         }
   418         }
   415         if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
   419         if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
   416             setFinalStream();
   420             setFinalStream();
   423         }
   427         }
   424         if (clientInitiated)
   428         if (clientInitiated)
   425             lastReservedClientStreamid+=2;
   429             lastReservedClientStreamid+=2;
   426         else
   430         else
   427             lastReservedServerStreamid+=2;
   431             lastReservedServerStreamid+=2;
       
   432 
       
   433         assert numReservedClientStreams >= 0;
       
   434         assert numReservedServerStreams >= 0;
       
   435         if (clientInitiated && numReservedClientStreams >= getMaxConcurrentClientStreams()) {
       
   436             throw new IOException("too many concurrent streams");
       
   437         } else if (clientInitiated) {
       
   438             numReservedClientStreams++;
       
   439         }
       
   440         if (!clientInitiated && numReservedServerStreams >= getMaxConcurrentServerStreams()) {
       
   441             return false;
       
   442         } else if (!clientInitiated) {
       
   443             numReservedServerStreams++;
       
   444         }
   428         return true;
   445         return true;
   429     }
   446     }
   430 
   447 
   431     /**
   448     /**
   432      * Throws an IOException if h2 was not negotiated
   449      * Throws an IOException if h2 was not negotiated
   561         }
   578         }
   562     }
   579     }
   563 
   580 
   564     final int getInitialSendWindowSize() {
   581     final int getInitialSendWindowSize() {
   565         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
   582         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
       
   583     }
       
   584 
       
   585     final int getMaxConcurrentClientStreams() {
       
   586         return serverSettings.getParameter(MAX_CONCURRENT_STREAMS);
       
   587     }
       
   588 
       
   589     final int getMaxConcurrentServerStreams() {
       
   590         return clientSettings.getParameter(MAX_CONCURRENT_STREAMS);
   566     }
   591     }
   567 
   592 
   568     void close() {
   593     void close() {
   569         Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
   594         Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
   570         GoAwayFrame f = new GoAwayFrame(0,
   595         GoAwayFrame f = new GoAwayFrame(0,
   816         }
   841         }
   817     }
   842     }
   818 
   843 
   819     void closeStream(int streamid) {
   844     void closeStream(int streamid) {
   820         if (debug.on()) debug.log("Closed stream %d", streamid);
   845         if (debug.on()) debug.log("Closed stream %d", streamid);
       
   846         boolean isClient = (streamid % 2) == 1;
   821         Stream<?> s = streams.remove(streamid);
   847         Stream<?> s = streams.remove(streamid);
   822         if (s != null) {
   848         if (s != null) {
       
   849             synchronized (this) {
       
   850                 if (isClient)
       
   851                     numReservedClientStreams--;
       
   852                 else
       
   853                     numReservedServerStreams--;
       
   854             }
       
   855             assert numReservedClientStreams >= 0;
       
   856             assert numReservedServerStreams >= 0;
   823             // decrement the reference count on the HttpClientImpl
   857             // decrement the reference count on the HttpClientImpl
   824             // to allow the SelectorManager thread to exit if no
   858             // to allow the SelectorManager thread to exit if no
   825             // other operation is pending and the facade is no
   859             // other operation is pending and the facade is no
   826             // longer referenced.
   860             // longer referenced.
   827             client().streamUnreference();
   861             client().streamUnreference();
  1166         private final ConcurrentLinkedQueue<ByteBuffer> queue
  1200         private final ConcurrentLinkedQueue<ByteBuffer> queue
  1167                 = new ConcurrentLinkedQueue<>();
  1201                 = new ConcurrentLinkedQueue<>();
  1168         private final SequentialScheduler scheduler =
  1202         private final SequentialScheduler scheduler =
  1169                 SequentialScheduler.synchronizedScheduler(this::processQueue);
  1203                 SequentialScheduler.synchronizedScheduler(this::processQueue);
  1170         private final HttpClientImpl client;
  1204         private final HttpClientImpl client;
  1171         
  1205 
  1172         Http2TubeSubscriber(HttpClientImpl client) {
  1206         Http2TubeSubscriber(HttpClientImpl client) {
  1173             this.client = Objects.requireNonNull(client);
  1207             this.client = Objects.requireNonNull(client);
  1174         }
  1208         }
  1175 
  1209 
  1176         final void processQueue() {
  1210         final void processQueue() {