src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Sun Nov 05 17:05:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Sun Nov 05 17:32:13 2017 +0000
@@ -25,26 +25,32 @@
 
 package jdk.incubator.http;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import jdk.incubator.http.HttpConnection.Mode;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Formatter;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.net.ssl.SSLEngine;
 import jdk.incubator.http.internal.common.*;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
+import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.Encoder;
 import jdk.incubator.http.internal.hpack.Decoder;
@@ -83,11 +89,21 @@
  * stream are provided by calling Stream.incoming().
  */
 class Http2Connection  {
+
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag.
+    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+    final static System.Logger  DEBUG_LOGGER =
+            Utils.getDebugLogger("Http2Connection"::toString, DEBUG);
+    private final System.Logger debugHpack =
+                  Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
+    static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
+
     /*
      *  ByteBuffer pooling strategy for HTTP/2 protocol:
      *
      * In general there are 4 points where ByteBuffers are used:
-     *  - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data
+     *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
      *    in case of SSL connection.
      *
      * 1. Outgoing frames encoded to ByteBuffers.
@@ -123,10 +139,15 @@
         {
             // if preface is not sent, buffers data in the pending list
             if (!prefaceSent) {
+                debug.log(Level.DEBUG, "Preface is not sent: buffering %d",
+                          buf.get().remaining());
                 synchronized (this) {
                     if (!prefaceSent) {
                         if (pending == null) pending = new ArrayList<>();
                         pending.add(buf);
+                        debug.log(Level.DEBUG, () -> "there are now "
+                              + Utils.remaining(pending.toArray(new ByteBufferReference[0]))
+                              + " bytes buffered waiting for preface to be sent");
                         return false;
                     }
                 }
@@ -143,13 +164,18 @@
             this.pending = null;
             if (pending != null) {
                 // flush pending data
+                debug.log(Level.DEBUG, () -> "Processing buffered data: "
+                      + Utils.remaining(pending.toArray(new ByteBufferReference[0])));
                 for (ByteBufferReference b : pending) {
                     decoder.decode(b);
                 }
             }
-
+            ByteBuffer b = buf.get();
             // push the received buffer to the frames decoder.
-            decoder.decode(buf);
+            if (b != EMPTY_TRIGGER) {
+                debug.log(Level.DEBUG, "Processing %d", buf.get().remaining());
+                decoder.decode(buf);
+            }
             return true;
         }
 
@@ -167,7 +193,9 @@
 
     //-------------------------------------
     final HttpConnection connection;
-    private final HttpClientImpl client;
+    // only keep a strong reference to Http2ClientImpl, which only has
+    // a weak reference on HttpClientImpl, to avoid strong references
+    // from the selector thread to HttpClientImpl (via attachments).
     private final Http2ClientImpl client2;
     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
     private int nextstreamid;
@@ -186,7 +214,10 @@
      */
     private final WindowController windowController = new WindowController();
     private final FramesController framesController = new FramesController();
+    private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
     final WindowUpdateSender windowUpdater;
+    private volatile Throwable cause;
+    private volatile Supplier<ByteBuffer> initial;
 
     static final int DEFAULT_FRAME_SIZE = 16 * 1024;
 
@@ -199,7 +230,6 @@
                             int nextstreamid,
                             String key) {
         this.connection = connection;
-        this.client = client2.client();
         this.client2 = client2;
         this.nextstreamid = nextstreamid;
         this.key = key;
@@ -209,102 +239,151 @@
         this.serverSettings = SettingsFrame.getDefaultSettings();
         this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
         this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
-        this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
+        debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString());
+        debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn);
+        debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut);
+        this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize());
     }
 
     /**
      * Case 1) Create from upgraded HTTP/1.1 connection.
-     * Is ready to use. Will not be SSL. exchange is the Exchange
+     * Is ready to use. Can be SSL. exchange is the Exchange
      * that initiated the connection, whose response will be delivered
      * on a Stream.
      */
