src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
author erikj
Tue, 12 Sep 2017 19:03:39 +0200
changeset 47216 71c04702a3d5
parent 46157 jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java@f3c2dcb8d8fe
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
permissions -rw-r--r--
8187443: Forest Consolidation: Move files to unified layout Reviewed-by: darcy, ihse

/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import jdk.incubator.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Formatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import javax.net.ssl.SSLEngine;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Encoder;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;

import static jdk.incubator.http.internal.frame.SettingsFrame.*;


/**
 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
 *
 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
 * to a HttpClientImpl.
 *
 * Creation cases:
 * 1) upgraded HTTP/1.1 plain tcp connection
 * 2) prior knowledge directly created plain tcp connection
 * 3) directly created HTTP/2 SSL connection which uses ALPN.
 *
 * Sending is done by writing directly to underlying HttpConnection object which
 * is operating in async mode. No flow control applies on output at this level
 * and all writes are just executed as puts to an output Q belonging to HttpConnection
 * Flow control is implemented by HTTP/2 protocol itself.
 *
 * Hpack header compression
 * and outgoing stream creation is also done here, because these operations
 * must be synchronized at the socket level. Stream objects send frames simply
 * by placing them on the connection's output Queue. sendFrame() is called
 * from a higher level (Stream) thread.
 *
 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
 * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
 * or handles them directly itself. This thread performs hpack decompression
 * and incoming stream creation (Server push). Incoming frames destined for a
 * stream are provided by calling Stream.incoming().
 */
class Http2Connection  {
    /*
     *  ByteBuffer pooling strategy for HTTP/2 protocol:
     *
     * In general there are 4 points where ByteBuffers are used:
     *  - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data
     *    in case of SSL connection.
     *
     * 1. Outgoing frames encoded to ByteBuffers.
     *    Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
     *    At this place no pools at all. All outgoing buffers should be collected by GC.
     *
     * 2. Incoming ByteBuffers (decoded to frames).
     *    Here, total elimination of BB pool is not a good idea.
     *    We don't know how many bytes we will receive through network.
     * So here we allocate buffer of reasonable size. The following life of the BB:
     * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
     *     BB is returned to pool,
     * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
     *     Such BB is never returned to pool and will be GCed.
     * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
     *     the buffer could be release to pool.
     *
     * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool,
     *    because of we can't predict size encrypted packets.
     *
     */


    // A small class that allows to control frames with respect to the state of
    // the connection preface. Any data received before the connection
    // preface is sent will be buffered.
    private final class FramesController {
        volatile boolean prefaceSent;
        volatile List<ByteBufferReference> pending;

        boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf)
                throws IOException
        {
            // if preface is not sent, buffers data in the pending list
            if (!prefaceSent) {
                synchronized (this) {
                    if (!prefaceSent) {
                        if (pending == null) pending = new ArrayList<>();
                        pending.add(buf);
                        return false;
                    }
                }
            }

            // Preface is sent. Checks for pending data and flush it.
            // We rely on this method being called from within the readlock,
            // so we know that no other thread could execute this method
            // concurrently while we're here.
            // This ensures that later incoming buffers will not
            // be processed before we have flushed the pending queue.
            // No additional synchronization is therefore necessary here.
            List<ByteBufferReference> pending = this.pending;
            this.pending = null;
            if (pending != null) {
                // flush pending data
                for (ByteBufferReference b : pending) {
                    decoder.decode(b);
                }
            }

            // push the received buffer to the frames decoder.
            decoder.decode(buf);
            return true;
        }

