src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http2Connection.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56072 96c1f6e984eb
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.io.EOFException;
       
    29 import java.io.IOException;
       
    30 import java.lang.System.Logger.Level;
       
    31 import java.net.InetSocketAddress;
       
    32 import java.net.URI;
       
    33 import java.nio.ByteBuffer;
       
    34 import java.nio.charset.StandardCharsets;
       
    35 import java.util.Iterator;
       
    36 import java.util.LinkedList;
       
    37 import java.util.List;
       
    38 import java.util.Map;
       
    39 import java.util.concurrent.CompletableFuture;
       
    40 import java.util.ArrayList;
       
    41 import java.util.Objects;
       
    42 import java.util.concurrent.ConcurrentHashMap;
       
    43 import java.util.concurrent.ConcurrentLinkedQueue;
       
    44 import java.util.concurrent.Flow;
       
    45 import java.util.function.Function;
       
    46 import java.util.function.Supplier;
       
    47 import javax.net.ssl.SSLEngine;
       
    48 import javax.net.ssl.SSLException;
       
    49 import jdk.incubator.http.HttpClient;
       
    50 import jdk.incubator.http.HttpHeaders;
       
    51 import jdk.incubator.http.internal.HttpConnection.HttpPublisher;
       
    52 import jdk.incubator.http.internal.common.FlowTube;
       
    53 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
       
    54 import jdk.incubator.http.internal.common.HttpHeadersImpl;
       
    55 import jdk.incubator.http.internal.common.Log;
       
    56 import jdk.incubator.http.internal.common.MinimalFuture;
       
    57 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    58 import jdk.incubator.http.internal.common.Utils;
       
    59 import jdk.incubator.http.internal.frame.ContinuationFrame;
       
    60 import jdk.incubator.http.internal.frame.DataFrame;
       
    61 import jdk.incubator.http.internal.frame.ErrorFrame;
       
    62 import jdk.incubator.http.internal.frame.FramesDecoder;
       
    63 import jdk.incubator.http.internal.frame.FramesEncoder;
       
    64 import jdk.incubator.http.internal.frame.GoAwayFrame;
       
    65 import jdk.incubator.http.internal.frame.HeaderFrame;
       
    66 import jdk.incubator.http.internal.frame.HeadersFrame;
       
    67 import jdk.incubator.http.internal.frame.Http2Frame;
       
    68 import jdk.incubator.http.internal.frame.MalformedFrame;
       
    69 import jdk.incubator.http.internal.frame.OutgoingHeaders;
       
    70 import jdk.incubator.http.internal.frame.PingFrame;
       
    71 import jdk.incubator.http.internal.frame.PushPromiseFrame;
       
    72 import jdk.incubator.http.internal.frame.ResetFrame;
       
    73 import jdk.incubator.http.internal.frame.SettingsFrame;
       
    74 import jdk.incubator.http.internal.frame.WindowUpdateFrame;
       
    75 import jdk.incubator.http.internal.hpack.Encoder;
       
    76 import jdk.incubator.http.internal.hpack.Decoder;
       
    77 import jdk.incubator.http.internal.hpack.DecodingCallback;
       
    78 import static java.nio.charset.StandardCharsets.UTF_8;
       
    79 import static jdk.incubator.http.internal.frame.SettingsFrame.*;
       
    80 
       
    81 
       
    82 /**
       
    83  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
       
    84  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
       
    85  *
       
    86  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
       
    87  * to a HttpClientImpl.
       
    88  *
       
    89  * Creation cases:
       
    90  * 1) upgraded HTTP/1.1 plain tcp connection
       
    91  * 2) prior knowledge directly created plain tcp connection
       
    92  * 3) directly created HTTP/2 SSL connection which uses ALPN.
       
    93  *
       
    94  * Sending is done by writing directly to underlying HttpConnection object which
       
    95  * is operating in async mode. No flow control applies on output at this level
       
    96  * and all writes are just executed as puts to an output Q belonging to HttpConnection
       
    97  * Flow control is implemented by HTTP/2 protocol itself.
       
    98  *
       
    99  * Hpack header compression
       
   100  * and outgoing stream creation is also done here, because these operations
       
   101  * must be synchronized at the socket level. Stream objects send frames simply
       
   102  * by placing them on the connection's output Queue. sendFrame() is called
       
   103  * from a higher level (Stream) thread.
       
   104  *
       
   105  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
       
   106  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
       
   107  * or handles them directly itself. This thread performs hpack decompression
       
   108  * and incoming stream creation (Server push). Incoming frames destined for a
       
   109  * stream are provided by calling Stream.incoming().
       
   110  */
       
   111 class Http2Connection  {
       
   112 
       
   113     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
   114     static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag.
       
   115     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
   116     final static System.Logger  DEBUG_LOGGER =
       
   117             Utils.getDebugLogger("Http2Connection"::toString, DEBUG);
       
   118     private final System.Logger debugHpack =
       
   119                   Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
       
   120     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
       
   121 
       
   122     private boolean singleStream; // used only for stream 1, then closed
       
   123 
       
   124     /*
       
   125      *  ByteBuffer pooling strategy for HTTP/2 protocol:
       
   126      *
       
   127      * In general there are 4 points where ByteBuffers are used:
       
   128      *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
       
   129      *    in case of SSL connection.
       
   130      *
       
   131      * 1. Outgoing frames encoded to ByteBuffers.
       
   132      *    Outgoing ByteBuffers are created with the required size and are frequently small (except DataFrames, etc)
       
   133      *    At this place no pools at all. All outgoing buffers should be collected by GC.
       
   134      *
       
   135      * 2. Incoming ByteBuffers (decoded to frames).
       
   136      *    Here, total elimination of BB pool is not a good idea.
       
   137      *    We don't know how many bytes we will receive through network.
       
   138      * So here we allocate buffer of reasonable size. The following life of the BB:
       
   139      * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
       
   140      *     BB is returned to pool,
       
   141      * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
       
   142      *     Such BB is never returned to pool and will be GCed.
       
   143      * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
       
   144      *     the buffer can be released to the pool.
       
   145      *
       
   146      * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool,
       
   147      *    because we can't predict the size of encrypted packets.
       
   148      *
       
   149      */
       
   150 
       
   151 
       
   152     // A small class that allows to control frames with respect to the state of
       
   153     // the connection preface. Any data received before the connection
       
   154     // preface is sent will be buffered.
       
   155     private final class FramesController {
       
   156         volatile boolean prefaceSent;
       
   157         volatile List<ByteBuffer> pending;
       
   158 
       
   159         boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
       
   160                 throws IOException
       
   161         {
       
   162             // if preface is not sent, buffers data in the pending list
       
   163             if (!prefaceSent) {
       
   164                 debug.log(Level.DEBUG, "Preface is not sent: buffering %d",
       
   165                           buf.remaining());
       
   166                 synchronized (this) {
       
   167                     if (!prefaceSent) {
       
   168                         if (pending == null) pending = new ArrayList<>();
       
   169                         pending.add(buf);
       
   170                         debug.log(Level.DEBUG, () -> "there are now "
       
   171                               + Utils.remaining(pending)
       
   172                               + " bytes buffered waiting for preface to be sent");
       
   173                         return false;
       
   174                     }
       
   175                 }
       
   176             }
       
   177 
       
   178             // Preface is sent. Checks for pending data and flush it.
       
   179             // We rely on this method being called from within the Http2TubeSubscriber
       
   180             // scheduler, so we know that no other thread could execute this method
       
   181             // concurrently while we're here.
       
   182             // This ensures that later incoming buffers will not
       
   183             // be processed before we have flushed the pending queue.
       
   184             // No additional synchronization is therefore necessary here.
       
   185             List<ByteBuffer> pending = this.pending;
       
   186             this.pending = null;
       
   187             if (pending != null) {
       
   188                 // flush pending data
       
   189                 debug.log(Level.DEBUG, () -> "Processing buffered data: "
       
   190                       + Utils.remaining(pending));
       
   191                 for (ByteBuffer b : pending) {
       
   192                     decoder.decode(b);
       
   193                 }
       
   194             }
       
   195             // push the received buffer to the frames decoder.
       
   196             if (buf != EMPTY_TRIGGER) {
       
   197                 debug.log(Level.DEBUG, "Processing %d", buf.remaining());
       
   198                 decoder.decode(buf);
       
   199             }
       
   200             return true;
       
   201         }
       
   202 
       
   203         // Mark that the connection preface is sent
       
   204         void markPrefaceSent() {
       
   205             assert !prefaceSent;
       
   206             synchronized (this) {
       
   207                 prefaceSent = true;
       
   208             }
       
   209         }
       
   210     }
       
   211 
       
   212     volatile boolean closed;
       
   213 
       
   214     //-------------------------------------
       
   215     final HttpConnection connection;
       
   216     private final Http2ClientImpl client2;
       
   217     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
       
   218     private int nextstreamid;
       
   219     private int nextPushStream = 2;
       
   220     private final Encoder hpackOut;
       
   221     private final Decoder hpackIn;
       
   222     final SettingsFrame clientSettings;
       
   223     private volatile SettingsFrame serverSettings;
       
   224     private final String key; // for HttpClientImpl.connections map
       
   225     private final FramesDecoder framesDecoder;
       
   226     private final FramesEncoder framesEncoder = new FramesEncoder();
       
   227 
       
   228     /**
       
   229      * Send Window controller for both connection and stream windows.
       
   230      * Each of this connection's Streams MUST use this controller.
       
   231      */
       
   232     private final WindowController windowController = new WindowController();
       
   233     private final FramesController framesController = new FramesController();
       
   234     private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
       
   235     final ConnectionWindowUpdateSender windowUpdater;
       
   236     private volatile Throwable cause;
       
   237     private volatile Supplier<ByteBuffer> initial;
       
   238 
       
   239     static final int DEFAULT_FRAME_SIZE = 16 * 1024;
       
   240 
       
   241 
       
   242     // TODO: need list of control frames from other threads
       
   243     // that need to be sent
       
   244 
       
    /**
     * Common initialisation shared by the other constructors.
     *
     * @param connection   the underlying connection this HTTP/2 connection runs over
     * @param client2      the owning HTTP/2 client; supplies the client settings
     *                     and the connection window size
     * @param nextstreamid the initial value of the next-stream-id counter
     * @param key          the key identifying this connection (see {@code keyString})
     */
    private Http2Connection(HttpConnection connection,
                            Http2ClientImpl client2,
                            int nextstreamid,
                            String key) {
        this.connection = connection;
        this.client2 = client2;
        this.nextstreamid = nextstreamid;
        this.key = key;
        this.clientSettings = this.client2.getClientSettings();
        // Incoming frames are dispatched to processFrame; the decoder is
        // configured with our own MAX_FRAME_SIZE setting.
        this.framesDecoder = new FramesDecoder(this::processFrame,
                clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
        // serverSettings will be updated by server
        this.serverSettings = SettingsFrame.getDefaultSettings();
        // The encoder is sized from the (default) server settings, the decoder
        // from the client settings. serverSettings must be assigned first.
        this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
        this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
        debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString());
        debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn);
        debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut);
        this.windowUpdater = new ConnectionWindowUpdateSender(this,
                client2.getConnectionWindowSize(clientSettings));
    }
       
   266 
       
   267     /**
       
   268      * Case 1) Create from upgraded HTTP/1.1 connection.
       
   269      * Is ready to use. Can be SSL. exchange is the Exchange
       
   270      * that initiated the connection, whose response will be delivered
       
   271      * on a Stream.
       
   272      */
       
    private Http2Connection(HttpConnection connection,
                    Http2ClientImpl client2,
                    Exchange<?> exchange,
                    Supplier<ByteBuffer> initial)
        throws IOException, InterruptedException
    {
        this(connection,
                client2,
                3, // stream 1 is registered during the upgrade
                keyFor(connection));
        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        // Bind the upgrading exchange to stream 1, register that stream with
        // the window controller, and mark its request as already sent.
        Stream<?> initialStream = createStream(exchange);
        initialStream.registerStream(1);
        windowController.registerStream(1, getInitialSendWindowSize());
        initialStream.requestSent();
        // Upgrading:
        //    set callbacks before sending preface - makes sure anything that
        //    might be sent by the server will come our way.
        // 'initial' is stored so asyncReceive can process its buffer first.
        this.initial = initial;
        connectFlows(connection);
        sendConnectionPreface();
    }
       
   296 
       
   297     // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
       
   298     // agreement from the server. Async style but completes immediately, because
       
   299     // the connection is already connected.
       
    static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
                                                          Http2ClientImpl client2,
                                                          Exchange<?> exchange,
                                                          Supplier<ByteBuffer> initial)
    {
        // Delegates to the upgrade constructor through MinimalFuture.supply.
        // NOTE(review): presumably an exception thrown by the constructor
        // completes the returned future exceptionally - confirm in MinimalFuture.
        return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
    }
       
   307 
       
   308     // Requires TLS handshake. So, is really async
       
   309     static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
       
   310                                                           Http2ClientImpl h2client) {
       
   311         assert request.secure();
       
   312         AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
       
   313         HttpConnection.getConnection(request.getAddress(),
       
   314                                      h2client.client(),
       
   315                                      request,
       
   316                                      HttpClient.Version.HTTP_2);
       
   317 
       
   318         return connection.connectAsync()
       
   319                   .thenCompose(unused -> checkSSLConfig(connection))
       
   320                   .thenCompose(notused-> {
       
   321                       CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
       
   322                       try {
       
   323                           Http2Connection hc = new Http2Connection(request, h2client, connection);
       
   324                           cf.complete(hc);
       
   325                       } catch (IOException e) {
       
   326                           cf.completeExceptionally(e);
       
   327                       }
       
   328                       return cf; } );
       
   329     }
       
   330 
       
   331     /**
       
   332      * Cases 2) 3)
       
   333      *
       
   334      * request is request to be sent.
       
   335      */
       
    private Http2Connection(HttpRequestImpl request,
                            Http2ClientImpl h2client,
                            HttpConnection connection)
        throws IOException
    {
        // Client-initiated stream ids start at 1 here; the key is derived
        // from the request URI and proxy rather than from the connection.
        this(connection,
             h2client,
             1,
             keyFor(request.uri(), request.proxy()));

        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        // safe to resume async reading now.
        connectFlows(connection);
        sendConnectionPreface();
    }
       
   352 
       
   353     private void connectFlows(HttpConnection connection) {
       
   354         FlowTube tube =  connection.getConnectionFlow();
       
   355         // Connect the flow to our Http2TubeSubscriber:
       
   356         tube.connectFlows(connection.publisher(), subscriber);
       
   357     }
       
   358 
       
    /**
     * Returns the {@code HttpClientImpl} obtained from the owning
     * {@code Http2ClientImpl}.
     */
    final HttpClientImpl client() {
        return client2.client();
    }
       
   362 
       
   363     /**
       
   364      * Throws an IOException if h2 was not negotiated
       
   365      */
       
   366     private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
       
   367         assert aconn.isSecure();
       
   368 
       
   369         Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
       
   370             CompletableFuture<Void> cf = new MinimalFuture<>();
       
   371             SSLEngine engine = aconn.getEngine();
       
   372             assert Objects.equals(alpn, engine.getApplicationProtocol());
       
   373 
       
   374             DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn );
       
   375 
       
   376             if (alpn == null || !alpn.equals("h2")) {
       
   377                 String msg;
       
   378                 if (alpn == null) {
       
   379                     Log.logSSL("ALPN not supported");
       
   380                     msg = "ALPN not supported";
       
   381                 } else {
       
   382                     switch (alpn) {
       
   383                         case "":
       
   384                             Log.logSSL(msg = "No ALPN negotiated");
       
   385                             break;
       
   386                         case "http/1.1":
       
   387                             Log.logSSL( msg = "HTTP/1.1 ALPN returned");
       
   388                             break;
       
   389                         default:
       
   390                             Log.logSSL(msg = "Unexpected ALPN: " + alpn);
       
   391                             cf.completeExceptionally(new IOException(msg));
       
   392                     }
       
   393                 }
       
   394                 cf.completeExceptionally(new ALPNException(msg, aconn));
       
   395                 return cf;
       
   396             }
       
   397             cf.complete(null);
       
   398             return cf;
       
   399         };
       
   400 
       
   401         return aconn.getALPN()
       
   402                 .whenComplete((r,t) -> {
       
   403                     if (t != null && t instanceof SSLException) {
       
   404                         // something went wrong during the initial handshake
       
   405                         // close the connection
       
   406                         aconn.close();
       
   407                     }
       
   408                 })
       
   409                 .thenCompose(checkAlpnCF);
       
   410     }
       
   411 
       
    /**
     * Returns the current value of the {@code singleStream} flag, read while
     * holding this connection's monitor.
     */
    synchronized boolean singleStream() {
        return singleStream;
    }
       
   415 
       
    /**
     * Sets the {@code singleStream} flag while holding this connection's monitor.
     *
     * @param use the new value of the flag
     */
    synchronized void setSingleStream(boolean use) {
        singleStream = use;
    }
       
   419 
       
   420     static String keyFor(HttpConnection connection) {
       
   421         boolean isProxy = connection.isProxied();
       
   422         boolean isSecure = connection.isSecure();
       
   423         InetSocketAddress addr = connection.address();
       
   424 
       
   425         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
       
   426     }
       
   427 
       
   428     static String keyFor(URI uri, InetSocketAddress proxy) {
       
   429         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
       
   430         boolean isProxy = proxy != null;
       
   431 
       
   432         String host;
       
   433         int port;
       
   434 
       
   435         if (proxy != null) {
       
   436             host = proxy.getHostString();
       
   437             port = proxy.getPort();
       
   438         } else {
       
   439             host = uri.getHost();
       
   440             port = uri.getPort();
       
   441         }
       
   442         return keyString(isSecure, isProxy, host, port);
       
   443     }
       
   444 
       
   445     // {C,S}:{H:P}:host:port
       
   446     // C indicates clear text connection "http"
       
   447     // S indicates secure "https"
       
   448     // H indicates host (direct) connection
       
   449     // P indicates proxy
       
   450     // Eg: "S:H:foo.com:80"
       
   451     static String keyString(boolean secure, boolean proxy, String host, int port) {
       
   452         if (secure && port == -1)
       
   453             port = 443;
       
   454         else if (!secure && port == -1)
       
   455             port = 80;
       
   456         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
       
   457     }
       
   458 
       
    /** Returns the key string this connection was constructed with. */
    String key() {
        return this.key;
    }
       
   462 
       
    /**
     * Offers this connection to the owning {@code Http2ClientImpl} and returns
     * whatever that call returns.
     */
    boolean offerConnection() {
        return client2.offerConnection(this);
    }
       
   466 
       
    /** Returns the publisher of the underlying {@code HttpConnection}. */
    private HttpPublisher publisher() {
        return connection.publisher();
    }
       
   470 
       
   471     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
       
   472             throws IOException
       
   473     {
       
   474         debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder);
       
   475 
       
   476         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
       
   477 
       
   478         List<ByteBuffer> buffers = frame.getHeaderBlock();
       
   479         int len = buffers.size();
       
   480         for (int i = 0; i < len; i++) {
       
   481             ByteBuffer b = buffers.get(i);
       
   482             hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
       
   483         }
       
   484     }
       
   485 
       
    /**
     * Returns the {@code INITIAL_WINDOW_SIZE} parameter of the current server
     * settings.
     */
    final int getInitialSendWindowSize() {
        return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
    }
       
   489 
       
   490     void close() {
       
   491         Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
       
   492         GoAwayFrame f = new GoAwayFrame(0,
       
   493                                         ErrorFrame.NO_ERROR,
       
   494                                         "Requested by user".getBytes(UTF_8));
       
   495         // TODO: set last stream. For now zero ok.
       
   496         sendFrame(f);
       
   497     }
       
   498 
       
   499     long count;
       
   500     final void asyncReceive(ByteBuffer buffer) {
       
   501         // We don't need to read anything and
       
   502         // we don't want to send anything back to the server
       
   503         // until the connection preface has been sent.
       
   504         // Therefore we're going to wait if needed before reading
       
   505         // (and thus replying) to anything.
       
   506         // Starting to reply to something (e.g send an ACK to a
       
   507         // SettingsFrame sent by the server) before the connection
       
   508         // preface is fully sent might result in the server
       
   509         // sending a GOAWAY frame with 'invalid_preface'.
       
   510         //
       
   511         // Note: asyncReceive is only called from the Http2TubeSubscriber
       
   512         //       sequential scheduler.
       
   513         try {
       
   514             Supplier<ByteBuffer> bs = initial;
       
   515             // ensure that we always handle the initial buffer first,
       
   516             // if any.
       
   517             if (bs != null) {
       
   518                 initial = null;
       
   519                 ByteBuffer b = bs.get();
       
   520                 if (b.hasRemaining()) {
       
   521                     long c = ++count;
       
   522                     debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
       
   523                         + c +"): " + b.remaining());
       
   524                     framesController.processReceivedData(framesDecoder, b);
       
   525                 }
       
   526             }
       
   527             ByteBuffer b = buffer;
       
   528             // the Http2TubeSubscriber scheduler ensures that the order of incoming
       
   529             // buffers is preserved.
       
   530             if (b == EMPTY_TRIGGER) {
       
   531                 debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
       
   532                 boolean prefaceSent = framesController.prefaceSent;
       
   533                 assert prefaceSent;
       
   534                 // call framesController.processReceivedData to potentially
       
   535                 // trigger the processing of all the data buffered there.
       
   536                 framesController.processReceivedData(framesDecoder, buffer);
       
   537                 debug.log(Level.DEBUG, "H2 processed buffered data");
       
   538             } else {
       
   539                 long c = ++count;
       
   540                 debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
       
   541                 framesController.processReceivedData(framesDecoder, buffer);
       
   542                 debug.log(Level.DEBUG, "H2 processed(%d)", c);
       
   543             }
       
   544         } catch (Throwable e) {
       
   545             String msg = Utils.stackTrace(e);
       
   546             Log.logTrace(msg);
       
   547             shutdown(e);
       
   548         }
       
   549     }
       
   550 
       
    /**
     * Returns the throwable recorded by {@code shutdown}, or {@code null} if
     * none has been recorded.
     */
    Throwable getRecordedCause() {
        return cause;
    }
       
   554 
       
   555     void shutdown(Throwable t) {
       
   556         debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t);
       
   557         if (closed == true) return;
       
   558         synchronized (this) {
       
   559             if (closed == true) return;
       
   560             closed = true;
       
   561         }
       
   562         Log.logError(t);
       
   563         Throwable initialCause = this.cause;
       
   564         if (initialCause == null) this.cause = t;
       
   565         client2.deleteConnection(this);
       
   566         List<Stream<?>> c = new LinkedList<>(streams.values());
       
   567         for (Stream<?> s : c) {
       
   568             s.cancelImpl(t);
       
   569         }
       
   570         connection.close();
       
   571     }
       
   572 
       
   573     /**
       
   574      * Streams initiated by a client MUST use odd-numbered stream
       
   575      * identifiers; those initiated by the server MUST use even-numbered
       
   576      * stream identifiers.
       
   577      */
       
   578     private static final boolean isSeverInitiatedStream(int streamid) {
       
   579         return (streamid & 0x1) == 0;
       
   580     }
       
   581 
       
   582     /**
       
   583      * Handles stream 0 (common) frames that apply to whole connection and passes
       
   584      * other stream specific frames to that Stream object.
       
   585      *
       
   586      * Invokes Stream.incoming() which is expected to process frame without
       
   587      * blocking.
       
   588      */
       
   589     void processFrame(Http2Frame frame) throws IOException {
       
   590         Log.logFrames(frame, "IN");
       
   591         int streamid = frame.streamid();
       
   592         if (frame instanceof MalformedFrame) {
       
   593             Log.logError(((MalformedFrame) frame).getMessage());
       
   594             if (streamid == 0) {
       
   595                 framesDecoder.close("Malformed frame on stream 0");
       
   596                 protocolError(((MalformedFrame) frame).getErrorCode(),
       
   597                         ((MalformedFrame) frame).getMessage());
       
   598             } else {
       
   599                 debug.log(Level.DEBUG, () -> "Reset stream: "
       
   600                           + ((MalformedFrame) frame).getMessage());
       
   601                 resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
       
   602             }
       
   603             return;
       
   604         }
       
   605         if (streamid == 0) {
       
   606             handleConnectionFrame(frame);
       
   607         } else {
       
   608             if (frame instanceof SettingsFrame) {
       
   609                 // The stream identifier for a SETTINGS frame MUST be zero
       
   610                 framesDecoder.close(
       
   611                         "The stream identifier for a SETTINGS frame MUST be zero");
       
   612                 protocolError(GoAwayFrame.PROTOCOL_ERROR);
       
   613                 return;
       
   614             }
       
   615 
       
   616             Stream<?> stream = getStream(streamid);
       
   617             if (stream == null) {
       
   618                 // Should never receive a frame with unknown stream id
       
   619 
       
   620                 if (frame instanceof HeaderFrame) {
       
   621                     // always decode the headers as they may affect
       
   622                     // connection-level HPACK decoding state
       
   623                     HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
       
   624                     decodeHeaders((HeaderFrame) frame, decoder);
       
   625                 }
       
   626 
       
   627                 if (!(frame instanceof ResetFrame)) {
       
   628                     if (isSeverInitiatedStream(streamid)) {
       
   629                         if (streamid < nextPushStream) {
       
   630                             // trailing data on a cancelled push promise stream,
       
   631                             // reset will already have been sent, ignore
       
   632                             Log.logTrace("Ignoring cancelled push promise frame " + frame);
       
   633                         } else {
       
   634                             resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
       
   635                         }
       
   636                     } else if (streamid >= nextstreamid) {
       
   637                         // otherwise the stream has already been reset/closed
       
   638                         resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
       
   639                     }
       
   640                 }
       
   641                 return;
       
   642             }
       
   643             if (frame instanceof PushPromiseFrame) {
       
   644                 PushPromiseFrame pp = (PushPromiseFrame)frame;
       
   645                 handlePushPromise(stream, pp);
       
   646             } else if (frame instanceof HeaderFrame) {
       
   647                 // decode headers (or continuation)
       
   648                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
       
   649                 stream.incoming(frame);
       
   650             } else {
       
   651                 stream.incoming(frame);
       
   652             }
       
   653         }
       
   654     }
       
   655 
       
   656     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
       
   657         throws IOException
       
   658     {
       
   659         // always decode the headers as they may affect connection-level HPACK
       
   660         // decoding state
       
   661         HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
       
   662         decodeHeaders(pp, decoder);
       
   663 
       
   664         HttpRequestImpl parentReq = parent.request;
       
   665         int promisedStreamid = pp.getPromisedStream();
       
   666         if (promisedStreamid != nextPushStream) {
       
   667             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
       
   668             return;
       
   669         } else {
       
   670             nextPushStream += 2;
       
   671         }
       
   672 
       
   673         HttpHeadersImpl headers = decoder.headers();
       
   674         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
       
   675         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
       
   676         Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
       
   677         pushExch.exchImpl = pushStream;
       
   678         pushStream.registerStream(promisedStreamid);
       
   679         parent.incoming_pushPromise(pushReq, pushStream);
       
   680     }
       
   681 
       
   682     private void handleConnectionFrame(Http2Frame frame)
       
   683         throws IOException
       
   684     {
       
   685         switch (frame.type()) {
       
   686           case SettingsFrame.TYPE:
       
   687               handleSettings((SettingsFrame)frame);
       
   688               break;
       
   689           case PingFrame.TYPE:
       
   690               handlePing((PingFrame)frame);
       
   691               break;
       
   692           case GoAwayFrame.TYPE:
       
   693               handleGoAway((GoAwayFrame)frame);
       
   694               break;
       
   695           case WindowUpdateFrame.TYPE:
       
   696               handleWindowUpdate((WindowUpdateFrame)frame);
       
   697               break;
       
   698           default:
       
   699             protocolError(ErrorFrame.PROTOCOL_ERROR);
       
   700         }
       
   701     }
       
   702 
       
   703     void resetStream(int streamid, int code) throws IOException {
       
   704         Log.logError(
       
   705             "Resetting stream {0,number,integer} with error code {1,number,integer}",
       
   706             streamid, code);
       
   707         ResetFrame frame = new ResetFrame(streamid, code);
       
   708         sendFrame(frame);
       
   709         closeStream(streamid);
       
   710     }
       
   711 
       
   712     void closeStream(int streamid) {
       
   713         debug.log(Level.DEBUG, "Closed stream %d", streamid);
       
   714         Stream<?> s = streams.remove(streamid);
       
   715         if (s != null) {
       
   716             // decrement the reference count on the HttpClientImpl
       
   717             // to allow the SelectorManager thread to exit if no
       
   718             // other operation is pending and the facade is no
       
   719             // longer referenced.
       
   720             client().unreference();
       
   721         }
       
   722         // ## Remove s != null. It is a hack for delayed cancellation,reset
       
   723         if (s != null && !(s instanceof Stream.PushedStream)) {
       
   724             // Since PushStreams have no request body, then they have no
       
   725             // corresponding entry in the window controller.
       
   726             windowController.removeStream(streamid);
       
   727         }
       
   728         if (singleStream() && streams.isEmpty()) {
       
   729             // should be only 1 stream, but there might be more if server push
       
   730             close();
       
   731         }
       
   732     }
       
   733 
       
   734     /**
       
   735      * Increments this connection's send Window by the amount in the given frame.
       
   736      */
       
   737     private void handleWindowUpdate(WindowUpdateFrame f)
       
   738         throws IOException
       
   739     {
       
   740         int amount = f.getUpdate();
       
   741         if (amount <= 0) {
       
   742             // ## temporarily disable to workaround a bug in Jetty where it
       
   743             // ## sends Window updates with a 0 update value.
       
   744             //protocolError(ErrorFrame.PROTOCOL_ERROR);
       
   745         } else {
       
   746             boolean success = windowController.increaseConnectionWindow(amount);
       
   747             if (!success) {
       
   748                 protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
       
   749             }
       
   750         }
       
   751     }
       
   752 
       
   753     private void protocolError(int errorCode)
       
   754         throws IOException
       
   755     {
       
   756         protocolError(errorCode, null);
       
   757     }
       
   758 
       
   759     private void protocolError(int errorCode, String msg)
       
   760         throws IOException
       
   761     {
       
   762         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
       
   763         sendFrame(frame);
       
   764         shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
       
   765     }
       
   766 
       
   767     private void handleSettings(SettingsFrame frame)
       
   768         throws IOException
       
   769     {
       
   770         assert frame.streamid() == 0;
       
   771         if (!frame.getFlag(SettingsFrame.ACK)) {
       
   772             int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
       
   773             int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
       
   774             int diff = newWindowSize - oldWindowSize;
       
   775             if (diff != 0) {
       
   776                 windowController.adjustActiveStreams(diff);
       
   777             }
       
   778             serverSettings = frame;
       
   779             sendFrame(new SettingsFrame(SettingsFrame.ACK));
       
   780         }
       
   781     }
       
   782 
       
   783     private void handlePing(PingFrame frame)
       
   784         throws IOException
       
   785     {
       
   786         frame.setFlag(PingFrame.ACK);
       
   787         sendUnorderedFrame(frame);
       
   788     }
       
   789 
       
   790     private void handleGoAway(GoAwayFrame frame)
       
   791         throws IOException
       
   792     {
       
   793         shutdown(new IOException(
       
   794                         String.valueOf(connection.channel().getLocalAddress())
       
   795                         +": GOAWAY received"));
       
   796     }
       
   797 
       
   798     /**
       
   799      * Max frame size we are allowed to send
       
   800      */
       
   801     public int getMaxSendFrameSize() {
       
   802         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
       
   803         if (param == -1) {
       
   804             param = DEFAULT_FRAME_SIZE;
       
   805         }
       
   806         return param;
       
   807     }
       
   808 
       
   809     /**
       
   810      * Max frame size we will receive
       
   811      */
       
   812     public int getMaxReceiveFrameSize() {
       
   813         return clientSettings.getParameter(MAX_FRAME_SIZE);
       
   814     }
       
   815 
       
   816     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
       
   817 
       
   818     private static final byte[] PREFACE_BYTES =
       
   819         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
       
   820 
       
   821     /**
       
   822      * Sends Connection preface and Settings frame with current preferred
       
   823      * values
       
   824      */
       
   825     private void sendConnectionPreface() throws IOException {
       
   826         Log.logTrace("{0}: start sending connection preface to {1}",
       
   827                      connection.channel().getLocalAddress(),
       
   828                      connection.address());
       
   829         SettingsFrame sf = new SettingsFrame(clientSettings);
       
   830         int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
       
   831         ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
       
   832         Log.logFrames(sf, "OUT");
       
   833         // send preface bytes and SettingsFrame together
       
   834         HttpPublisher publisher = publisher();
       
   835         publisher.enqueue(List.of(buf));
       
   836         publisher.signalEnqueued();
       
   837         // mark preface sent.
       
   838         framesController.markPrefaceSent();
       
   839         Log.logTrace("PREFACE_BYTES sent");
       
   840         Log.logTrace("Settings Frame sent");
       
   841 
       
   842         // send a Window update for the receive buffer we are using
       
   843         // minus the initial 64 K specified in protocol
       
   844         final int len = windowUpdater.initialWindowSize - initialWindowSize;
       
   845         if (len > 0) {
       
   846             windowUpdater.sendWindowUpdate(len);
       
   847         }
       
   848         // there will be an ACK to the windows update - which should
       
   849         // cause any pending data stored before the preface was sent to be
       
   850         // flushed (see PrefaceController).
       
   851         Log.logTrace("finished sending connection preface");
       
   852         debug.log(Level.DEBUG, "Triggering processing of buffered data"
       
   853                   + " after sending connection preface");
       
   854         subscriber.onNext(List.of(EMPTY_TRIGGER));
       
   855     }
       
   856 
       
   857     /**
       
   858      * Returns an existing Stream with given id, or null if doesn't exist
       
   859      */
       
   860     @SuppressWarnings("unchecked")
       
   861     <T> Stream<T> getStream(int streamid) {
       
   862         return (Stream<T>)streams.get(streamid);
       
   863     }
       
   864 
       
   865     /**
       
   866      * Creates Stream with given id.
       
   867      */
       
   868     final <T> Stream<T> createStream(Exchange<T> exchange) {
       
   869         Stream<T> stream = new Stream<>(this, exchange, windowController);
       
   870         return stream;
       
   871     }
       
   872 
       
   873     <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
       
   874         PushGroup<T> pg = parent.exchange.getPushGroup();
       
   875         return new Stream.PushedStream<>(pg, this, pushEx);
       
   876     }
       
   877 
       
   878     <T> void putStream(Stream<T> stream, int streamid) {
       
   879         // increment the reference count on the HttpClientImpl
       
   880         // to prevent the SelectorManager thread from exiting until
       
   881         // the stream is closed.
       
   882         client().reference();
       
   883         streams.put(streamid, stream);
       
   884     }
       
   885 
       
   886     /**
       
   887      * Encode the headers into a List<ByteBuffer> and then create HEADERS
       
   888      * and CONTINUATION frames from the list and return the List<Http2Frame>.
       
   889      */
       
   890     private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
       
   891         List<ByteBuffer> buffers = encodeHeadersImpl(
       
   892                 getMaxSendFrameSize(),
       
   893                 frame.getAttachment().getRequestPseudoHeaders(),
       
   894                 frame.getUserHeaders(),
       
   895                 frame.getSystemHeaders());
       
   896 
       
   897         List<HeaderFrame> frames = new ArrayList<>(buffers.size());
       
   898         Iterator<ByteBuffer> bufIterator = buffers.iterator();
       
   899         HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
       
   900         frames.add(oframe);
       
   901         while(bufIterator.hasNext()) {
       
   902             oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
       
   903             frames.add(oframe);
       
   904         }
       
   905         oframe.setFlag(HeaderFrame.END_HEADERS);
       
   906         return frames;
       
   907     }
       
   908 
       
    // Dedicated cache for the ByteBuffers used to encode headers.
    // There can be no concurrent access to such a buffer, as all accesses to
    // the buffer and its content happen within a single critical section
    // protected by the sendlock (see sendFrame()).
    // private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
       
   914 
       
   915     private ByteBuffer getHeaderBuffer(int maxFrameSize) {
       
   916         ByteBuffer buf = ByteBuffer.allocate(maxFrameSize);
       
   917         buf.limit(maxFrameSize);
       
   918         return buf;
       
   919     }
       
   920 
       
   921     /*
       
   922      * Encodes all the headers from the given HttpHeaders into the given List
       
   923      * of buffers.
       
   924      *
       
   925      * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
       
   926      *
       
   927      *     ...Just as in HTTP/1.x, header field names are strings of ASCII
       
   928      *     characters that are compared in a case-insensitive fashion.  However,
       
   929      *     header field names MUST be converted to lowercase prior to their
       
   930      *     encoding in HTTP/2...
       
   931      */
       
   932     private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
       
   933         ByteBuffer buffer = getHeaderBuffer(maxFrameSize);
       
   934         List<ByteBuffer> buffers = new ArrayList<>();
       
   935         for(HttpHeaders header : headers) {
       
   936             for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
       
   937                 String lKey = e.getKey().toLowerCase();
       
   938                 List<String> values = e.getValue();
       
   939                 for (String value : values) {
       
   940                     hpackOut.header(lKey, value);
       
   941                     while (!hpackOut.encode(buffer)) {
       
   942                         buffer.flip();
       
   943                         buffers.add(buffer);
       
   944                         buffer =  getHeaderBuffer(maxFrameSize);
       
   945                     }
       
   946                 }
       
   947             }
       
   948         }
       
   949         buffer.flip();
       
   950         buffers.add(buffer);
       
   951         return buffers;
       
   952     }
       
   953 
       
   954     private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
       
   955         oh.streamid(stream.streamid);
       
   956         if (Log.headers()) {
       
   957             StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
       
   958             sb.append(stream.streamid).append(")\n");
       
   959             Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
       
   960             Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
       
   961             Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
       
   962             Log.logHeaders(sb.toString());
       
   963         }
       
   964         List<HeaderFrame> frames = encodeHeaders(oh);
       
   965         return encodeFrames(frames);
       
   966     }
       
   967 
       
   968     private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
       
   969         if (Log.frames()) {
       
   970             frames.forEach(f -> Log.logFrames(f, "OUT"));
       
   971         }
       
   972         return framesEncoder.encodeFrames(frames);
       
   973     }
       
   974 
       
   975     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
       
   976         Stream<?> stream = oh.getAttachment();
       
   977         int streamid = nextstreamid;
       
   978         nextstreamid += 2;
       
   979         stream.registerStream(streamid);
       
   980         // set outgoing window here. This allows thread sending
       
   981         // body to proceed.
       
   982         windowController.registerStream(streamid, getInitialSendWindowSize());
       
   983         return stream;
       
   984     }
       
   985 
       
   986     private final Object sendlock = new Object();
       
   987 
       
   988     void sendFrame(Http2Frame frame) {
       
   989         try {
       
   990             HttpPublisher publisher = publisher();
       
   991             synchronized (sendlock) {
       
   992                 if (frame instanceof OutgoingHeaders) {
       
   993                     @SuppressWarnings("unchecked")
       
   994                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
       
   995                     Stream<?> stream = registerNewStream(oh);
       
   996                     // provide protection from inserting unordered frames between Headers and Continuation
       
   997                     publisher.enqueue(encodeHeaders(oh, stream));
       
   998                 } else {
       
   999                     publisher.enqueue(encodeFrame(frame));
       
  1000                 }
       
  1001             }
       
  1002             publisher.signalEnqueued();
       
  1003         } catch (IOException e) {
       
  1004             if (!closed) {
       
  1005                 Log.logError(e);
       
  1006                 shutdown(e);
       
  1007             }
       
  1008         }
       
  1009     }
       
  1010 
       
  1011     private List<ByteBuffer> encodeFrame(Http2Frame frame) {
       
  1012         Log.logFrames(frame, "OUT");
       
  1013         return framesEncoder.encodeFrame(frame);
       
  1014     }
       
  1015 
       
  1016     void sendDataFrame(DataFrame frame) {
       
  1017         try {
       
  1018             HttpPublisher publisher = publisher();
       
  1019             publisher.enqueue(encodeFrame(frame));
       
  1020             publisher.signalEnqueued();
       
  1021         } catch (IOException e) {
       
  1022             if (!closed) {
       
  1023                 Log.logError(e);
       
  1024                 shutdown(e);
       
  1025             }
       
  1026         }
       
  1027     }
       
  1028 
       
  1029     /*
       
  1030      * Direct call of the method bypasses synchronization on "sendlock" and
       
  1031      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
       
  1032      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
       
  1033      */
       
  1034     void sendUnorderedFrame(Http2Frame frame) {
       
  1035         try {
       
  1036             HttpPublisher publisher = publisher();
       
  1037             publisher.enqueueUnordered(encodeFrame(frame));
       
  1038             publisher.signalEnqueued();
       
  1039         } catch (IOException e) {
       
  1040             if (!closed) {
       
  1041                 Log.logError(e);
       
  1042                 shutdown(e);
       
  1043             }
       
  1044         }
       
  1045     }
       
  1046 
       
  1047     /**
       
  1048      * A simple tube subscriber for reading from the connection flow.
       
  1049      */
       
  1050     final class Http2TubeSubscriber implements TubeSubscriber {
       
  1051         volatile Flow.Subscription subscription;
       
  1052         volatile boolean completed;
       
  1053         volatile boolean dropped;
       
  1054         volatile Throwable error;
       
  1055         final ConcurrentLinkedQueue<ByteBuffer> queue
       
  1056                 = new ConcurrentLinkedQueue<>();
       
  1057         final SequentialScheduler scheduler =
       
  1058                 SequentialScheduler.synchronizedScheduler(this::processQueue);
       
  1059 
       
  1060         final void processQueue() {
       
  1061             try {
       
  1062                 while (!queue.isEmpty() && !scheduler.isStopped()) {
       
  1063                     ByteBuffer buffer = queue.poll();
       
  1064                     debug.log(Level.DEBUG,
       
  1065                               "sending %d to Http2Connection.asyncReceive",
       
  1066                               buffer.remaining());
       
  1067                     asyncReceive(buffer);
       
  1068                 }
       
  1069             } catch (Throwable t) {
       
  1070                 Throwable x = error;
       
  1071                 if (x == null) error = t;
       
  1072             } finally {
       
  1073                 Throwable x = error;
       
  1074                 if (x != null) {
       
  1075                     debug.log(Level.DEBUG, "Stopping scheduler", x);
       
  1076                     scheduler.stop();
       
  1077                     Http2Connection.this.shutdown(x);
       
  1078                 }
       
  1079             }
       
  1080         }
       
  1081 
       
  1082         @Override
       
  1083         public void onSubscribe(Flow.Subscription subscription) {
       
  1084             // supports being called multiple time.
       
  1085             // doesn't cancel the previous subscription, since that is
       
  1086             // most probably the same as the new subscription.
       
  1087             assert this.subscription == null || dropped == false;
       
  1088             this.subscription = subscription;
       
  1089             dropped = false;
       
  1090             // TODO FIXME: request(1) should be done by the delegate.
       
  1091             if (!completed) {
       
  1092                 debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
       
  1093                 subscription.request(Long.MAX_VALUE);
       
  1094             } else {
       
  1095                 debug.log(Level.DEBUG, "onSubscribe: already completed");
       
  1096             }
       
  1097         }
       
  1098 
       
  1099         @Override
       
  1100         public void onNext(List<ByteBuffer> item) {
       
  1101             debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
       
  1102                     + " bytes in " + item.size() + " buffers");
       
  1103             queue.addAll(item);
       
  1104             scheduler.runOrSchedule(client().theExecutor());
       
  1105         }
       
  1106 
       
  1107         @Override
       
  1108         public void onError(Throwable throwable) {
       
  1109             debug.log(Level.DEBUG, () -> "onError: " + throwable);
       
  1110             error = throwable;
       
  1111             completed = true;
       
  1112             scheduler.runOrSchedule(client().theExecutor());
       
  1113         }
       
  1114 
       
  1115         @Override
       
  1116         public void onComplete() {
       
  1117             debug.log(Level.DEBUG, "EOF");
       
  1118             error = new EOFException("EOF reached while reading");
       
  1119             completed = true;
       
  1120             scheduler.runOrSchedule(client().theExecutor());
       
  1121         }
       
  1122 
       
  1123         @Override
       
  1124         public void dropSubscription() {
       
  1125             debug.log(Level.DEBUG, "dropSubscription");
       
  1126             // we could probably set subscription to null here...
       
  1127             // then we might not need the 'dropped' boolean?
       
  1128             dropped = true;
       
  1129         }
       
  1130     }
       
  1131 
       
  1132     @Override
       
  1133     public final String toString() {
       
  1134         return dbgString();
       
  1135     }
       
  1136 
       
  1137     final String dbgString() {
       
  1138         return "Http2Connection("
       
  1139                     + connection.getConnectionFlow() + ")";
       
  1140     }
       
  1141 
       
  1142     final class LoggingHeaderDecoder extends HeaderDecoder {
       
  1143 
       
  1144         private final HeaderDecoder delegate;
       
  1145         private final System.Logger debugHpack =
       
  1146                 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
       
  1147 
       
  1148         LoggingHeaderDecoder(HeaderDecoder delegate) {
       
  1149             this.delegate = delegate;
       
  1150         }
       
  1151 
       
  1152         String dbgString() {
       
  1153             return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder";
       
  1154         }
       
  1155 
       
  1156         @Override
       
  1157         public void onDecoded(CharSequence name, CharSequence value) {
       
  1158             delegate.onDecoded(name, value);
       
  1159         }
       
  1160 
       
  1161         @Override
       
  1162         public void onIndexed(int index,
       
  1163                               CharSequence name,
       
  1164                               CharSequence value) {
       
  1165             debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n",
       
  1166                            index, name, value);
       
  1167             delegate.onIndexed(index, name, value);
       
  1168         }
       
  1169 
       
  1170         @Override
       
  1171         public void onLiteral(int index,
       
  1172                               CharSequence name,
       
  1173                               CharSequence value,
       
  1174                               boolean valueHuffman) {
       
  1175             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
       
  1176                               index, name, value, valueHuffman);
       
  1177             delegate.onLiteral(index, name, value, valueHuffman);
       
  1178         }
       
  1179 
       
  1180         @Override
       
  1181         public void onLiteral(CharSequence name,
       
  1182                               boolean nameHuffman,
       
  1183                               CharSequence value,
       
  1184                               boolean valueHuffman) {
       
  1185             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
       
  1186                            name, nameHuffman, value, valueHuffman);
       
  1187             delegate.onLiteral(name, nameHuffman, value, valueHuffman);
       
  1188         }
       
  1189 
       
  1190         @Override
       
  1191         public void onLiteralNeverIndexed(int index,
       
  1192                                           CharSequence name,
       
  1193                                           CharSequence value,
       
  1194                                           boolean valueHuffman) {
       
  1195             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
       
  1196                            index, name, value, valueHuffman);
       
  1197             delegate.onLiteralNeverIndexed(index, name, value, valueHuffman);
       
  1198         }
       
  1199 
       
  1200         @Override
       
  1201         public void onLiteralNeverIndexed(CharSequence name,
       
  1202                                           boolean nameHuffman,
       
  1203                                           CharSequence value,
       
  1204                                           boolean valueHuffman) {
       
  1205             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
       
  1206                            name, nameHuffman, value, valueHuffman);
       
  1207             delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman);
       
  1208         }
       
  1209 
       
  1210         @Override
       
  1211         public void onLiteralWithIndexing(int index,
       
  1212                                           CharSequence name,
       
  1213                                           CharSequence value,
       
  1214                                           boolean valueHuffman) {
       
  1215             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
       
  1216                            index, name, value, valueHuffman);
       
  1217             delegate.onLiteralWithIndexing(index, name, value, valueHuffman);
       
  1218         }
       
  1219 
       
  1220         @Override
       
  1221         public void onLiteralWithIndexing(CharSequence name,
       
  1222                                           boolean nameHuffman,
       
  1223                                           CharSequence value,
       
  1224                                           boolean valueHuffman) {
       
  1225             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
       
  1226                               name, nameHuffman, value, valueHuffman);
       
  1227             delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman);
       
  1228         }
       
  1229 
       
  1230         @Override
       
  1231         public void onSizeUpdate(int capacity) {
       
  1232             debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity);
       
  1233             delegate.onSizeUpdate(capacity);
       
  1234         }
       
  1235 
       
  1236         @Override
       
  1237         HttpHeadersImpl headers() {
       
  1238             return delegate.headers();
       
  1239         }
       
  1240     }
       
  1241 
       
  1242     static class HeaderDecoder implements DecodingCallback {
       
  1243         HttpHeadersImpl headers;
       
  1244 
       
  1245         HeaderDecoder() {
       
  1246             this.headers = new HttpHeadersImpl();
       
  1247         }
       
  1248 
       
  1249         @Override
       
  1250         public void onDecoded(CharSequence name, CharSequence value) {
       
  1251             headers.addHeader(name.toString(), value.toString());
       
  1252         }
       
  1253 
       
  1254         HttpHeadersImpl headers() {
       
  1255             return headers;
       
  1256         }
       
  1257     }
       
  1258 
       
  1259     static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
       
  1260 
       
  1261         final int initialWindowSize;
       
  1262         public ConnectionWindowUpdateSender(Http2Connection connection,
       
  1263                                             int initialWindowSize) {
       
  1264             super(connection, initialWindowSize);
       
  1265             this.initialWindowSize = initialWindowSize;
       
  1266         }
       
  1267 
       
  1268         @Override
       
  1269         int getStreamId() {
       
  1270             return 0;
       
  1271         }
       
  1272     }
       
  1273 
       
  1274     /**
       
  1275      * Thrown when https handshake negotiates http/1.1 alpn instead of h2
       
  1276      */
       
  1277     static final class ALPNException extends IOException {
       
  1278         private static final long serialVersionUID = 0L;
       
  1279         final transient AbstractAsyncSSLConnection connection;
       
  1280 
       
  1281         ALPNException(String msg, AbstractAsyncSSLConnection connection) {
       
  1282             super(msg);
       
  1283             this.connection = connection;
       
  1284         }
       
  1285 
       
  1286         AbstractAsyncSSLConnection getConnection() {
       
  1287             return connection;
       
  1288         }
       
  1289     }
       
  1290 }