-    Http2Connection(HttpConnection connection,
+    private Http2Connection(HttpConnection connection,
                     Http2ClientImpl client2,
                     Exchange<?> exchange,
-                    ByteBuffer initial)
+                    Supplier<ByteBuffer> initial)
         throws IOException, InterruptedException
     {
         this(connection,
                 client2,
                 3, // stream 1 is registered during the upgrade
                 keyFor(connection));
-        assert !(connection instanceof SSLConnection);
         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 
         Stream<?> initialStream = createStream(exchange);
         initialStream.registerStream(1);
         windowController.registerStream(1, getInitialSendWindowSize());
         initialStream.requestSent();
+        // Upgrading:
+        //    set callbacks before sending preface - makes sure anything that
+        //    might be sent by the server will come our way.
+        this.initial = initial;
+        connectFlows(connection);
         sendConnectionPreface();
-        // start reading and writing
-        // start reading
-        AsyncConnection asyncConn = (AsyncConnection)connection;
-        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
-        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
-        asyncReceive(ByteBufferReference.of(initial));
-        asyncConn.startReading();
     }
 
     // async style but completes immediately
     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
                                                           Http2ClientImpl client2,
                                                           Exchange<?> exchange,
-                                                          ByteBuffer initial) {
+                                                          Supplier<ByteBuffer> initial)
+    {
         return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
     }
 
+    // Requires TLS handshake. So, is really async
+    static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
+                                                          Http2ClientImpl h2client) {
+        assert request.secure();
+        AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
+        HttpConnection.getConnection(request.getAddress(h2client.client()),
+                                     h2client.client(),
+                                     request,
+                                     HttpClient.Version.HTTP_2);
+
+        return connection.connectAsync()
+                  .thenCompose(unused -> checkSSLConfig(connection))
+                  .thenCompose(notused-> {
+                      CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
+                      try {
+                          Http2Connection hc = new Http2Connection(request, h2client, connection);
+                          cf.complete(hc);
+                      } catch (IOException e) {
+                          cf.completeExceptionally(e);
+                      }
+                      return cf; } );
+    }
+
     /**
      * Cases 2) 3)
      *
      * request is request to be sent.
      */
-    Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
-        throws IOException, InterruptedException
+    private Http2Connection(HttpRequestImpl request,
+                            Http2ClientImpl h2client,
+                            HttpConnection connection)
+        throws IOException
     {
-        this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
-                h2client,
-                1,
-                keyFor(request.uri(), request.proxy(h2client.client())));
+        this(connection,
+             h2client,
+             1,
+             keyFor(request.uri(), request.proxy(h2client.client())));
+
         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 
-        // start reading
-        AsyncConnection asyncConn = (AsyncConnection)connection;
-        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
-        connection.connect();
-        checkSSLConfig();
         // safe to resume async reading now.
-        asyncConn.enableCallback();
+        connectFlows(connection);
         sendConnectionPreface();
     }
 
+    private void connectFlows(HttpConnection connection) {
+        FlowTube tube =  connection.getConnectionFlow();
+        // Connect the flow to our Http2TubeSubscriber:
+        // Using connection.publisher() here is a hack that
+        // allows us to continue calling connection.writeAsync()
+        // and connection.flushAsync() transparently.
+        // We will eventually need to implement our own publisher
+        // to write to the flow instead.
+        tube.connectFlows(connection.publisher(), // hack
+                      subscriber);
+    }
+
+    final HttpClientImpl client() {
+        return client2.client();
+    }
+
     /**
      * Throws an IOException if h2 was not negotiated
      */
