src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56078 6c11b48a0695
child 56080 64846522c0d5
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http;
       
    27 
       
    28 import java.io.EOFException;
       
    29 import java.io.IOException;
       
    30 import java.lang.System.Logger.Level;
       
    31 import java.net.InetSocketAddress;
       
    32 import java.net.URI;
       
    33 import java.nio.ByteBuffer;
       
    34 import java.nio.charset.StandardCharsets;
       
    35 import java.util.Iterator;
       
    36 import java.util.LinkedList;
       
    37 import java.util.List;
       
    38 import java.util.Map;
       
    39 import java.util.concurrent.CompletableFuture;
       
    40 import java.util.ArrayList;
       
    41 import java.util.Objects;
       
    42 import java.util.concurrent.ConcurrentHashMap;
       
    43 import java.util.concurrent.ConcurrentLinkedQueue;
       
    44 import java.util.concurrent.Flow;
       
    45 import java.util.function.Function;
       
    46 import java.util.function.Supplier;
       
    47 import javax.net.ssl.SSLEngine;
       
    48 import javax.net.ssl.SSLException;
       
    49 import jdk.incubator.http.HttpConnection.HttpPublisher;
       
    50 import jdk.incubator.http.internal.common.FlowTube;
       
    51 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
       
    52 import jdk.incubator.http.internal.common.HttpHeadersImpl;
       
    53 import jdk.incubator.http.internal.common.Log;
       
    54 import jdk.incubator.http.internal.common.MinimalFuture;
       
    55 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    56 import jdk.incubator.http.internal.common.Utils;
       
    57 import jdk.incubator.http.internal.frame.ContinuationFrame;
       
    58 import jdk.incubator.http.internal.frame.DataFrame;
       
    59 import jdk.incubator.http.internal.frame.ErrorFrame;
       
    60 import jdk.incubator.http.internal.frame.FramesDecoder;
       
    61 import jdk.incubator.http.internal.frame.FramesEncoder;
       
    62 import jdk.incubator.http.internal.frame.GoAwayFrame;
       
    63 import jdk.incubator.http.internal.frame.HeaderFrame;
       
    64 import jdk.incubator.http.internal.frame.HeadersFrame;
       
    65 import jdk.incubator.http.internal.frame.Http2Frame;
       
    66 import jdk.incubator.http.internal.frame.MalformedFrame;
       
    67 import jdk.incubator.http.internal.frame.OutgoingHeaders;
       
    68 import jdk.incubator.http.internal.frame.PingFrame;
       
    69 import jdk.incubator.http.internal.frame.PushPromiseFrame;
       
    70 import jdk.incubator.http.internal.frame.ResetFrame;
       
    71 import jdk.incubator.http.internal.frame.SettingsFrame;
       
    72 import jdk.incubator.http.internal.frame.WindowUpdateFrame;
       
    73 import jdk.incubator.http.internal.hpack.Encoder;
       
    74 import jdk.incubator.http.internal.hpack.Decoder;
       
    75 import jdk.incubator.http.internal.hpack.DecodingCallback;
       
    76 import static java.nio.charset.StandardCharsets.UTF_8;
       
    77 import static jdk.incubator.http.internal.frame.SettingsFrame.*;
       
    78 
       
    79 
       
    80 /**
       
    81  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
       
    82  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
       
    83  *
       
    84  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
       
    85  * to a HttpClientImpl.
       
    86  *
       
    87  * Creation cases:
       
    88  * 1) upgraded HTTP/1.1 plain tcp connection
       
    89  * 2) prior knowledge directly created plain tcp connection
       
    90  * 3) directly created HTTP/2 SSL connection which uses ALPN.
       
    91  *
       
    92  * Sending is done by writing directly to underlying HttpConnection object which
       
    93  * is operating in async mode. No flow control applies on output at this level
       
    94  * and all writes are just executed as puts to an output Q belonging to HttpConnection
       
    95  * Flow control is implemented by HTTP/2 protocol itself.
       
    96  *
       
    97  * Hpack header compression
       
    98  * and outgoing stream creation is also done here, because these operations
       
    99  * must be synchronized at the socket level. Stream objects send frames simply
       
   100  * by placing them on the connection's output Queue. sendFrame() is called
       
   101  * from a higher level (Stream) thread.
       
   102  *
       
   103  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
       
   104  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
       
   105  * or handles them directly itself. This thread performs hpack decompression
       
   106  * and incoming stream creation (Server push). Incoming frames destined for a
       
   107  * stream are provided by calling Stream.incoming().
       
   108  */
       
   109 class Http2Connection  {
       
   110 
       
   111     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
   112     static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag.
       
   113     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
   114     final static System.Logger  DEBUG_LOGGER =
       
   115             Utils.getDebugLogger("Http2Connection"::toString, DEBUG);
       
   116     private final System.Logger debugHpack =
       
   117                   Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
       
   118     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
       
   119 
       
   120     private boolean singleStream; // used only for stream 1, then closed
       
   121 
       
   122     /*
       
   123      *  ByteBuffer pooling strategy for HTTP/2 protocol:
       
   124      *
       
   125      * In general there are 4 points where ByteBuffers are used:
       
   126      *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
       
   127      *    in case of SSL connection.
       
   128      *
       
   129      * 1. Outgoing frames encoded to ByteBuffers.
       
    130      *    Outgoing ByteBuffers are created with the required size and are frequently small (except DataFrames, etc)
       
   131      *    At this place no pools at all. All outgoing buffers should be collected by GC.
       
   132      *
       
   133      * 2. Incoming ByteBuffers (decoded to frames).
       
   134      *    Here, total elimination of BB pool is not a good idea.
       
   135      *    We don't know how many bytes we will receive through network.
       
   136      * So here we allocate buffer of reasonable size. The following life of the BB:
       
   137      * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
       
   138      *     BB is returned to pool,
       
   139      * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
       
   140      *     Such BB is never returned to pool and will be GCed.
       
   141      * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
       
    142      *     the buffer could be released to the pool.
       
   143      *
       
    144      * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers are taken from / returned to the pool,
       
    145      *    because we can't predict the size of encrypted packets.
       
   146      *
       
   147      */
       
   148 
       
   149 
       
   150     // A small class that allows to control frames with respect to the state of
       
   151     // the connection preface. Any data received before the connection
       
   152     // preface is sent will be buffered.
       
   153     private final class FramesController {
       
   154         volatile boolean prefaceSent;
       
   155         volatile List<ByteBuffer> pending;
       
   156 
       
   157         boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
       
   158                 throws IOException
       
   159         {
       
   160             // if preface is not sent, buffers data in the pending list
       
   161             if (!prefaceSent) {
       
   162                 debug.log(Level.DEBUG, "Preface is not sent: buffering %d",
       
   163                           buf.remaining());
       
   164                 synchronized (this) {
       
   165                     if (!prefaceSent) {
       
   166                         if (pending == null) pending = new ArrayList<>();
       
   167                         pending.add(buf);
       
   168                         debug.log(Level.DEBUG, () -> "there are now "
       
   169                               + Utils.remaining(pending)
       
   170                               + " bytes buffered waiting for preface to be sent");
       
   171                         return false;
       
   172                     }
       
   173                 }
       
   174             }
       
   175 
       
   176             // Preface is sent. Checks for pending data and flush it.
       
   177             // We rely on this method being called from within the Http2TubeSubscriber
       
   178             // scheduler, so we know that no other thread could execute this method
       
   179             // concurrently while we're here.
       
   180             // This ensures that later incoming buffers will not
       
   181             // be processed before we have flushed the pending queue.
       
   182             // No additional synchronization is therefore necessary here.
       
   183             List<ByteBuffer> pending = this.pending;
       
   184             this.pending = null;
       
   185             if (pending != null) {
       
   186                 // flush pending data
       
   187                 debug.log(Level.DEBUG, () -> "Processing buffered data: "
       
   188                       + Utils.remaining(pending));
       
   189                 for (ByteBuffer b : pending) {
       
   190                     decoder.decode(b);
       
   191                 }
       
   192             }
       
   193             // push the received buffer to the frames decoder.
       
   194             if (buf != EMPTY_TRIGGER) {
       
   195                 debug.log(Level.DEBUG, "Processing %d", buf.remaining());
       
   196                 decoder.decode(buf);
       
   197             }
       
   198             return true;
       
   199         }
       
   200 
       
   201         // Mark that the connection preface is sent
       
   202         void markPrefaceSent() {
       
   203             assert !prefaceSent;
       
   204             synchronized (this) {
       
   205                 prefaceSent = true;
       
   206             }
       
   207         }
       
   208     }
       
   209 
       
   210     volatile boolean closed;
       
   211 
       
   212     //-------------------------------------
       
   213     final HttpConnection connection;
       
   214     private final Http2ClientImpl client2;
       
   215     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
       
   216     private int nextstreamid;
       
   217     private int nextPushStream = 2;
       
   218     private final Encoder hpackOut;
       
   219     private final Decoder hpackIn;
       
   220     final SettingsFrame clientSettings;
       
   221     private volatile SettingsFrame serverSettings;
       
   222     private final String key; // for HttpClientImpl.connections map
       
   223     private final FramesDecoder framesDecoder;
       
   224     private final FramesEncoder framesEncoder = new FramesEncoder();
       
   225 
       
   226     /**
       
   227      * Send Window controller for both connection and stream windows.
       
   228      * Each of this connection's Streams MUST use this controller.
       
   229      */
       
   230     private final WindowController windowController = new WindowController();
       
   231     private final FramesController framesController = new FramesController();
       
   232     private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
       
   233     final ConnectionWindowUpdateSender windowUpdater;
       
   234     private volatile Throwable cause;
       
   235     private volatile Supplier<ByteBuffer> initial;
       
   236 
       
   237     static final int DEFAULT_FRAME_SIZE = 16 * 1024;
       
   238 
       
   239 
       
   240     // TODO: need list of control frames from other threads
       
   241     // that need to be sent
       
   242 
       
   243     private Http2Connection(HttpConnection connection,
       
   244                             Http2ClientImpl client2,
       
   245                             int nextstreamid,
       
   246                             String key) {
       
   247         this.connection = connection;
       
   248         this.client2 = client2;
       
   249         this.nextstreamid = nextstreamid;
       
   250         this.key = key;
       
   251         this.clientSettings = this.client2.getClientSettings();
       
   252         this.framesDecoder = new FramesDecoder(this::processFrame,
       
   253                 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
       
   254         // serverSettings will be updated by server
       
   255         this.serverSettings = SettingsFrame.getDefaultSettings();
       
   256         this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
       
   257         this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
       
   258         debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString());
       
   259         debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn);
       
   260         debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut);
       
   261         this.windowUpdater = new ConnectionWindowUpdateSender(this,
       
   262                 client2.getConnectionWindowSize(clientSettings));
       
   263     }
       
   264 
       
   265     /**
       
   266      * Case 1) Create from upgraded HTTP/1.1 connection.
       
   267      * Is ready to use. Can be SSL. exchange is the Exchange
       
   268      * that initiated the connection, whose response will be delivered
       
   269      * on a Stream.
       
   270      */
       
   271     private Http2Connection(HttpConnection connection,
       
   272                     Http2ClientImpl client2,
       
   273                     Exchange<?> exchange,
       
   274                     Supplier<ByteBuffer> initial)
       
   275         throws IOException, InterruptedException
       
   276     {
       
   277         this(connection,
       
   278                 client2,
       
   279                 3, // stream 1 is registered during the upgrade
       
   280                 keyFor(connection));
       
   281         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
       
   282 
       
   283         Stream<?> initialStream = createStream(exchange);
       
   284         initialStream.registerStream(1);
       
   285         windowController.registerStream(1, getInitialSendWindowSize());
       
   286         initialStream.requestSent();
       
   287         // Upgrading:
       
   288         //    set callbacks before sending preface - makes sure anything that
       
   289         //    might be sent by the server will come our way.
       
   290         this.initial = initial;
       
   291         connectFlows(connection);
       
   292         sendConnectionPreface();
       
   293     }
       
   294 
       
   295     // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
       
   296     // agreement from the server. Async style but completes immediately, because
       
   297     // the connection is already connected.
       
   298     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
       
   299                                                           Http2ClientImpl client2,
       
   300                                                           Exchange<?> exchange,
       
   301                                                           Supplier<ByteBuffer> initial)
       
   302     {
       
   303         return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
       
   304     }
       
   305 
       
   306     // Requires TLS handshake. So, is really async
       
   307     static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
       
   308                                                           Http2ClientImpl h2client) {
       
   309         assert request.secure();
       
   310         AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
       
   311         HttpConnection.getConnection(request.getAddress(),
       
   312                                      h2client.client(),
       
   313                                      request,
       
   314                                      HttpClient.Version.HTTP_2);
       
   315 
       
   316         return connection.connectAsync()
       
   317                   .thenCompose(unused -> checkSSLConfig(connection))
       
   318                   .thenCompose(notused-> {
       
   319                       CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
       
   320                       try {
       
   321                           Http2Connection hc = new Http2Connection(request, h2client, connection);
       
   322                           cf.complete(hc);
       
   323                       } catch (IOException e) {
       
   324                           cf.completeExceptionally(e);
       
   325                       }
       
   326                       return cf; } );
       
   327     }
       
   328 
       
   329     /**
       
   330      * Cases 2) 3)
       
   331      *
       
   332      * request is request to be sent.
       
   333      */
       
   334     private Http2Connection(HttpRequestImpl request,
       
   335                             Http2ClientImpl h2client,
       
   336                             HttpConnection connection)
       
   337         throws IOException
       
   338     {
       
   339         this(connection,
       
   340              h2client,
       
   341              1,
       
   342              keyFor(request.uri(), request.proxy()));
       
   343 
       
   344         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
       
   345 
       
   346         // safe to resume async reading now.
       
   347         connectFlows(connection);
       
   348         sendConnectionPreface();
       
   349     }
       
   350 
       
   351     private void connectFlows(HttpConnection connection) {
       
   352         FlowTube tube =  connection.getConnectionFlow();
       
   353         // Connect the flow to our Http2TubeSubscriber:
       
   354         tube.connectFlows(connection.publisher(), subscriber);
       
   355     }
       
   356 
       
   357     final HttpClientImpl client() {
       
   358         return client2.client();
       
   359     }
       
   360 
       
   361     /**
       
   362      * Throws an IOException if h2 was not negotiated
       
   363      */
       
   364     private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
       
   365         assert aconn.isSecure();
       
   366 
       
   367         Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
       
   368             CompletableFuture<Void> cf = new MinimalFuture<>();
       
   369             SSLEngine engine = aconn.getEngine();
       
   370             assert Objects.equals(alpn, engine.getApplicationProtocol());
       
   371 
       
   372             DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn );
       
   373 
       
   374             if (alpn == null || !alpn.equals("h2")) {
       
   375                 String msg;
       
   376                 if (alpn == null) {
       
   377                     Log.logSSL("ALPN not supported");
       
   378                     msg = "ALPN not supported";
       
   379                 } else {
       
   380                     switch (alpn) {
       
   381                         case "":
       
   382                             Log.logSSL(msg = "No ALPN negotiated");
       
   383                             break;
       
   384                         case "http/1.1":
       
   385                             Log.logSSL( msg = "HTTP/1.1 ALPN returned");
       
   386                             break;
       
   387                         default:
       
   388                             Log.logSSL(msg = "Unexpected ALPN: " + alpn);
       
   389                             cf.completeExceptionally(new IOException(msg));
       
   390                     }
       
   391                 }
       
   392                 cf.completeExceptionally(new ALPNException(msg, aconn));
       
   393                 return cf;
       
   394             }
       
   395             cf.complete(null);
       
   396             return cf;
       
   397         };
       
   398 
       
   399         return aconn.getALPN()
       
   400                 .whenComplete((r,t) -> {
       
   401                     if (t != null && t instanceof SSLException) {
       
   402                         // something went wrong during the initial handshake
       
   403                         // close the connection
       
   404                         aconn.close();
       
   405                     }
       
   406                 })
       
   407                 .thenCompose(checkAlpnCF);
       
   408     }
       
   409 
       
   410     synchronized boolean singleStream() {
       
   411         return singleStream;
       
   412     }
       
   413 
       
   414     synchronized void setSingleStream(boolean use) {
       
   415         singleStream = use;
       
   416     }
       
   417 
       
   418     static String keyFor(HttpConnection connection) {
       
   419         boolean isProxy = connection.isProxied();
       
   420         boolean isSecure = connection.isSecure();
       
   421         InetSocketAddress addr = connection.address();
       
   422 
       
   423         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
       
   424     }
       
   425 
       
   426     static String keyFor(URI uri, InetSocketAddress proxy) {
       
   427         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
       
   428         boolean isProxy = proxy != null;
       
   429 
       
   430         String host;
       
   431         int port;
       
   432 
       
   433         if (proxy != null) {
       
   434             host = proxy.getHostString();
       
   435             port = proxy.getPort();
       
   436         } else {
       
   437             host = uri.getHost();
       
   438             port = uri.getPort();
       
   439         }
       
   440         return keyString(isSecure, isProxy, host, port);
       
   441     }
       
   442 
       
   443     // {C,S}:{H:P}:host:port
       
   444     // C indicates clear text connection "http"
       
   445     // S indicates secure "https"
       
   446     // H indicates host (direct) connection
       
   447     // P indicates proxy
       
   448     // Eg: "S:H:foo.com:80"
       
   449     static String keyString(boolean secure, boolean proxy, String host, int port) {
       
   450         if (secure && port == -1)
       
   451             port = 443;
       
   452         else if (!secure && port == -1)
       
   453             port = 80;
       
   454         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
       
   455     }
       
   456 
       
   457     String key() {
       
   458         return this.key;
       
   459     }
       
   460 
       
   461     boolean offerConnection() {
       
   462         return client2.offerConnection(this);
       
   463     }
       
   464 
       
   465     private HttpPublisher publisher() {
       
   466         return connection.publisher();
       
   467     }
       
   468 
       
   469     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
       
   470             throws IOException
       
   471     {
       
   472         debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder);
       
   473 
       
   474         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
       
   475 
       
   476         List<ByteBuffer> buffers = frame.getHeaderBlock();
       
   477         int len = buffers.size();
       
   478         for (int i = 0; i < len; i++) {
       
   479             ByteBuffer b = buffers.get(i);
       
   480             hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
       
   481         }
       
   482     }
       
   483 
       
   484     final int getInitialSendWindowSize() {
       
   485         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
       
   486     }
       
   487 
       
   488     void close() {
       
   489         Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
       
   490         GoAwayFrame f = new GoAwayFrame(0,
       
   491                                         ErrorFrame.NO_ERROR,
       
   492                                         "Requested by user".getBytes(UTF_8));
       
   493         // TODO: set last stream. For now zero ok.
       
   494         sendFrame(f);
       
   495     }
       
   496 
       
   497     long count;
       
   498     final void asyncReceive(ByteBuffer buffer) {
       
   499         // We don't need to read anything and
       
   500         // we don't want to send anything back to the server
       
   501         // until the connection preface has been sent.
       
   502         // Therefore we're going to wait if needed before reading
       
   503         // (and thus replying) to anything.
       
   504         // Starting to reply to something (e.g send an ACK to a
       
   505         // SettingsFrame sent by the server) before the connection
       
   506         // preface is fully sent might result in the server
       
   507         // sending a GOAWAY frame with 'invalid_preface'.
       
   508         //
       
   509         // Note: asyncReceive is only called from the Http2TubeSubscriber
       
   510         //       sequential scheduler.
       
   511         try {
       
   512             Supplier<ByteBuffer> bs = initial;
       
   513             // ensure that we always handle the initial buffer first,
       
   514             // if any.
       
   515             if (bs != null) {
       
   516                 initial = null;
       
   517                 ByteBuffer b = bs.get();
       
   518                 if (b.hasRemaining()) {
       
   519                     long c = ++count;
       
   520                     debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
       
   521                         + c +"): " + b.remaining());
       
   522                     framesController.processReceivedData(framesDecoder, b);
       
   523                 }
       
   524             }
       
   525             ByteBuffer b = buffer;
       
   526             // the Http2TubeSubscriber scheduler ensures that the order of incoming
       
   527             // buffers is preserved.
       
   528             if (b == EMPTY_TRIGGER) {
       
   529                 debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
       
   530                 boolean prefaceSent = framesController.prefaceSent;
       
   531                 assert prefaceSent;
       
   532                 // call framesController.processReceivedData to potentially
       
   533                 // trigger the processing of all the data buffered there.
       
   534                 framesController.processReceivedData(framesDecoder, buffer);
       
   535                 debug.log(Level.DEBUG, "H2 processed buffered data");
       
   536             } else {
       
   537                 long c = ++count;
       
   538                 debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
       
   539                 framesController.processReceivedData(framesDecoder, buffer);
       
   540                 debug.log(Level.DEBUG, "H2 processed(%d)", c);
       
   541             }
       
   542         } catch (Throwable e) {
       
   543             String msg = Utils.stackTrace(e);
       
   544             Log.logTrace(msg);
       
   545             shutdown(e);
       
   546         }
       
   547     }
       
   548 
       
   549     Throwable getRecordedCause() {
       
   550         return cause;
       
   551     }
       
   552 
       
   553     void shutdown(Throwable t) {
       
   554         debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t);
       
   555         if (closed == true) return;
       
   556         synchronized (this) {
       
   557             if (closed == true) return;
       
   558             closed = true;
       
   559         }
       
   560         Log.logError(t);
       
   561         Throwable initialCause = this.cause;
       
   562         if (initialCause == null) this.cause = t;
       
   563         client2.deleteConnection(this);
       
   564         List<Stream<?>> c = new LinkedList<>(streams.values());
       
   565         for (Stream<?> s : c) {
       
   566             s.cancelImpl(t);
       
   567         }
       
   568         connection.close();
       
   569     }
       
   570 
       
   571     /**
       
   572      * Streams initiated by a client MUST use odd-numbered stream
       
   573      * identifiers; those initiated by the server MUST use even-numbered
       
   574      * stream identifiers.
       
   575      */
       
   576     private static final boolean isSeverInitiatedStream(int streamid) {
       
   577         return (streamid & 0x1) == 0;
       
   578     }
       
   579 
       
   580     /**
       
   581      * Handles stream 0 (common) frames that apply to whole connection and passes
       
   582      * other stream specific frames to that Stream object.
       
   583      *
       
   584      * Invokes Stream.incoming() which is expected to process frame without
       
   585      * blocking.
       
   586      */
       
   587     void processFrame(Http2Frame frame) throws IOException {
       
   588         Log.logFrames(frame, "IN");
       
   589         int streamid = frame.streamid();
       
   590         if (frame instanceof MalformedFrame) {
       
   591             Log.logError(((MalformedFrame) frame).getMessage());
       
   592             if (streamid == 0) {
       
   593                 framesDecoder.close("Malformed frame on stream 0");
       
   594                 protocolError(((MalformedFrame) frame).getErrorCode(),
       
   595                         ((MalformedFrame) frame).getMessage());
       
   596             } else {
       
   597                 debug.log(Level.DEBUG, () -> "Reset stream: "
       
   598                           + ((MalformedFrame) frame).getMessage());
       
   599                 resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
       
   600             }
       
   601             return;
       
   602         }
       
   603         if (streamid == 0) {
       
   604             handleConnectionFrame(frame);
       
   605         } else {
       
   606             if (frame instanceof SettingsFrame) {
       
   607                 // The stream identifier for a SETTINGS frame MUST be zero
       
   608                 framesDecoder.close(
       
   609                         "The stream identifier for a SETTINGS frame MUST be zero");
       
   610                 protocolError(GoAwayFrame.PROTOCOL_ERROR);
       
   611                 return;
       
   612             }
       
   613 
       
   614             Stream<?> stream = getStream(streamid);
       
   615             if (stream == null) {
       
   616                 // Should never receive a frame with unknown stream id
       
   617 
       
   618                 if (frame instanceof HeaderFrame) {
       
   619                     // always decode the headers as they may affect
       
   620                     // connection-level HPACK decoding state
       
   621                     HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
       
   622                     decodeHeaders((HeaderFrame) frame, decoder);
       
   623                 }
       
   624 
       
   625                 if (!(frame instanceof ResetFrame)) {
       
   626                     if (isSeverInitiatedStream(streamid)) {
       
   627                         if (streamid < nextPushStream) {
       
   628                             // trailing data on a cancelled push promise stream,
       
   629                             // reset will already have been sent, ignore
       
   630                             Log.logTrace("Ignoring cancelled push promise frame " + frame);
       
   631                         } else {
       
   632                             resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
       
   633                         }
       
   634                     } else if (streamid >= nextstreamid) {
       
   635                         // otherwise the stream has already been reset/closed
       
   636                         resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
       
   637                     }
       
   638                 }
       
   639                 return;
       
   640             }
       
   641             if (frame instanceof PushPromiseFrame) {
       
   642                 PushPromiseFrame pp = (PushPromiseFrame)frame;
       
   643                 handlePushPromise(stream, pp);
       
   644             } else if (frame instanceof HeaderFrame) {
       
   645                 // decode headers (or continuation)
       
   646                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
       
   647                 stream.incoming(frame);
       
   648             } else {
       
   649                 stream.incoming(frame);
       
   650             }
       
   651         }
       
   652     }
       
   653 
       
   654     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
       
   655         throws IOException
       
   656     {
       
   657         // always decode the headers as they may affect connection-level HPACK
       
   658         // decoding state
       
   659         HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
       
   660         decodeHeaders(pp, decoder);
       
   661 
       
   662         HttpRequestImpl parentReq = parent.request;
       
   663         int promisedStreamid = pp.getPromisedStream();
       
   664         if (promisedStreamid != nextPushStream) {
       
   665             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
       
   666             return;
       
   667         } else {
       
   668             nextPushStream += 2;
       
   669         }
       
   670 
       
   671         HttpHeadersImpl headers = decoder.headers();
       
   672         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
       
   673         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
       
   674         Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
       
   675         pushExch.exchImpl = pushStream;
       
   676         pushStream.registerStream(promisedStreamid);
       
   677         parent.incoming_pushPromise(pushReq, pushStream);
       
   678     }
       
   679 
       
   680     private void handleConnectionFrame(Http2Frame frame)
       
   681         throws IOException
       
   682     {
       
   683         switch (frame.type()) {
       
   684           case SettingsFrame.TYPE:
       
   685               handleSettings((SettingsFrame)frame);
       
   686               break;
       
   687           case PingFrame.TYPE:
       
   688               handlePing((PingFrame)frame);
       
   689               break;
       
   690           case GoAwayFrame.TYPE:
       
   691               handleGoAway((GoAwayFrame)frame);
       
   692               break;
       
   693           case WindowUpdateFrame.TYPE:
       
   694               handleWindowUpdate((WindowUpdateFrame)frame);
       
   695               break;
       
   696           default:
       
   697             protocolError(ErrorFrame.PROTOCOL_ERROR);
       
   698         }
       
   699     }
       
   700 
       
   701     void resetStream(int streamid, int code) throws IOException {
       
   702         Log.logError(
       
   703             "Resetting stream {0,number,integer} with error code {1,number,integer}",
       
   704             streamid, code);
       
   705         ResetFrame frame = new ResetFrame(streamid, code);
       
   706         sendFrame(frame);
       
   707         closeStream(streamid);
       
   708     }
       
   709 
       
   710     void closeStream(int streamid) {
       
   711         debug.log(Level.DEBUG, "Closed stream %d", streamid);
       
   712         Stream<?> s = streams.remove(streamid);
       
   713         if (s != null) {
       
   714             // decrement the reference count on the HttpClientImpl
       
   715             // to allow the SelectorManager thread to exit if no
       
   716             // other operation is pending and the facade is no
       
   717             // longer referenced.
       
   718             client().unreference();
       
   719         }
       
   720         // ## Remove s != null. It is a hack for delayed cancellation,reset
       
   721         if (s != null && !(s instanceof Stream.PushedStream)) {
       
   722             // Since PushStreams have no request body, then they have no
       
   723             // corresponding entry in the window controller.
       
   724             windowController.removeStream(streamid);
       
   725         }
       
   726         if (singleStream() && streams.isEmpty()) {
       
   727             // should be only 1 stream, but there might be more if server push
       
   728             close();
       
   729         }
       
   730     }
       
   731 
       
   732     /**
       
   733      * Increments this connection's send Window by the amount in the given frame.
       
   734      */
       
   735     private void handleWindowUpdate(WindowUpdateFrame f)
       
   736         throws IOException
       
   737     {
       
   738         int amount = f.getUpdate();
       
   739         if (amount <= 0) {
       
   740             // ## temporarily disable to workaround a bug in Jetty where it
       
   741             // ## sends Window updates with a 0 update value.
       
   742             //protocolError(ErrorFrame.PROTOCOL_ERROR);
       
   743         } else {
       
   744             boolean success = windowController.increaseConnectionWindow(amount);
       
   745             if (!success) {
       
   746                 protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
       
   747             }
       
   748         }
       
   749     }
       
   750 
       
   751     private void protocolError(int errorCode)
       
   752         throws IOException
       
   753     {
       
   754         protocolError(errorCode, null);
       
   755     }
       
   756 
       
   757     private void protocolError(int errorCode, String msg)
       
   758         throws IOException
       
   759     {
       
   760         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
       
   761         sendFrame(frame);
       
   762         shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
       
   763     }
       
   764 
       
   765     private void handleSettings(SettingsFrame frame)
       
   766         throws IOException
       
   767     {
       
   768         assert frame.streamid() == 0;
       
   769         if (!frame.getFlag(SettingsFrame.ACK)) {
       
   770             int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
       
   771             int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
       
   772             int diff = newWindowSize - oldWindowSize;
       
   773             if (diff != 0) {
       
   774                 windowController.adjustActiveStreams(diff);
       
   775             }
       
   776             serverSettings = frame;
       
   777             sendFrame(new SettingsFrame(SettingsFrame.ACK));
       
   778         }
       
   779     }
       
   780 
       
   781     private void handlePing(PingFrame frame)
       
   782         throws IOException
       
   783     {
       
   784         frame.setFlag(PingFrame.ACK);
       
   785         sendUnorderedFrame(frame);
       
   786     }
       
   787 
       
   788     private void handleGoAway(GoAwayFrame frame)
       
   789         throws IOException
       
   790     {
       
   791         shutdown(new IOException(
       
   792                         String.valueOf(connection.channel().getLocalAddress())
       
   793                         +": GOAWAY received"));
       
   794     }
       
   795 
       
   796     /**
       
   797      * Max frame size we are allowed to send
       
   798      */
       
   799     public int getMaxSendFrameSize() {
       
   800         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
       
   801         if (param == -1) {
       
   802             param = DEFAULT_FRAME_SIZE;
       
   803         }
       
   804         return param;
       
   805     }
       
   806 
       
   807     /**
       
   808      * Max frame size we will receive
       
   809      */
       
   810     public int getMaxReceiveFrameSize() {
       
   811         return clientSettings.getParameter(MAX_FRAME_SIZE);
       
   812     }
       
   813 
       
   814     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
       
   815 
       
   816     private static final byte[] PREFACE_BYTES =
       
   817         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
       
   818 
       
   819     /**
       
   820      * Sends Connection preface and Settings frame with current preferred
       
   821      * values
       
   822      */
       
   823     private void sendConnectionPreface() throws IOException {
       
   824         Log.logTrace("{0}: start sending connection preface to {1}",
       
   825                      connection.channel().getLocalAddress(),
       
   826                      connection.address());
       
   827         SettingsFrame sf = new SettingsFrame(clientSettings);
       
   828         int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
       
   829         ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
       
   830         Log.logFrames(sf, "OUT");
       
   831         // send preface bytes and SettingsFrame together
       
   832         HttpPublisher publisher = publisher();
       
   833         publisher.enqueue(List.of(buf));
       
   834         publisher.signalEnqueued();
       
   835         // mark preface sent.
       
   836         framesController.markPrefaceSent();
       
   837         Log.logTrace("PREFACE_BYTES sent");
       
   838         Log.logTrace("Settings Frame sent");
       
   839 
       
   840         // send a Window update for the receive buffer we are using
       
   841         // minus the initial 64 K specified in protocol
       
   842         final int len = windowUpdater.initialWindowSize - initialWindowSize;
       
   843         if (len > 0) {
       
   844             windowUpdater.sendWindowUpdate(len);
       
   845         }
       
   846         // there will be an ACK to the windows update - which should
       
   847         // cause any pending data stored before the preface was sent to be
       
   848         // flushed (see PrefaceController).
       
   849         Log.logTrace("finished sending connection preface");
       
   850         debug.log(Level.DEBUG, "Triggering processing of buffered data"
       
   851                   + " after sending connection preface");
       
   852         subscriber.onNext(List.of(EMPTY_TRIGGER));
       
   853     }
       
   854 
       
   855     /**
       
   856      * Returns an existing Stream with given id, or null if doesn't exist
       
   857      */
       
   858     @SuppressWarnings("unchecked")
       
   859     <T> Stream<T> getStream(int streamid) {
       
   860         return (Stream<T>)streams.get(streamid);
       
   861     }
       
   862 
       
   863     /**
       
   864      * Creates Stream with given id.
       
   865      */
       
   866     final <T> Stream<T> createStream(Exchange<T> exchange) {
       
   867         Stream<T> stream = new Stream<>(this, exchange, windowController);
       
   868         return stream;
       
   869     }
       
   870 
       
   871     <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
       
   872         PushGroup<T> pg = parent.exchange.getPushGroup();
       
   873         return new Stream.PushedStream<>(pg, this, pushEx);
       
   874     }
       
   875 
       
   876     <T> void putStream(Stream<T> stream, int streamid) {
       
   877         // increment the reference count on the HttpClientImpl
       
   878         // to prevent the SelectorManager thread from exiting until
       
   879         // the stream is closed.
       
   880         client().reference();
       
   881         streams.put(streamid, stream);
       
   882     }
       
   883 
       
   884     /**
       
   885      * Encode the headers into a List<ByteBuffer> and then create HEADERS
       
   886      * and CONTINUATION frames from the list and return the List<Http2Frame>.
       
   887      */
       
   888     private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
       
   889         List<ByteBuffer> buffers = encodeHeadersImpl(
       
   890                 getMaxSendFrameSize(),
       
   891                 frame.getAttachment().getRequestPseudoHeaders(),
       
   892                 frame.getUserHeaders(),
       
   893                 frame.getSystemHeaders());
       
   894 
       
   895         List<HeaderFrame> frames = new ArrayList<>(buffers.size());
       
   896         Iterator<ByteBuffer> bufIterator = buffers.iterator();
       
   897         HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
       
   898         frames.add(oframe);
       
   899         while(bufIterator.hasNext()) {
       
   900             oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
       
   901             frames.add(oframe);
       
   902         }
       
   903         oframe.setFlag(HeaderFrame.END_HEADERS);
       
   904         return frames;
       
   905     }
       
   906 
       
   907     // Dedicated cache for headers encoding ByteBuffer.
       
   908     // There can be no concurrent access to this  buffer as all access to this buffer
       
   909     // and its content happen within a single critical code block section protected
       
   910     // by the sendLock. / (see sendFrame())
       
   911     // private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
       
   912 
       
   913     private ByteBuffer getHeaderBuffer(int maxFrameSize) {
       
   914         ByteBuffer buf = ByteBuffer.allocate(maxFrameSize);
       
   915         buf.limit(maxFrameSize);
       
   916         return buf;
       
   917     }
       
   918 
       
   919     /*
       
   920      * Encodes all the headers from the given HttpHeaders into the given List
       
   921      * of buffers.
       
   922      *
       
   923      * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
       
   924      *
       
   925      *     ...Just as in HTTP/1.x, header field names are strings of ASCII
       
   926      *     characters that are compared in a case-insensitive fashion.  However,
       
   927      *     header field names MUST be converted to lowercase prior to their
       
   928      *     encoding in HTTP/2...
       
   929      */
       
   930     private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
       
   931         ByteBuffer buffer = getHeaderBuffer(maxFrameSize);
       
   932         List<ByteBuffer> buffers = new ArrayList<>();
       
   933         for(HttpHeaders header : headers) {
       
   934             for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
       
   935                 String lKey = e.getKey().toLowerCase();
       
   936                 List<String> values = e.getValue();
       
   937                 for (String value : values) {
       
   938                     hpackOut.header(lKey, value);
       
   939                     while (!hpackOut.encode(buffer)) {
       
   940                         buffer.flip();
       
   941                         buffers.add(buffer);
       
   942                         buffer =  getHeaderBuffer(maxFrameSize);
       
   943                     }
       
   944                 }
       
   945             }
       
   946         }
       
   947         buffer.flip();
       
   948         buffers.add(buffer);
       
   949         return buffers;
       
   950     }
       
   951 
       
   952     private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
       
   953         oh.streamid(stream.streamid);
       
   954         if (Log.headers()) {
       
   955             StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
       
   956             sb.append(stream.streamid).append(")\n");
       
   957             Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
       
   958             Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
       
   959             Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
       
   960             Log.logHeaders(sb.toString());
       
   961         }
       
   962         List<HeaderFrame> frames = encodeHeaders(oh);
       
   963         return encodeFrames(frames);
       
   964     }
       
   965 
       
   966     private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
       
   967         if (Log.frames()) {
       
   968             frames.forEach(f -> Log.logFrames(f, "OUT"));
       
   969         }
       
   970         return framesEncoder.encodeFrames(frames);
       
   971     }
       
   972 
       
   973     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
       
   974         Stream<?> stream = oh.getAttachment();
       
   975         int streamid = nextstreamid;
       
   976         nextstreamid += 2;
       
   977         stream.registerStream(streamid);
       
   978         // set outgoing window here. This allows thread sending
       
   979         // body to proceed.
       
   980         windowController.registerStream(streamid, getInitialSendWindowSize());
       
   981         return stream;
       
   982     }
       
   983 
       
   984     private final Object sendlock = new Object();
       
   985 
       
   986     void sendFrame(Http2Frame frame) {
       
   987         try {
       
   988             HttpPublisher publisher = publisher();
       
   989             synchronized (sendlock) {
       
   990                 if (frame instanceof OutgoingHeaders) {
       
   991                     @SuppressWarnings("unchecked")
       
   992                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
       
   993                     Stream<?> stream = registerNewStream(oh);
       
   994                     // provide protection from inserting unordered frames between Headers and Continuation
       
   995                     publisher.enqueue(encodeHeaders(oh, stream));
       
   996                 } else {
       
   997                     publisher.enqueue(encodeFrame(frame));
       
   998                 }
       
   999             }
       
  1000             publisher.signalEnqueued();
       
  1001         } catch (IOException e) {
       
  1002             if (!closed) {
       
  1003                 Log.logError(e);
       
  1004                 shutdown(e);
       
  1005             }
       
  1006         }
       
  1007     }
       
  1008 
       
  1009     private List<ByteBuffer> encodeFrame(Http2Frame frame) {
       
  1010         Log.logFrames(frame, "OUT");
       
  1011         return framesEncoder.encodeFrame(frame);
       
  1012     }
       
  1013 
       
  1014     void sendDataFrame(DataFrame frame) {
       
  1015         try {
       
  1016             HttpPublisher publisher = publisher();
       
  1017             publisher.enqueue(encodeFrame(frame));
       
  1018             publisher.signalEnqueued();
       
  1019         } catch (IOException e) {
       
  1020             if (!closed) {
       
  1021                 Log.logError(e);
       
  1022                 shutdown(e);
       
  1023             }
       
  1024         }
       
  1025     }
       
  1026 
       
  1027     /*
       
  1028      * Direct call of the method bypasses synchronization on "sendlock" and
       
  1029      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
       
  1030      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
       
  1031      */
       
  1032     void sendUnorderedFrame(Http2Frame frame) {
       
  1033         try {
       
  1034             HttpPublisher publisher = publisher();
       
  1035             publisher.enqueueUnordered(encodeFrame(frame));
       
  1036             publisher.signalEnqueued();
       
  1037         } catch (IOException e) {
       
  1038             if (!closed) {
       
  1039                 Log.logError(e);
       
  1040                 shutdown(e);
       
  1041             }
       
  1042         }
       
  1043     }
       
  1044 
       
  1045     /**
       
  1046      * A simple tube subscriber for reading from the connection flow.
       
  1047      */
       
  1048     final class Http2TubeSubscriber implements TubeSubscriber {
       
  1049         volatile Flow.Subscription subscription;
       
  1050         volatile boolean completed;
       
  1051         volatile boolean dropped;
       
  1052         volatile Throwable error;
       
  1053         final ConcurrentLinkedQueue<ByteBuffer> queue
       
  1054                 = new ConcurrentLinkedQueue<>();
       
  1055         final SequentialScheduler scheduler =
       
  1056                 SequentialScheduler.synchronizedScheduler(this::processQueue);
       
  1057 
       
  1058         final void processQueue() {
       
  1059             try {
       
  1060                 while (!queue.isEmpty() && !scheduler.isStopped()) {
       
  1061                     ByteBuffer buffer = queue.poll();
       
  1062                     debug.log(Level.DEBUG,
       
  1063                               "sending %d to Http2Connection.asyncReceive",
       
  1064                               buffer.remaining());
       
  1065                     asyncReceive(buffer);
       
  1066                 }
       
  1067             } catch (Throwable t) {
       
  1068                 Throwable x = error;
       
  1069                 if (x == null) error = t;
       
  1070             } finally {
       
  1071                 Throwable x = error;
       
  1072                 if (x != null) {
       
  1073                     debug.log(Level.DEBUG, "Stopping scheduler", x);
       
  1074                     scheduler.stop();
       
  1075                     Http2Connection.this.shutdown(x);
       
  1076                 }
       
  1077             }
       
  1078         }
       
  1079 
       
  1080         @Override
       
  1081         public void onSubscribe(Flow.Subscription subscription) {
       
  1082             // supports being called multiple time.
       
  1083             // doesn't cancel the previous subscription, since that is
       
  1084             // most probably the same as the new subscription.
       
  1085             assert this.subscription == null || dropped == false;
       
  1086             this.subscription = subscription;
       
  1087             dropped = false;
       
  1088             // TODO FIXME: request(1) should be done by the delegate.
       
  1089             if (!completed) {
       
  1090                 debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
       
  1091                 subscription.request(Long.MAX_VALUE);
       
  1092             } else {
       
  1093                 debug.log(Level.DEBUG, "onSubscribe: already completed");
       
  1094             }
       
  1095         }
       
  1096 
       
  1097         @Override
       
  1098         public void onNext(List<ByteBuffer> item) {
       
  1099             debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
       
  1100                     + " bytes in " + item.size() + " buffers");
       
  1101             queue.addAll(item);
       
  1102             scheduler.runOrSchedule(client().theExecutor());
       
  1103         }
       
  1104 
       
  1105         @Override
       
  1106         public void onError(Throwable throwable) {
       
  1107             debug.log(Level.DEBUG, () -> "onError: " + throwable);
       
  1108             error = throwable;
       
  1109             completed = true;
       
  1110             scheduler.runOrSchedule(client().theExecutor());
       
  1111         }
       
  1112 
       
  1113         @Override
       
  1114         public void onComplete() {
       
  1115             debug.log(Level.DEBUG, "EOF");
       
  1116             error = new EOFException("EOF reached while reading");
       
  1117             completed = true;
       
  1118             scheduler.runOrSchedule(client().theExecutor());
       
  1119         }
       
  1120 
       
  1121         @Override
       
  1122         public void dropSubscription() {
       
  1123             debug.log(Level.DEBUG, "dropSubscription");
       
  1124             // we could probably set subscription to null here...
       
  1125             // then we might not need the 'dropped' boolean?
       
  1126             dropped = true;
       
  1127         }
       
  1128     }
       
  1129 
       
  1130     @Override
       
  1131     public final String toString() {
       
  1132         return dbgString();
       
  1133     }
       
  1134 
       
  1135     final String dbgString() {
       
  1136         return "Http2Connection("
       
  1137                     + connection.getConnectionFlow() + ")";
       
  1138     }
       
  1139 
       
  1140     final class LoggingHeaderDecoder extends HeaderDecoder {
       
  1141 
       
  1142         private final HeaderDecoder delegate;
       
  1143         private final System.Logger debugHpack =
       
  1144                 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
       
  1145 
       
  1146         LoggingHeaderDecoder(HeaderDecoder delegate) {
       
  1147             this.delegate = delegate;
       
  1148         }
       
  1149 
       
  1150         String dbgString() {
       
  1151             return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder";
       
  1152         }
       
  1153 
       
  1154         @Override
       
  1155         public void onDecoded(CharSequence name, CharSequence value) {
       
  1156             delegate.onDecoded(name, value);
       
  1157         }
       
  1158 
       
  1159         @Override
       
  1160         public void onIndexed(int index,
       
  1161                               CharSequence name,
       
  1162                               CharSequence value) {
       
  1163             debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n",
       
  1164                            index, name, value);
       
  1165             delegate.onIndexed(index, name, value);
       
  1166         }
       
  1167 
       
  1168         @Override
       
  1169         public void onLiteral(int index,
       
  1170                               CharSequence name,
       
  1171                               CharSequence value,
       
  1172                               boolean valueHuffman) {
       
  1173             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
       
  1174                               index, name, value, valueHuffman);
       
  1175             delegate.onLiteral(index, name, value, valueHuffman);
       
  1176         }
       
  1177 
       
  1178         @Override
       
  1179         public void onLiteral(CharSequence name,
       
  1180                               boolean nameHuffman,
       
  1181                               CharSequence value,
       
  1182                               boolean valueHuffman) {
       
  1183             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
       
  1184                            name, nameHuffman, value, valueHuffman);
       
  1185             delegate.onLiteral(name, nameHuffman, value, valueHuffman);
       
  1186         }
       
  1187 
       
  1188         @Override
       
  1189         public void onLiteralNeverIndexed(int index,
       
  1190                                           CharSequence name,
       
  1191                                           CharSequence value,
       
  1192                                           boolean valueHuffman) {
       
  1193             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
       
  1194                            index, name, value, valueHuffman);
       
  1195             delegate.onLiteralNeverIndexed(index, name, value, valueHuffman);
       
  1196         }
       
  1197 
       
  1198         @Override
       
  1199         public void onLiteralNeverIndexed(CharSequence name,
       
  1200                                           boolean nameHuffman,
       
  1201                                           CharSequence value,
       
  1202                                           boolean valueHuffman) {
       
  1203             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
       
  1204                            name, nameHuffman, value, valueHuffman);
       
  1205             delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman);
       
  1206         }
       
  1207 
       
  1208         @Override
       
  1209         public void onLiteralWithIndexing(int index,
       
  1210                                           CharSequence name,
       
  1211                                           CharSequence value,
       
  1212                                           boolean valueHuffman) {
       
  1213             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
       
  1214                            index, name, value, valueHuffman);
       
  1215             delegate.onLiteralWithIndexing(index, name, value, valueHuffman);
       
  1216         }
       
  1217 
       
  1218         @Override
       
  1219         public void onLiteralWithIndexing(CharSequence name,
       
  1220                                           boolean nameHuffman,
       
  1221                                           CharSequence value,
       
  1222                                           boolean valueHuffman) {
       
  1223             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
       
  1224                               name, nameHuffman, value, valueHuffman);
       
  1225             delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman);
       
  1226         }
       
  1227 
       
  1228         @Override
       
  1229         public void onSizeUpdate(int capacity) {
       
  1230             debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity);
       
  1231             delegate.onSizeUpdate(capacity);
       
  1232         }
       
  1233 
       
  1234         @Override
       
  1235         HttpHeadersImpl headers() {
       
  1236             return delegate.headers();
       
  1237         }
       
  1238     }
       
  1239 
       
  1240     static class HeaderDecoder implements DecodingCallback {
       
  1241         HttpHeadersImpl headers;
       
  1242 
       
  1243         HeaderDecoder() {
       
  1244             this.headers = new HttpHeadersImpl();
       
  1245         }
       
  1246 
       
  1247         @Override
       
  1248         public void onDecoded(CharSequence name, CharSequence value) {
       
  1249             headers.addHeader(name.toString(), value.toString());
       
  1250         }
       
  1251 
       
  1252         HttpHeadersImpl headers() {
       
  1253             return headers;
       
  1254         }
       
  1255     }
       
  1256 
       
  1257     static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
       
  1258 
       
  1259         final int initialWindowSize;
       
  1260         public ConnectionWindowUpdateSender(Http2Connection connection,
       
  1261                                             int initialWindowSize) {
       
  1262             super(connection, initialWindowSize);
       
  1263             this.initialWindowSize = initialWindowSize;
       
  1264         }
       
  1265 
       
  1266         @Override
       
  1267         int getStreamId() {
       
  1268             return 0;
       
  1269         }
       
  1270     }
       
  1271 
       
  1272     /**
       
  1273      * Thrown when https handshake negotiates http/1.1 alpn instead of h2
       
  1274      */
       
  1275     static final class ALPNException extends IOException {
       
  1276         private static final long serialVersionUID = 0L;
       
  1277         final transient AbstractAsyncSSLConnection connection;
       
  1278 
       
  1279         ALPNException(String msg, AbstractAsyncSSLConnection connection) {
       
  1280             super(msg);
       
  1281             this.connection = connection;
       
  1282         }
       
  1283 
       
  1284         AbstractAsyncSSLConnection getConnection() {
       
  1285             return connection;
       
  1286         }
       
  1287     }
       
  1288 }