        // Mark that the connection preface is sent
        void markPrefaceSent() {
            assert !prefaceSent;
            synchronized (this) {
                prefaceSent = true;
            }
        }

    }

    volatile boolean closed;

    //-------------------------------------
    final HttpConnection connection;
    private final HttpClientImpl client;
    private final Http2ClientImpl client2;
    private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
    private int nextstreamid;
    private int nextPushStream = 2;
    private final Encoder hpackOut;
    private final Decoder hpackIn;
    final SettingsFrame clientSettings;
    private volatile SettingsFrame serverSettings;
    private final String key; // for HttpClientImpl.connections map
    private final FramesDecoder framesDecoder;
    private final FramesEncoder framesEncoder = new FramesEncoder();

    /**
     * Send Window controller for both connection and stream windows.
     * Each of this connection's Streams MUST use this controller.
     */
    private final WindowController windowController = new WindowController();
    private final FramesController framesController = new FramesController();
    final WindowUpdateSender windowUpdater;

    static final int DEFAULT_FRAME_SIZE = 16 * 1024;


    // TODO: need list of control frames from other threads
    // that need to be sent

    private Http2Connection(HttpConnection connection,
                            Http2ClientImpl client2,
                            int nextstreamid,
                            String key) {
        this.connection = connection;
        this.client = client2.client();
        this.client2 = client2;
        this.nextstreamid = nextstreamid;
        this.key = key;
        this.clientSettings = this.client2.getClientSettings();
        this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
        // serverSettings will be updated by server
        this.serverSettings = SettingsFrame.getDefaultSettings();
        this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
        this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
        this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
    }

    /**
     * Case 1) Create from upgraded HTTP/1.1 connection.
     * Is ready to use. Will not be SSL. exchange is the Exchange
     * that initiated the connection, whose response will be delivered
     * on a Stream.
     */
    Http2Connection(HttpConnection connection,
                    Http2ClientImpl client2,
                    Exchange<?> exchange,
                    ByteBuffer initial)
        throws IOException, InterruptedException
    {
        this(connection,
                client2,
                3, // stream 1 is registered during the upgrade
                keyFor(connection));
        assert !(connection instanceof SSLConnection);
        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        Stream<?> initialStream = createStream(exchange);
        initialStream.registerStream(1);
        windowController.registerStream(1, getInitialSendWindowSize());
        initialStream.requestSent();
        sendConnectionPreface();
        // start reading and writing
        // start reading
        AsyncConnection asyncConn = (AsyncConnection)connection;
        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
        asyncReceive(ByteBufferReference.of(initial));
        asyncConn.startReading();
    }

    // async style but completes immediately
    static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
                                                          Http2ClientImpl client2,
                                                          Exchange<?> exchange,
                                                          ByteBuffer initial) {
        return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
    }

    /**
     * Cases 2) 3)
     *
     * request is request to be sent.
     */
    Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
        throws IOException, InterruptedException
    {
        this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
                h2client,
                1,
                keyFor(request.uri(), request.proxy(h2client.client())));
        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        // start reading
        AsyncConnection asyncConn = (AsyncConnection)connection;
        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
        connection.connect();
        checkSSLConfig();
        // safe to resume async reading now.
        asyncConn.enableCallback();
        sendConnectionPreface();
    }

    /**
     * Throws an IOException if h2 was not negotiated
     */
    private void checkSSLConfig() throws IOException {
        AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection;
        SSLEngine engine = aconn.getEngine();
        String alpn = engine.getApplicationProtocol();
        if (alpn == null || !alpn.equals("h2")) {
            String msg;
            if (alpn == null) {
                Log.logSSL("ALPN not supported");
                msg = "ALPN not supported";
            } else switch (alpn) {
              case "":
                Log.logSSL("No ALPN returned");
                msg = "No ALPN negotiated";
                break;
              case "http/1.1":
                Log.logSSL("HTTP/1.1 ALPN returned");
                msg = "HTTP/1.1 ALPN returned";
                break;
              default:
                Log.logSSL("unknown ALPN returned");
                msg = "Unexpected ALPN: " + alpn;
                throw new IOException(msg);
            }
            throw new ALPNException(msg, aconn);
        }
    }

    static String keyFor(HttpConnection connection) {
        boolean isProxy = connection.isProxied();
        boolean isSecure = connection.isSecure();
        InetSocketAddress addr = connection.address();

        return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
    }

    static String keyFor(URI uri, InetSocketAddress proxy) {
        boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
        boolean isProxy = proxy != null;

        String host;
        int port;

        if (isProxy) {
            host = proxy.getHostString();
            port = proxy.getPort();
        } else {
            host = uri.getHost();
            port = uri.getPort();
        }
        return keyString(isSecure, isProxy, host, port);
    }

    // {C,S}:{H:P}:host:port
    // C indicates clear text connection "http"
    // S indicates secure "https"
    // H indicates host (direct) connection
    // P indicates proxy
    // Eg: "S:H:foo.com:80"
    static String keyString(boolean secure, boolean proxy, String host, int port) {
        return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
    }

    String key() {
        return this.key;
    }

    void putConnection() {
        client2.putConnection(this);
    }

    private static String toHexdump1(ByteBuffer bb) {
        bb.mark();
        StringBuilder sb = new StringBuilder(512);
        Formatter f = new Formatter(sb);

        while (bb.hasRemaining()) {
            int i =  Byte.toUnsignedInt(bb.get());
            f.format("%02x:", i);
        }
        sb.deleteCharAt(sb.length()-1);
        bb.reset();
        return sb.toString();
    }

    private static String toHexdump(ByteBuffer bb) {
        List<String> words = new ArrayList<>();
        int i = 0;
        bb.mark();
        while (bb.hasRemaining()) {
            if (i % 2 == 0) {
                words.add("");
            }
            byte b = bb.get();
            String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
            words.set(i / 2, words.get(i / 2) + hex);
            i++;
        }
        bb.reset();
        return words.stream().collect(Collectors.joining(" "));
    }

    private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
        boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);

        ByteBufferReference[] buffers = frame.getHeaderBlock();
        for (int i = 0; i < buffers.length; i++) {
            hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder);
        }
    }

    int getInitialSendWindowSize() {
        return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
    }

    void close() {
        GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
        // TODO: set last stream. For now zero ok.
        sendFrame(f);
    }

    private ByteBufferPool readBufferPool = new ByteBufferPool();

    // provides buffer to read data (default size)
    public ByteBufferReference getReadBuffer() {
        return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
    }

    private final Object readlock = new Object();

    public void asyncReceive(ByteBufferReference buffer) {
        // We don't need to read anything and
        // we don't want to send anything back to the server
        // until the connection preface has been sent.
        // Therefore we're going to wait if needed before reading
        // (and thus replying) to anything.
        // Starting to reply to something (e.g send an ACK to a
        // SettingsFrame sent by the server) before the connection
        // preface is fully sent might result in the server
        // sending a GOAWAY frame with 'invalid_preface'.
        synchronized (readlock) {
            try {
                // the readlock ensures that the order of incoming buffers
                // is preserved.
                framesController.processReceivedData(framesDecoder, buffer);
            } catch (Throwable e) {
                String msg = Utils.stackTrace(e);
                Log.logTrace(msg);
                shutdown(e);
            }
        }
    }


    void shutdown(Throwable t) {
        Log.logError(t);
        closed = true;
        client2.deleteConnection(this);
        List<Stream<?>> c = new LinkedList<>(streams.values());
        for (Stream<?> s : c) {
            s.cancelImpl(t);
        }
        connection.close();
    }

    /**
     * Handles stream 0 (common) frames that apply to whole connection and passes
     * other stream specific frames to that Stream object.
     *
     * Invokes Stream.incoming() which is expected to process frame without
     * blocking.
     */
    void processFrame(Http2Frame frame) throws IOException {
        Log.logFrames(frame, "IN");
        int streamid = frame.streamid();
        if (frame instanceof MalformedFrame) {
            Log.logError(((MalformedFrame) frame).getMessage());
            if (streamid == 0) {
                protocolError(((MalformedFrame) frame).getErrorCode());
            } else {
                resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
            }
            return;
        }
        if (streamid == 0) {
            handleConnectionFrame(frame);
        } else {
            if (frame instanceof SettingsFrame) {
                // The stream identifier for a SETTINGS frame MUST be zero
                protocolError(GoAwayFrame.PROTOCOL_ERROR);
                return;
            }

            Stream<?> stream = getStream(streamid);
            if (stream == null) {
                // Should never receive a frame with unknown stream id

                // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
                // response to a RST_STREAM frame.
                if (!(frame instanceof ResetFrame)) {
                    resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                }
                return;
            }
            if (frame instanceof PushPromiseFrame) {
                PushPromiseFrame pp = (PushPromiseFrame)frame;
                handlePushPromise(stream, pp);
            } else if (frame instanceof HeaderFrame) {
                // decode headers (or continuation)
                decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
                stream.incoming(frame);
            } else {
                stream.incoming(frame);
            }
        }
    }

    private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
        throws IOException
    {
        HttpRequestImpl parentReq = parent.request;
        int promisedStreamid = pp.getPromisedStream();
        if (promisedStreamid != nextPushStream) {
            resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
            return;
        } else {
            nextPushStream += 2;
        }
        HeaderDecoder decoder = new HeaderDecoder();
        decodeHeaders(pp, decoder);
        HttpHeadersImpl headers = decoder.headers();
        HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
        Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
        Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
        pushExch.exchImpl = pushStream;
        pushStream.registerStream(promisedStreamid);
        parent.incoming_pushPromise(pushReq, pushStream);
    }

    private void handleConnectionFrame(Http2Frame frame)
        throws IOException
    {
        switch (frame.type()) {
          case SettingsFrame.TYPE:
              handleSettings((SettingsFrame)frame);
              break;
          case PingFrame.TYPE:
              handlePing((PingFrame)frame);
              break;
          case GoAwayFrame.TYPE:
              handleGoAway((GoAwayFrame)frame);
              break;
          case WindowUpdateFrame.TYPE:
              handleWindowUpdate((WindowUpdateFrame)frame);
              break;
          default:
            protocolError(ErrorFrame.PROTOCOL_ERROR);
        }
    }

    void resetStream(int streamid, int code) throws IOException {
        Log.logError(
            "Resetting stream {0,number,integer} with error code {1,number,integer}",
            streamid, code);
        ResetFrame frame = new ResetFrame(streamid, code);
        sendFrame(frame);
        closeStream(streamid);
    }

    void closeStream(int streamid) {
        Stream<?> s = streams.remove(streamid);
        // ## Remove s != null. It is a hack for delayed cancellation,reset
        if (s != null && !(s instanceof Stream.PushedStream)) {
            // Since PushStreams have no request body, then they have no
            // corresponding entry in the window controller.
            windowController.removeStream(streamid);
        }
    }
    /**
     * Increments this connection's send Window by the amount in the given frame.
     */
    private void handleWindowUpdate(WindowUpdateFrame f)
        throws IOException
    {
        int amount = f.getUpdate();
        if (amount <= 0) {
            // ## temporarily disable to workaround a bug in Jetty where it
            // ## sends Window updates with a 0 update value.
            //protocolError(ErrorFrame.PROTOCOL_ERROR);
        } else {
            boolean success = windowController.increaseConnectionWindow(amount);
            if (!success) {
                protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
            }
        }
    }

    private void protocolError(int errorCode)
        throws IOException
    {
        GoAwayFrame frame = new GoAwayFrame(0, errorCode);
        sendFrame(frame);
        shutdown(new IOException("protocol error"));
    }

    private void handleSettings(SettingsFrame frame)
        throws IOException
    {
        assert frame.streamid() == 0;
        if (!frame.getFlag(SettingsFrame.ACK)) {
            int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
            int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
            int diff = newWindowSize - oldWindowSize;
            if (diff != 0) {
                windowController.adjustActiveStreams(diff);
            }
            serverSettings = frame;
            sendFrame(new SettingsFrame(SettingsFrame.ACK));
        }
    }

    private void handlePing(PingFrame frame)
        throws IOException
    {
        frame.setFlag(PingFrame.ACK);
        sendUnorderedFrame(frame);
    }

    private void handleGoAway(GoAwayFrame frame)
        throws IOException
    {
        shutdown(new IOException(
                        String.valueOf(connection.channel().getLocalAddress())
                        +": GOAWAY received"));
    }

    /**
     * Max frame size we are allowed to send
     */
    public int getMaxSendFrameSize() {
        int param = serverSettings.getParameter(MAX_FRAME_SIZE);
        if (param == -1) {
            param = DEFAULT_FRAME_SIZE;
        }
        return param;
    }

    /**
     * Max frame size we will receive
     */
    public int getMaxReceiveFrameSize() {
        return clientSettings.getParameter(MAX_FRAME_SIZE);
    }

    // Not sure how useful this is.
    public int getMaxHeadersSize() {
        return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
    }

    private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

    private static final byte[] PREFACE_BYTES =
        CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);

    /**
     * Sends Connection preface and Settings frame with current preferred
     * values
     */
    private void sendConnectionPreface() throws IOException {
        Log.logTrace("{0}: start sending connection preface to {1}",
                     connection.channel().getLocalAddress(),
                     connection.address());
        SettingsFrame sf = client2.getClientSettings();
        ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
        Log.logFrames(sf, "OUT");
        // send preface bytes and SettingsFrame together
        connection.write(ref.get());
        // mark preface sent.
        framesController.markPrefaceSent();
        Log.logTrace("PREFACE_BYTES sent");
        Log.logTrace("Settings Frame sent");

        // send a Window update for the receive buffer we are using
        // minus the initial 64 K specified in protocol
        final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1);
        windowUpdater.sendWindowUpdate(len);
        // there will be an ACK to the windows update - which should
        // cause any pending data stored before the preface was sent to be
        // flushed (see PrefaceController).
        Log.logTrace("finished sending connection preface");
    }

    /**
     * Returns an existing Stream with given id, or null if doesn't exist
     */
    @SuppressWarnings("unchecked")
    <T> Stream<T> getStream(int streamid) {
        return (Stream<T>)streams.get(streamid);
    }

    /**
     * Creates Stream with given id.
     */
    <T> Stream<T> createStream(Exchange<T> exchange) {
        Stream<T> stream = new Stream<>(client, this, exchange, windowController);
        return stream;
    }

    <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
        PushGroup<?,T> pg = parent.exchange.getPushGroup();
        return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
    }

    <T> void putStream(Stream<T> stream, int streamid) {
        streams.put(streamid, stream);
    }

    void deleteStream(int streamid) {
        streams.remove(streamid);
        windowController.removeStream(streamid);
    }

    /**
     * Encode the headers into a List<ByteBuffer> and then create HEADERS
     * and CONTINUATION frames from the list and return the List<Http2Frame>.
     */
    private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
        List<ByteBufferReference> buffers = encodeHeadersImpl(
                getMaxSendFrameSize(),
                frame.getAttachment().getRequestPseudoHeaders(),
                frame.getUserHeaders(),
                frame.getSystemHeaders());

        List<HeaderFrame> frames = new ArrayList<>(buffers.size());
        Iterator<ByteBufferReference> bufIterator = buffers.iterator();
        HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
        frames.add(oframe);
        while(bufIterator.hasNext()) {
            oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
            frames.add(oframe);
        }
        oframe.setFlag(HeaderFrame.END_HEADERS);
        return frames;
    }

    // Dedicated cache for headers encoding ByteBuffer.
    // There can be no concurrent access to this  buffer as all access to this buffer
    // and its content happen within a single critical code block section protected
    // by the sendLock. / (see sendFrame())
    private ByteBufferPool headerEncodingPool = new ByteBufferPool();

    private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
        ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
        ref.get().limit(maxFrameSize);
        return ref;
    }

    /*
     * Encodes all the headers from the given HttpHeaders into the given List
     * of buffers.
     *
     * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
     *
     *     ...Just as in HTTP/1.x, header field names are strings of ASCII
     *     characters that are compared in a case-insensitive fashion.  However,
     *     header field names MUST be converted to lowercase prior to their
     *     encoding in HTTP/2...
     */
    private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
        ByteBufferReference buffer = getHeaderBuffer(maxFrameSize);
        List<ByteBufferReference> buffers = new ArrayList<>();
        for(HttpHeaders header : headers) {
            for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
                String lKey = e.getKey().toLowerCase();
                List<String> values = e.getValue();
                for (String value : values) {
                    hpackOut.header(lKey, value);
                    while (!hpackOut.encode(buffer.get())) {
                        buffer.get().flip();
                        buffers.add(buffer);
                        buffer =  getHeaderBuffer(maxFrameSize);
                    }
                }
            }
        }
        buffer.get().flip();
        buffers.add(buffer);
        return buffers;
    }

    private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
        oh.streamid(stream.streamid);
        if (Log.headers()) {
            StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
            sb.append(stream.streamid).append(")\n");
            Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
            Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
            Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
            Log.logHeaders(sb.toString());
        }
        List<HeaderFrame> frames = encodeHeaders(oh);
        return encodeFrames(frames);
    }

    private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) {
        if (Log.frames()) {
            frames.forEach(f -> Log.logFrames(f, "OUT"));
        }
        return framesEncoder.encodeFrames(frames);
    }

    static Throwable getExceptionFrom(CompletableFuture<?> cf) {
        try {
            cf.get();
            return null;
        } catch (Throwable e) {
            if (e.getCause() != null) {
                return e.getCause();
            } else {
                return e;
            }
        }
    }

    private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
        Stream<?> stream = oh.getAttachment();
        int streamid = nextstreamid;
        nextstreamid += 2;
        stream.registerStream(streamid);
        // set outgoing window here. This allows thread sending
        // body to proceed.
        windowController.registerStream(streamid, getInitialSendWindowSize());
        return stream;
    }

    private final Object sendlock = new Object();

    void sendFrame(Http2Frame frame) {
        try {
            synchronized (sendlock) {
                if (frame instanceof OutgoingHeaders) {
                    @SuppressWarnings("unchecked")
                    OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
                    Stream<?> stream = registerNewStream(oh);
                    // provide protection from inserting unordered frames between Headers and Continuation
                    connection.writeAsync(encodeHeaders(oh, stream));
                } else {
                    connection.writeAsync(encodeFrame(frame));
                }
            }
            connection.flushAsync();
        } catch (IOException e) {
            if (!closed) {
                Log.logError(e);
                shutdown(e);
            }
        }
    }

    private ByteBufferReference[] encodeFrame(Http2Frame frame) {
        Log.logFrames(frame, "OUT");
        return framesEncoder.encodeFrame(frame);
    }

    void sendDataFrame(DataFrame frame) {
        try {
            connection.writeAsync(encodeFrame(frame));
            connection.flushAsync();
        } catch (IOException e) {
            if (!closed) {
                Log.logError(e);
                shutdown(e);
            }
        }
    }

    /*
     * Direct call of the method bypasses synchronization on "sendlock" and
     * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
     * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
     */
    void sendUnorderedFrame(Http2Frame frame) {
        try {
            connection.writeAsyncUnordered(encodeFrame(frame));
            connection.flushAsync();
        } catch (IOException e) {
            if (!closed) {
                Log.logError(e);
                shutdown(e);
            }
        }
    }

    static class HeaderDecoder implements DecodingCallback {
        HttpHeadersImpl headers;

        HeaderDecoder() {
            this.headers = new HttpHeadersImpl();
        }

        @Override
        public void onDecoded(CharSequence name, CharSequence value) {
            headers.addHeader(name.toString(), value.toString());
        }

        HttpHeadersImpl headers() {
            return headers;
        }
    }

    static final class ConnectionWindowUpdateSender extends WindowUpdateSender {

        public ConnectionWindowUpdateSender(Http2Connection connection,
                                            int initialWindowSize) {
            super(connection, initialWindowSize);
        }

        @Override
        int getStreamId() {
            return 0;
        }
    }

    /**
     * Thrown when https handshake negotiates http/1.1 alpn instead of h2
     */
    static final class ALPNException extends IOException {
        private static final long serialVersionUID = 23138275393635783L;
        final AbstractAsyncSSLConnection connection;

        ALPNException(String msg, AbstractAsyncSSLConnection connection) {
            super(msg);
            this.connection = connection;
        }

        AbstractAsyncSSLConnection getConnection() {
            return connection;
        }
    }
}