-    private void checkSSLConfig() throws IOException {
-        AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection;
-        SSLEngine engine = aconn.getEngine();
-        String alpn = engine.getApplicationProtocol();
-        if (alpn == null || !alpn.equals("h2")) {
-            String msg;
-            if (alpn == null) {
-                Log.logSSL("ALPN not supported");
-                msg = "ALPN not supported";
-            } else switch (alpn) {
-              case "":
-                Log.logSSL("No ALPN returned");
-                msg = "No ALPN negotiated";
-                break;
-              case "http/1.1":
-                Log.logSSL("HTTP/1.1 ALPN returned");
-                msg = "HTTP/1.1 ALPN returned";
-                break;
-              default:
-                Log.logSSL("unknown ALPN returned");
-                msg = "Unexpected ALPN: " + alpn;
-                throw new IOException(msg);
+    private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
+        assert aconn.isSecure();
+
+        Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
+            CompletableFuture<Void> cf = new MinimalFuture<>();
+            SSLEngine engine = aconn.getEngine();
+            assert Objects.equals(alpn, engine.getApplicationProtocol());
+
+            DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn );
+
+            if (alpn == null || !alpn.equals("h2")) {
+                String msg;
+                if (alpn == null) {
+                    Log.logSSL("ALPN not supported");
+                    msg = "ALPN not supported";
+                } else {
+                    switch (alpn) {
+                        case "":
+                            Log.logSSL(msg = "No ALPN negotiated");
+                            break;
+                        case "http/1.1":
+                            Log.logSSL( msg = "HTTP/1.1 ALPN returned");
+                            break;
+                        default:
+                            Log.logSSL(msg = "Unexpected ALPN: " + alpn);
+                            cf.completeExceptionally(new IOException(msg));
+                    }
+                }
+                cf.completeExceptionally(new ALPNException(msg, aconn));
+                return cf;
             }
-            throw new ALPNException(msg, aconn);
-        }
+            cf.complete(null);
+            return cf;
+        };
+
+        return aconn.getALPN().thenCompose(checkAlpnCF);
     }
 
     static String keyFor(HttpConnection connection) {
@@ -322,7 +401,7 @@
         String host;
         int port;
 
-        if (isProxy) {
+        if (proxy != null) {
             host = proxy.getHostString();
             port = proxy.getPort();
         } else {
@@ -381,7 +460,11 @@
         return words.stream().collect(Collectors.joining(" "));
     }
 
-    private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
+    private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
+            throws IOException
+    {
+        debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder);
+
         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 
         ByteBufferReference[] buffers = frame.getHeaderBlock();
@@ -390,7 +473,7 @@
         }
     }
 
-    int getInitialSendWindowSize() {
+    final int getInitialSendWindowSize() {
         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
     }
 
@@ -400,7 +483,7 @@
         sendFrame(f);
     }
 
-    private ByteBufferPool readBufferPool = new ByteBufferPool();
+    private final ByteBufferPool readBufferPool = new ByteBufferPool();
 
     // provides buffer to read data (default size)
     public ByteBufferReference getReadBuffer() {
@@ -409,7 +492,8 @@
 
     private final Object readlock = new Object();
 
-    public void asyncReceive(ByteBufferReference buffer) {
+    long count;
+    public final void asyncReceive(ByteBufferReference buffer) {
         // We don't need to read anything and
         // we don't want to send anything back to the server
         // until the connection preface has been sent.
@@ -419,11 +503,45 @@
         // SettingsFrame sent by the server) before the connection
         // preface is fully sent might result in the server
         // sending a GOAWAY frame with 'invalid_preface'.
+        //
+        // Note: asyncReceive is only called from the Http2TubeSubscriber
+        //       sequential scheduler. Only asyncReceive uses the readLock.
+        //       Therefore synchronizing on the readlock here should be
+        //       safe.
+        //
         synchronized (readlock) {
             try {
+                Supplier<ByteBuffer> bs = initial;
+                // ensure that we always handle the initial buffer first,
+                // if any.
+                if (bs != null) {
+                    initial = null;
+                    ByteBuffer b = bs.get();
+                    if (b.hasRemaining()) {
+                        long c = ++count;
+                        debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
+                            + c +"): " + b.remaining());
+                        framesController.processReceivedData(framesDecoder,
+                                ByteBufferReference.of(b));
+                    }
+                }
+                ByteBuffer b = buffer.get();
                 // the readlock ensures that the order of incoming buffers
                 // is preserved.
-                framesController.processReceivedData(framesDecoder, buffer);
+                if (b == EMPTY_TRIGGER) {
+                    debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
+                    boolean prefaceSent = framesController.prefaceSent;
+                    assert prefaceSent;
+                    // call framesController.processReceivedData to potentially
+                    // trigger the processing of all the data buffered there.
+                    framesController.processReceivedData(framesDecoder, buffer);
+                    debug.log(Level.DEBUG, "H2 processed buffered data");
+                } else {
+                    long c = ++count;
+                    debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
+                    framesController.processReceivedData(framesDecoder, buffer);
+                    debug.log(Level.DEBUG, "H2 processed(%d)", c);
+                }
             } catch (Throwable e) {
                 String msg = Utils.stackTrace(e);
                 Log.logTrace(msg);
@@ -432,10 +550,20 @@
         }
     }
 
+    Throwable getRecordedCause() {
+        return cause;
+    }
 
     void shutdown(Throwable t) {
+        debug.log(Level.DEBUG, () -> "Shutting down h2c: " + t);
+        if (closed == true) return;
+        synchronized (this) {
+            if (closed == true) return;
+            closed = true;
+        }
         Log.logError(t);
-        closed = true;
+        Throwable initialCause = this.cause;
+        if (initialCause == null) this.cause = t;
         client2.deleteConnection(this);
         List<Stream<?>> c = new LinkedList<>(streams.values());
         for (Stream<?> s : c) {
@@ -457,8 +585,11 @@
         if (frame instanceof MalformedFrame) {
             Log.logError(((MalformedFrame) frame).getMessage());
             if (streamid == 0) {
-                protocolError(((MalformedFrame) frame).getErrorCode());
+                protocolError(((MalformedFrame) frame).getErrorCode(),
+                        ((MalformedFrame) frame).getMessage());
             } else {
+                debug.log(Level.DEBUG, () -> "Reset stream: "
+                          + ((MalformedFrame) frame).getMessage());
                 resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
             }
             return;
@@ -476,6 +607,13 @@
             if (stream == null) {
                 // Should never receive a frame with unknown stream id
 
+                if (frame instanceof HeaderFrame) {
+                    // always decode the headers as they may affect
+                    // connection-level HPACK decoding state
+                    HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
+                    decodeHeaders((HeaderFrame) frame, decoder);
+                }
+
                 // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
                 // response to a RST_STREAM frame.
                 if (!(frame instanceof ResetFrame)) {
@@ -499,6 +637,11 @@
     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
         throws IOException
     {
+        // always decode the headers as they may affect connection-level HPACK
+        // decoding state
+        HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
+        decodeHeaders(pp, decoder);
+
         HttpRequestImpl parentReq = parent.request;
         int promisedStreamid = pp.getPromisedStream();
         if (promisedStreamid != nextPushStream) {
@@ -507,8 +650,7 @@
         } else {
             nextPushStream += 2;
         }
-        HeaderDecoder decoder = new HeaderDecoder();
-        decodeHeaders(pp, decoder);
+
         HttpHeadersImpl headers = decoder.headers();
         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
@@ -549,7 +691,15 @@
     }
 
     void closeStream(int streamid) {
+        debug.log(Level.DEBUG, "Closed stream %d", streamid);
         Stream<?> s = streams.remove(streamid);
+        if (s != null) {
+            // decrement the reference count on the HttpClientImpl
+            // to allow the SelectorManager thread to exit if no
+            // other operation is pending and the facade is no
+            // longer referenced.
+            client().unreference();
+        }
         // ## Remove s != null. It is a hack for delayed cancellation,reset
         if (s != null && !(s instanceof Stream.PushedStream)) {
             // Since PushStreams have no request body, then they have no
@@ -579,9 +729,15 @@
     private void protocolError(int errorCode)
         throws IOException
     {
+        protocolError(errorCode, null);
+    }
+
+    private void protocolError(int errorCode, String msg)
+        throws IOException
+    {
         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
         sendFrame(frame);
-        shutdown(new IOException("protocol error"));
+        shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
     }
 
     private void handleSettings(SettingsFrame frame)
@@ -655,7 +811,8 @@
         ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
         Log.logFrames(sf, "OUT");
         // send preface bytes and SettingsFrame together
-        connection.write(ref.get());
+        connection.writeAsync(new ByteBufferReference[] {ref});
+        connection.flushAsync();
         // mark preface sent.
         framesController.markPrefaceSent();
         Log.logTrace("PREFACE_BYTES sent");
@@ -669,6 +826,9 @@
         // cause any pending data stored before the preface was sent to be
         // flushed (see PrefaceController).
         Log.logTrace("finished sending connection preface");
+        debug.log(Level.DEBUG, "Triggering processing of buffered data"
+                  + " after sending connection preface");
+        subscriber.onNext(List.of(EMPTY_TRIGGER));
     }
 
     /**
@@ -682,22 +842,32 @@
     /**
      * Creates Stream with given id.
      */
-    <T> Stream<T> createStream(Exchange<T> exchange) {
-        Stream<T> stream = new Stream<>(client, this, exchange, windowController);
+    final <T> Stream<T> createStream(Exchange<T> exchange) {
+        Stream<T> stream = new Stream<>(client(), this, exchange, windowController);
         return stream;
     }
 
     <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
         PushGroup<?,T> pg = parent.exchange.getPushGroup();
-        return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
+        return new Stream.PushedStream<>(pg, client(), this, parent, pushEx);
     }
 
     <T> void putStream(Stream<T> stream, int streamid) {
+        // increment the reference count on the HttpClientImpl
+        // to prevent the SelectorManager thread from exiting until
+        // the stream is closed.
+        client().reference();
         streams.put(streamid, stream);
     }
 
     void deleteStream(int streamid) {
-        streams.remove(streamid);
+        if (streams.remove(streamid) != null) {
+            // decrement the reference count on the HttpClientImpl
+            // to allow the SelectorManager thread to exit if no
+            // other operation is pending and the facade is no
+            // longer referenced.
+            client().unreference();
+        }
         windowController.removeStream(streamid);
     }
 
@@ -728,7 +898,7 @@
     // There can be no concurrent access to this  buffer as all access to this buffer
     // and its content happen within a single critical code block section protected
     // by the sendLock. / (see sendFrame())
-    private ByteBufferPool headerEncodingPool = new ByteBufferPool();
+    private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
 
     private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
         ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
@@ -872,6 +1042,208 @@
         }
     }
 
+    /**
+     * Returns the TubeSubscriber for reading from the connection flow.
+     * @return the TubeSubscriber for reading from the connection flow.
+     */
+    TubeSubscriber subscriber() {
+        return subscriber;
+    }
+
+    /**
+     * A simple tube subscriber for reading from the connection flow.
+     */
+    final class Http2TubeSubscriber implements TubeSubscriber {
+        volatile Flow.Subscription subscription;
+        volatile boolean completed;
+        volatile boolean dropped;
+        volatile Throwable error;
+        final ConcurrentLinkedQueue<ByteBuffer> queue
+                = new ConcurrentLinkedQueue<>();
+        final SequentialScheduler scheduler = new SequentialScheduler(
+                        new SynchronizedRestartableTask(this::processQueue));
+
+        final void processQueue() {
+            try {
+                while (!queue.isEmpty() && !scheduler.isStopped()) {
+                    ByteBuffer buffer = queue.poll();
+                    debug.log(Level.DEBUG,
+                              "sending %d to Http2Connection.asyncReceive",
+                              buffer.remaining());
+                    asyncReceive(ByteBufferReference.of(buffer));
+                }
+            } catch (Throwable t) {
+                Throwable x = error;
+                if (x == null) error = t;
+            } finally {
+                Throwable x = error;
+                if (x != null) {
+                    debug.log(Level.DEBUG, "Stopping scheduler", x);
+                    scheduler.stop();
+                    Http2Connection.this.shutdown(x);
+                }
+            }
+        }
+
+
+        public void onSubscribe(Flow.Subscription subscription) {
+            // supports being called multiple time.
+            // doesn't cancel the previous subscription, since that is
+            // most probably the same as the new subscription.
+            assert this.subscription == null || dropped == false;
+            this.subscription = subscription;
+            dropped = false;
+            // TODO FIXME: request(1) should be done by the delegate.
+            if (!completed) {
+                debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
+                subscription.request(Long.MAX_VALUE);
+            } else {
+                debug.log(Level.DEBUG, "onSubscribe: already completed");
+            }
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
+                    + " bytes in " + item.size() + " buffers");
+            queue.addAll(item);
+            scheduler.deferOrSchedule(client().theExecutor());
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            debug.log(Level.DEBUG, () -> "onError: " + throwable);
+            error = throwable;
+            completed = true;
+            scheduler.deferOrSchedule(client().theExecutor());
+        }
+
+        @Override
+        public void onComplete() {
+            debug.log(Level.DEBUG, "EOF");
+            error = new EOFException("EOF reached while reading");
+            completed = true;
+            scheduler.deferOrSchedule(client().theExecutor());
+        }
+
+        public void dropSubscription() {
+            debug.log(Level.DEBUG, "dropSubscription");
+            // we could probably set subscription to null here...
+            // then we might not need the 'dropped' boolean?
+            dropped = true;
+        }
+    }
+
+    @Override
+    public final String toString() {
+        return dbgString();
+    }
+
+    final String dbgString() {
+        return "Http2Connection("
+                    + connection.getConnectionFlow() + ")";
+    }
+
+    final class LoggingHeaderDecoder extends HeaderDecoder {
+
+        private final HeaderDecoder delegate;
+        private final System.Logger debugHpack =
+                Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
+
+        LoggingHeaderDecoder(HeaderDecoder delegate) {
+            this.delegate = delegate;
+        }
+
+        String dbgString() {
+            return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder";
+        }
+
+        @Override
+        public void onDecoded(CharSequence name, CharSequence value) {
+            delegate.onDecoded(name, value);
+        }
+
+        @Override
+        public void onIndexed(int index,
+                              CharSequence name,
+                              CharSequence value) {
+            debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n",
+                           index, name, value);
+            delegate.onIndexed(index, name, value);
+        }
+
+        @Override
+        public void onLiteral(int index,
+                              CharSequence name,
+                              CharSequence value,
+                              boolean valueHuffman) {
+            debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
+                              index, name, value, valueHuffman);
+            delegate.onLiteral(index, name, value, valueHuffman);
+        }
+
+        @Override
+        public void onLiteral(CharSequence name,
+                              boolean nameHuffman,
+                              CharSequence value,
+                              boolean valueHuffman) {
+            debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
+                           name, nameHuffman, value, valueHuffman);
+            delegate.onLiteral(name, nameHuffman, value, valueHuffman);
+        }
+
+        @Override
+        public void onLiteralNeverIndexed(int index,
+                                          CharSequence name,
+                                          CharSequence value,
+                                          boolean valueHuffman) {
+            debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
+                           index, name, value, valueHuffman);
+            delegate.onLiteralNeverIndexed(index, name, value, valueHuffman);
+        }
+
+        @Override
+        public void onLiteralNeverIndexed(CharSequence name,
+                                          boolean nameHuffman,
+                                          CharSequence value,
+                                          boolean valueHuffman) {
+            debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
+                           name, nameHuffman, value, valueHuffman);
+            delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman);
+        }
+
+        @Override
+        public void onLiteralWithIndexing(int index,
+                                          CharSequence name,
+                                          CharSequence value,
+                                          boolean valueHuffman) {
+            debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
+                           index, name, value, valueHuffman);
+            delegate.onLiteralWithIndexing(index, name, value, valueHuffman);
+        }
+
+        @Override
+        public void onLiteralWithIndexing(CharSequence name,
+                                          boolean nameHuffman,
+                                          CharSequence value,
+                                          boolean valueHuffman) {
+            debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
+                              name, nameHuffman, value, valueHuffman);
+            delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman);
+        }
+
+        @Override
+        public void onSizeUpdate(int capacity) {
+            debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity);
+            delegate.onSizeUpdate(capacity);
+        }
+
+        @Override
+        HttpHeadersImpl headers() {
+            return delegate.headers();
+        }
+    }
+
     static class HeaderDecoder implements DecodingCallback {
         HttpHeadersImpl headers;