src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
child 56090 5c7fb702948a
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java	Tue Feb 06 19:37:56 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1180 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal;
-
-import java.io.IOException;
-import java.lang.System.Logger.Level;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Flow;
-import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiPredicate;
-import jdk.incubator.http.HttpClient;
-import jdk.incubator.http.HttpHeaders;
-import jdk.incubator.http.HttpRequest;
-import jdk.incubator.http.HttpResponse;
-import jdk.incubator.http.HttpResponse.BodySubscriber;
-import jdk.incubator.http.internal.common.*;
-import jdk.incubator.http.internal.frame.*;
-import jdk.incubator.http.internal.hpack.DecodingCallback;
-
-/**
- * Http/2 Stream handling.
- *
- * REQUESTS
- *
- * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
- *
- * sendRequest() -- sendHeadersOnly() + sendBody()
- *
- * sendBodyAsync() -- calls sendBody() in an executor thread.
- *
- * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
- *
- * sendRequestAsync() -- calls sendRequest() in an executor thread
- *
- * RESPONSES
- *
- * Multiple responses can be received per request. Responses are queued up on
- * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
- * with the next response
- *
- * getResponseAsync() -- queries list of response CFs and returns first one
- *               if one exists. Otherwise, creates one and adds it to list
- *               and returns it. Completion is achieved through the
- *               incoming() upcall from connection reader thread.
- *
- * getResponse() -- calls getResponseAsync() and waits for CF to complete
- *
- * responseBodyAsync() -- calls responseBody() in an executor thread.
- *
- * incoming() -- entry point called from connection reader thread. Frames are
- *               either handled immediately without blocking or for data frames
- *               placed on the stream's inputQ which is consumed by the stream's
- *               reader thread.
- *
- * PushedStream sub class
- * ======================
- * Sending side methods are not used because the request comes from a PUSH_PROMISE
- * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
- * is created. PushedStream does not use responseCF list as there can be only
- * one response. The CF is created when the object created and when the response
- * HEADERS frame is received the object is completed.
- */
-class Stream<T> extends ExchangeImpl<T> {
-
-    final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
-    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
-
-    final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
-    final SequentialScheduler sched =
-            SequentialScheduler.synchronizedScheduler(this::schedule);
-    final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
-
-    /**
-     * This stream's identifier. Assigned lazily by the HTTP2Connection before
-     * the stream's first frame is sent.
-     */
-    protected volatile int streamid;
-
-    long requestContentLen;
-
-    final Http2Connection connection;
-    final HttpRequestImpl request;
-    final DecodingCallback rspHeadersConsumer;
-    HttpHeadersImpl responseHeaders;
-    final HttpHeadersImpl requestPseudoHeaders;
-    volatile HttpResponse.BodySubscriber<T> responseSubscriber;
-    final HttpRequest.BodyPublisher requestPublisher;
-    volatile RequestSubscriber requestSubscriber;
-    volatile int responseCode;
-    volatile Response response;
-    volatile Throwable failed; // The exception with which this stream was canceled.
-    final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
-    volatile CompletableFuture<T> responseBodyCF;
-
-    /** True if END_STREAM has been seen in a frame received on this stream. */
-    private volatile boolean remotelyClosed;
-    private volatile boolean closed;
-    private volatile boolean endStreamSent;
-
-    // state flags
-    private boolean requestSent, responseReceived;
-
-    /**
-     * A reference to this Stream's connection Send Window controller. The
-     * stream MUST acquire the appropriate amount of Send Window before
-     * sending any data. Will be null for PushStreams, as they cannot send data.
-     */
-    private final WindowController windowController;
-    private final WindowUpdateSender windowUpdater;
-
-    @Override
-    HttpConnection connection() {
-        return connection.connection;
-    }
-
-    /**
-     * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
-     * of after user subscription window has re-opened, from SubscriptionBase.request()
-     */
-    private void schedule() {
-        if (responseSubscriber == null)
-            // can't process anything yet
-            return;
-
-        try {
-            while (!inputQ.isEmpty()) {
-                Http2Frame frame = inputQ.peek();
-                if (frame instanceof ResetFrame) {
-                    inputQ.remove();
-                    handleReset((ResetFrame)frame);
-                    return;
-                }
-                DataFrame df = (DataFrame)frame;
-                boolean finished = df.getFlag(DataFrame.END_STREAM);
-
-                List<ByteBuffer> buffers = df.getData();
-                List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
-                int size = Utils.remaining(dsts, Integer.MAX_VALUE);
-                if (size == 0 && finished) {
-                    inputQ.remove();
-                    Log.logTrace("responseSubscriber.onComplete");
-                    debug.log(Level.DEBUG, "incoming: onComplete");
-                    sched.stop();
-                    responseSubscriber.onComplete();
-                    setEndStreamReceived();
-                    return;
-                } else if (userSubscription.tryDecrement()) {
-                    inputQ.remove();
-                    Log.logTrace("responseSubscriber.onNext {0}", size);
-                    debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
-                    responseSubscriber.onNext(dsts);
-                    if (consumed(df)) {
-                        Log.logTrace("responseSubscriber.onComplete");
-                        debug.log(Level.DEBUG, "incoming: onComplete");
-                        sched.stop();
-                        responseSubscriber.onComplete();
-                        setEndStreamReceived();
-                        return;
-                    }
-                } else {
-                    return;
-                }
-            }
-        } catch (Throwable throwable) {
-            failed = throwable;
-        }
-
-        Throwable t = failed;
-        if (t != null) {
-            sched.stop();
-            responseSubscriber.onError(t);
-            close();
-        }
-    }
-
-    // Callback invoked after the Response BodySubscriber has consumed the
-    // buffers contained in a DataFrame.
-    // Returns true if END_STREAM is reached, false otherwise.
-    private boolean consumed(DataFrame df) {
-        // RFC 7540 6.1:
-        // The entire DATA frame payload is included in flow control,
-        // including the Pad Length and Padding fields if present
-        int len = df.payloadLength();
-        connection.windowUpdater.update(len);
-
-        if (!df.getFlag(DataFrame.END_STREAM)) {
-            // Don't send window update on a stream which is
-            // closed or half closed.
-            windowUpdater.update(len);
-            return false; // more data coming
-        }
-        return true; // end of stream
-    }
-
-    @Override
-    CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
-                                       boolean returnConnectionToPool,
-                                       Executor executor)
-    {
-        Log.logTrace("Reading body on stream {0}", streamid);
-        BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
-        CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
-
-        PushGroup<?> pg = exchange.getPushGroup();
-        if (pg != null) {
-            // if an error occurs make sure it is recorded in the PushGroup
-            cf = cf.whenComplete((t,e) -> pg.pushError(e));
-        }
-        return cf;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("streamid: ")
-                .append(streamid);
-        return sb.toString();
-    }
-
-    private void receiveDataFrame(DataFrame df) {
-        inputQ.add(df);
-        sched.runOrSchedule();
-    }
-
-    /** Handles a RESET frame. RESET is always handled inline in the queue. */
-    private void receiveResetFrame(ResetFrame frame) {
-        inputQ.add(frame);
-        sched.runOrSchedule();
-    }
-
-    // pushes entire response body into response subscriber
-    // blocking when required by local or remote flow control
-    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
-        responseBodyCF = new MinimalFuture<>();
-        // We want to allow the subscriber's getBody() method to block so it
-        // can work with InputStreams. So, we offload execution.
-        executor.execute(() -> {
-            bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
-                if (t == null)
-                    responseBodyCF.complete(body);
-                else
-                    responseBodyCF.completeExceptionally(t);
-            });
-        });
-
-        if (isCanceled()) {
-            Throwable t = getCancelCause();
-            responseBodyCF.completeExceptionally(t);
-        } else {
-            bodySubscriber.onSubscribe(userSubscription);
-        }
-        // Set the responseSubscriber field now that onSubscribe has been called.
-        // This effectively allows the scheduler to start invoking the callbacks.
-        responseSubscriber = bodySubscriber;
-        sched.runOrSchedule(); // in case data waiting already to be processed
-        return responseBodyCF;
-    }
-
-    @Override
-    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
-        return sendBodyImpl().thenApply( v -> this);
-    }
-
-    @SuppressWarnings("unchecked")
-    Stream(Http2Connection connection,
-           Exchange<T> e,
-           WindowController windowController)
-    {
-        super(e);
-        this.connection = connection;
-        this.windowController = windowController;
-        this.request = e.request();
-        this.requestPublisher = request.requestPublisher;  // may be null
-        responseHeaders = new HttpHeadersImpl();
-        rspHeadersConsumer = (name, value) -> {
-            responseHeaders.addHeader(name.toString(), value.toString());
-            if (Log.headers() && Log.trace()) {
-                Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
-                             streamid, name, value);
-            }
-        };
-        this.requestPseudoHeaders = new HttpHeadersImpl();
-        // NEW
-        this.windowUpdater = new StreamWindowUpdateSender(connection);
-    }
-
-    /**
-     * Entry point from Http2Connection reader thread.
-     *
-     * Data frames will be removed by response body thread.
-     */
-    void incoming(Http2Frame frame) throws IOException {
-        debug.log(Level.DEBUG, "incoming: %s", frame);
-        if ((frame instanceof HeaderFrame)) {
-            HeaderFrame hframe = (HeaderFrame)frame;
-            if (hframe.endHeaders()) {
-                Log.logTrace("handling response (streamid={0})", streamid);
-                handleResponse();
-                if (hframe.getFlag(HeaderFrame.END_STREAM)) {
-                    receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
-                }
-            }
-        } else if (frame instanceof DataFrame) {
-            receiveDataFrame((DataFrame)frame);
-        } else {
-            otherFrame(frame);
-        }
-    }
-
-    void otherFrame(Http2Frame frame) throws IOException {
-        switch (frame.type()) {
-            case WindowUpdateFrame.TYPE:
-                incoming_windowUpdate((WindowUpdateFrame) frame);
-                break;
-            case ResetFrame.TYPE:
-                incoming_reset((ResetFrame) frame);
-                break;
-            case PriorityFrame.TYPE:
-                incoming_priority((PriorityFrame) frame);
-                break;
-            default:
-                String msg = "Unexpected frame: " + frame.toString();
-                throw new IOException(msg);
-        }
-    }
-
-    // The Hpack decoder decodes into one of these consumers of name,value pairs
-
-    DecodingCallback rspHeadersConsumer() {
-        return rspHeadersConsumer;
-    }
-
-    protected void handleResponse() throws IOException {
-        responseCode = (int)responseHeaders
-                .firstValueAsLong(":status")
-                .orElseThrow(() -> new IOException("no statuscode in response"));
-
-        response = new Response(
-                request, exchange, responseHeaders,
-                responseCode, HttpClient.Version.HTTP_2);
-
-        /* TODO: review if needs to be removed
-           the value is not used, but in case `content-length` doesn't parse as
-           long, there will be NumberFormatException. If left as is, make sure
-           code up the stack handles NFE correctly. */
-        responseHeaders.firstValueAsLong("content-length");
-
-        if (Log.headers()) {
-            StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
-            Log.dumpHeaders(sb, "    ", responseHeaders);
-            Log.logHeaders(sb.toString());
-        }
-
-        completeResponse(response);
-    }
-
-    void incoming_reset(ResetFrame frame) {
-        Log.logTrace("Received RST_STREAM on stream {0}", streamid);
-        if (endStreamReceived()) {
-            Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
-        } else if (closed) {
-            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
-        } else {
-            // put it in the input queue in order to read all
-            // pending data frames first. Indeed, a server may send
-            // RST_STREAM after sending END_STREAM, in which case we should
-            // ignore it. However, we won't know if we have received END_STREAM
-            // or not until all pending data frames are read.
-            receiveResetFrame(frame);
-            // RST_STREAM was pushed to the queue. It will be handled by
-            // asyncReceive after all pending data frames have been
-            // processed.
-            Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
-        }
-    }
-
-    void handleReset(ResetFrame frame) {
-        Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
-        if (!closed) {
-            close();
-            int error = frame.getErrorCode();
-            completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
-        } else {
-            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
-        }
-    }
-
-    void incoming_priority(PriorityFrame frame) {
-        // TODO: implement priority
-        throw new UnsupportedOperationException("Not implemented");
-    }
-
-    private void incoming_windowUpdate(WindowUpdateFrame frame)
-        throws IOException
-    {
-        int amount = frame.getUpdate();
-        if (amount <= 0) {
-            Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
-                         streamid, streamid, amount);
-            connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
-        } else {
-            assert streamid != 0;
-            boolean success = windowController.increaseStreamWindow(amount, streamid);
-            if (!success) {  // overflow
-                connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
-            }
-        }
-    }
-
-    void incoming_pushPromise(HttpRequestImpl pushRequest,
-                              PushedStream<T> pushStream)
-        throws IOException
-    {
-        if (Log.requests()) {
-            Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
-        }
-        PushGroup<T> pushGroup = exchange.getPushGroup();
-        if (pushGroup == null) {
-            Log.logTrace("Rejecting push promise stream " + streamid);
-            connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
-            pushStream.close();
-            return;
-        }
-
-        PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
-
-        if (!acceptor.accepted()) {
-            // cancel / reject
-            IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
-            if (Log.trace()) {
-                Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
-                        ex.getMessage());
-            }
-            pushStream.cancelImpl(ex);
-            return;
-        }
-
-        CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
-        HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
-        assert pushHandler != null;
-
-        pushStream.requestSent();
-        pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
-        // setup housekeeping for when the push is received
-        // TODO: deal with ignoring of CF anti-pattern
-        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
-        cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
-            t = Utils.getCompletionCause(t);
-            if (Log.trace()) {
-                Log.logTrace("Push completed on stream {0} for {1}{2}",
-                             pushStream.streamid, resp,
-                             ((t==null) ? "": " with exception " + t));
-            }
-            if (t != null) {
-                pushGroup.pushError(t);
-                pushResponseCF.completeExceptionally(t);
-            } else {
-                pushResponseCF.complete(resp);
-            }
-            pushGroup.pushCompleted();
-        });
-
-    }
-
-    private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
-        HttpHeadersImpl h = request.getSystemHeaders();
-        if (contentLength > 0) {
-            h.setHeader("content-length", Long.toString(contentLength));
-        }
-        setPseudoHeaderFields();
-        HttpHeaders sysh = filter(h);
-        HttpHeaders userh = filter(request.getUserHeaders());
-        OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
-        if (contentLength == 0) {
-            f.setFlag(HeadersFrame.END_STREAM);
-            endStreamSent = true;
-        }
-        return f;
-    }
-
-    private boolean hasProxyAuthorization(HttpHeaders headers) {
-        return headers.firstValue("proxy-authorization")
-                      .isPresent();
-    }
-
-    // Determines whether we need to build a new HttpHeader object.
-    //
-    // Ideally we should pass the filter to OutgoingHeaders refactor the
-    // code that creates the HeaderFrame to honor the filter.
-    // We're not there yet - so depending on the filter we need to
-    // apply and the content of the header we will try to determine
-    //  whether anything might need to be filtered.
-    // If nothing needs filtering then we can just use the
-    // original headers.
-    private boolean needsFiltering(HttpHeaders headers,
-                                   BiPredicate<String, List<String>> filter) {
-        if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
-            // we're either connecting or proxying
-            // slight optimization: we only need to filter out
-            // disabled schemes, so if there are none just
-            // pass through.
-            return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
-                    && hasProxyAuthorization(headers);
-        } else {
-            // we're talking to a server, either directly or through
-            // a tunnel.
-            // Slight optimization: we only need to filter out
-            // proxy authorization headers, so if there are none just
-            // pass through.
-            return hasProxyAuthorization(headers);
-        }
-    }
-
-    private HttpHeaders filter(HttpHeaders headers) {
-        HttpConnection conn = connection();
-        BiPredicate<String, List<String>> filter =
-                conn.headerFilter(request);
-        if (needsFiltering(headers, filter)) {
-            return ImmutableHeaders.of(headers.map(), filter);
-        }
-        return headers;
-    }
-
-    private void setPseudoHeaderFields() {
-        HttpHeadersImpl hdrs = requestPseudoHeaders;
-        String method = request.method();
-        hdrs.setHeader(":method", method);
-        URI uri = request.uri();
-        hdrs.setHeader(":scheme", uri.getScheme());
-        // TODO: userinfo deprecated. Needs to be removed
-        hdrs.setHeader(":authority", uri.getAuthority());
-        // TODO: ensure header names beginning with : not in user headers
-        String query = uri.getQuery();
-        String path = uri.getPath();
-        if (path == null || path.isEmpty()) {
-            if (method.equalsIgnoreCase("OPTIONS")) {
-                path = "*";
-            } else {
-                path = "/";
-            }
-        }
-        if (query != null) {
-            path += "?" + query;
-        }
-        hdrs.setHeader(":path", path);
-    }
-
-    HttpHeadersImpl getRequestPseudoHeaders() {
-        return requestPseudoHeaders;
-    }
-
-    /** Sets endStreamReceived. Should be called only once. */
-    void setEndStreamReceived() {
-        assert remotelyClosed == false: "Unexpected endStream already set";
-        remotelyClosed = true;
-        responseReceived();
-    }
-
-    /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
-     *  received on this stream. */
-    private boolean endStreamReceived() {
-        return remotelyClosed;
-    }
-
-    @Override
-    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
-        debug.log(Level.DEBUG, "sendHeadersOnly()");
-        if (Log.requests() && request != null) {
-            Log.logRequest(request.toString());
-        }
-        if (requestPublisher != null) {
-            requestContentLen = requestPublisher.contentLength();
-        } else {
-            requestContentLen = 0;
-        }
-        OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
-        connection.sendFrame(f);
-        CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
-        cf.complete(this);  // #### good enough for now
-        return cf;
-    }
-
-    @Override
-    void released() {
-        if (streamid > 0) {
-            debug.log(Level.DEBUG, "Released stream %d", streamid);
-            // remove this stream from the Http2Connection map.
-            connection.closeStream(streamid);
-        } else {
-            debug.log(Level.DEBUG, "Can't release stream %d", streamid);
-        }
-    }
-
-    @Override
-    void completed() {
-        // There should be nothing to do here: the stream should have
-        // been already closed (or will be closed shortly after).
-    }
-
-    void registerStream(int id) {
-        this.streamid = id;
-        connection.putStream(this, streamid);
-        debug.log(Level.DEBUG, "Registered stream %d", id);
-    }
-
-    void signalWindowUpdate() {
-        RequestSubscriber subscriber = requestSubscriber;
-        assert subscriber != null;
-        debug.log(Level.DEBUG, "Signalling window update");
-        subscriber.sendScheduler.runOrSchedule();
-    }
-
-    static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
-    class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
-        // can be < 0 if the actual length is not known.
-        private final long contentLength;
-        private volatile long remainingContentLength;
-        private volatile Subscription subscription;
-
-        // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
-        //  1) The data that was published by the request body Publisher, and
-        //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
-        final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
-
-        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-        // A scheduler used to honor window updates. Writing must be paused
-        // when the window is exhausted, and resumed when the window acquires
-        // some space. The sendScheduler makes it possible to implement this
-        // behaviour in an asynchronous non-blocking way.
-        // See RequestSubscriber::trySend below.
-        final SequentialScheduler sendScheduler;
-
-        RequestSubscriber(long contentLen) {
-            this.contentLength = contentLen;
-            this.remainingContentLength = contentLen;
-            this.sendScheduler =
-                    SequentialScheduler.synchronizedScheduler(this::trySend);
-        }
-
-        @Override
-        public void onSubscribe(Flow.Subscription subscription) {
-            if (this.subscription != null) {
-                throw new IllegalStateException("already subscribed");
-            }
-            this.subscription = subscription;
-            debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
-            subscription.request(1);
-        }
-
-        @Override
-        public void onNext(ByteBuffer item) {
-            debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
-            int size = outgoing.size();
-            assert size == 0 : "non-zero size: " + size;
-            onNextImpl(item);
-        }
-
-        private void onNextImpl(ByteBuffer item) {
-            // Got some more request body bytes to send.
-            if (requestBodyCF.isDone()) {
-                // stream already cancelled, probably in timeout
-                sendScheduler.stop();
-                subscription.cancel();
-                return;
-            }
-            outgoing.add(item);
-            sendScheduler.runOrSchedule();
-        }
-
-        @Override
-        public void onError(Throwable throwable) {
-            debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
-            // ensure that errors are handled within the flow.
-            if (errorRef.compareAndSet(null, throwable)) {
-                sendScheduler.runOrSchedule();
-            }
-        }
-
-        @Override
-        public void onComplete() {
-            debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
-            int size = outgoing.size();
-            assert size == 0 || size == 1 : "non-zero or one size: " + size;
-            // last byte of request body has been obtained.
-            // ensure that everything is completed within the flow.
-            onNextImpl(COMPLETED);
-        }
-
-        // Attempts to send the data, if any.
-        // Handles errors and completion state.
-        // Pause writing if the send window is exhausted, resume it if the
-        // send window has some bytes that can be acquired.
-        void trySend() {
-            try {
-                // handle errors raised by onError;
-                Throwable t = errorRef.get();
-                if (t != null) {
-                    sendScheduler.stop();
-                    if (requestBodyCF.isDone()) return;
-                    subscription.cancel();
-                    requestBodyCF.completeExceptionally(t);
-                    return;
-                }
-
-                do {
-                    // handle COMPLETED;
-                    ByteBuffer item = outgoing.peekFirst();
-                    if (item == null) return;
-                    else if (item == COMPLETED) {
-                        sendScheduler.stop();
-                        complete();
-                        return;
-                    }
-
-                    // handle bytes to send downstream
-                    while (item.hasRemaining()) {
-                        debug.log(Level.DEBUG, "trySend: %d", item.remaining());
-                        assert !endStreamSent : "internal error, send data after END_STREAM flag";
-                        DataFrame df = getDataFrame(item);
-                        if (df == null) {
-                            debug.log(Level.DEBUG, "trySend: can't send yet: %d",
-                                    item.remaining());
-                            return; // the send window is exhausted: come back later
-                        }
-
-                        if (contentLength > 0) {
-                            remainingContentLength -= df.getDataLength();
-                            if (remainingContentLength < 0) {
-                                String msg = connection().getConnectionFlow()
-                                        + " stream=" + streamid + " "
-                                        + "[" + Thread.currentThread().getName() + "] "
-                                        + "Too many bytes in request body. Expected: "
-                                        + contentLength + ", got: "
-                                        + (contentLength - remainingContentLength);
-                                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
-                                throw new IOException(msg);
-                            } else if (remainingContentLength == 0) {
-                                df.setFlag(DataFrame.END_STREAM);
-                                endStreamSent = true;
-                            }
-                        }
-                        debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
-                        connection.sendDataFrame(df);
-                    }
-                    assert !item.hasRemaining();
-                    ByteBuffer b = outgoing.removeFirst();
-                    assert b == item;
-                } while (outgoing.peekFirst() != null);
-
-                debug.log(Level.DEBUG, "trySend: request 1");
-                subscription.request(1);
-            } catch (Throwable ex) {
-                debug.log(Level.DEBUG, "trySend: ", ex);
-                sendScheduler.stop();
-                subscription.cancel();
-                requestBodyCF.completeExceptionally(ex);
-            }
-        }
-
-        private void complete() throws IOException {
-            long remaining = remainingContentLength;
-            long written = contentLength - remaining;
-            if (remaining > 0) {
-                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
-                // let trySend() handle the exception
-                throw new IOException(connection().getConnectionFlow()
-                                     + " stream=" + streamid + " "
-                                     + "[" + Thread.currentThread().getName() +"] "
-                                     + "Too few bytes returned by the publisher ("
-                                              + written + "/"
-                                              + contentLength + ")");
-            }
-            if (!endStreamSent) {
-                endStreamSent = true;
-                connection.sendDataFrame(getEmptyEndStreamDataFrame());
-            }
-            requestBodyCF.complete(null);
-        }
-    }
-
-    /**
-     * Send a RESET frame to tell server to stop sending data on this stream
-     */
-    @Override
-    public CompletableFuture<Void> ignoreBody() {
-        try {
-            connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
-            return MinimalFuture.completedFuture(null);
-        } catch (Throwable e) {
-            Log.logTrace("Error resetting stream {0}", e.toString());
-            return MinimalFuture.failedFuture(e);
-        }
-    }
-
-    DataFrame getDataFrame(ByteBuffer buffer) {
-        int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
-        // blocks waiting for stream send window, if exhausted
-        int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
-        if (actualAmount <= 0) return null;
-        ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer,  actualAmount);
-        DataFrame df = new DataFrame(streamid, 0 , outBuf);
-        return df;
-    }
-
-    private DataFrame getEmptyEndStreamDataFrame()  {
-        return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
-    }
-
-    /**
-     * A List of responses relating to this stream. Normally there is only
-     * one response, but intermediate responses like 100 are allowed
-     * and must be passed up to higher level before continuing. Deals with races
-     * such as if responses are returned before the CFs get created by
-     * getResponseAsync()
-     */
-
-    final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
-
-    @Override
-    CompletableFuture<Response> getResponseAsync(Executor executor) {
-        CompletableFuture<Response> cf;
-        // The code below deals with race condition that can be caused when
-        // completeResponse() is being called before getResponseAsync()
-        synchronized (response_cfs) {
-            if (!response_cfs.isEmpty()) {
-                // This CompletableFuture was created by completeResponse().
-                // it will be already completed.
-                cf = response_cfs.remove(0);
-                // if we find a cf here it should be already completed.
-                // finding a non completed cf should not happen. just assert it.
-                assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
-            } else {
-                // getResponseAsync() is called first. Create a CompletableFuture
-                // that will be completed by completeResponse() when
-                // completeResponse() is called.
-                cf = new MinimalFuture<>();
-                response_cfs.add(cf);
-            }
-        }
-        if (executor != null && !cf.isDone()) {
-            // protect from executing later chain of CompletableFuture operations from SelectorManager thread
-            cf = cf.thenApplyAsync(r -> r, executor);
-        }
-        Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
-        PushGroup<?> pg = exchange.getPushGroup();
-        if (pg != null) {
-            // if an error occurs make sure it is recorded in the PushGroup
-            cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
-        }
-        return cf;
-    }
-
-    /**
-     * Completes the first uncompleted CF on list, and removes it. If there is no
-     * uncompleted CF then creates one (completes it) and adds to list
-     */
-    void completeResponse(Response resp) {
-        synchronized (response_cfs) {
-            CompletableFuture<Response> cf;
-            int cfs_len = response_cfs.size();
-            for (int i=0; i<cfs_len; i++) {
-                cf = response_cfs.get(i);
-                if (!cf.isDone()) {
-                    Log.logTrace("Completing response (streamid={0}): {1}",
-                                 streamid, cf);
-                    cf.complete(resp);
-                    response_cfs.remove(cf);
-                    return;
-                } // else we found the previous response: just leave it alone.
-            }
-            cf = MinimalFuture.completedFuture(resp);
-            Log.logTrace("Created completed future (streamid={0}): {1}",
-                         streamid, cf);
-            response_cfs.add(cf);
-        }
-    }
-
-    // methods to update state and remove stream when finished
-
-    synchronized void requestSent() {
-        requestSent = true;
-        if (responseReceived) {
-            close();
-        }
-    }
-
-    synchronized void responseReceived() {
-        responseReceived = true;
-        if (requestSent) {
-            close();
-        }
-    }
-
-    /**
-     * same as above but for errors
-     */
-    void completeResponseExceptionally(Throwable t) {
-        synchronized (response_cfs) {
-            // use index to avoid ConcurrentModificationException
-            // caused by removing the CF from within the loop.
-            for (int i = 0; i < response_cfs.size(); i++) {
-                CompletableFuture<Response> cf = response_cfs.get(i);
-                if (!cf.isDone()) {
-                    cf.completeExceptionally(t);
-                    response_cfs.remove(i);
-                    return;
-                }
-            }
-            response_cfs.add(MinimalFuture.failedFuture(t));
-        }
-    }
-
-    CompletableFuture<Void> sendBodyImpl() {
-        requestBodyCF.whenComplete((v, t) -> requestSent());
-        if (requestPublisher != null) {
-            final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
-            requestPublisher.subscribe(requestSubscriber = subscriber);
-        } else {
-            // there is no request body, therefore the request is complete,
-            // END_STREAM has already sent with outgoing headers
-            requestBodyCF.complete(null);
-        }
-        return requestBodyCF;
-    }
-
-    @Override
-    void cancel() {
-        cancel(new IOException("Stream " + streamid + " cancelled"));
-    }
-
-    @Override
-    void cancel(IOException cause) {
-        cancelImpl(cause);
-    }
-
-    // This method sends a RST_STREAM frame
-    void cancelImpl(Throwable e) {
-        debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
-        if (Log.trace()) {
-            Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
-        }
-        boolean closing;
-        if (closing = !closed) { // assigning closing to !closed
-            synchronized (this) {
-                failed = e;
-                if (closing = !closed) { // assigning closing to !closed
-                    closed=true;
-                }
-            }
-        }
-        if (closing) { // true if the stream has not been closed yet
-            if (responseSubscriber != null)
-                sched.runOrSchedule();
-        }
-        completeResponseExceptionally(e);
-        if (!requestBodyCF.isDone()) {
-            requestBodyCF.completeExceptionally(e); // we may be sending the body..
-        }
-        if (responseBodyCF != null) {
-            responseBodyCF.completeExceptionally(e);
-        }
-        try {
-            // will send a RST_STREAM frame
-            if (streamid != 0) {
-                connection.resetStream(streamid, ResetFrame.CANCEL);
-            }
-        } catch (IOException ex) {
-            Log.logError(ex);
-        }
-    }
-
-    // This method doesn't send any frame
-    void close() {
-        if (closed) return;
-        synchronized(this) {
-            if (closed) return;
-            closed = true;
-        }
-        Log.logTrace("Closing stream {0}", streamid);
-        connection.closeStream(streamid);
-        Log.logTrace("Stream {0} closed", streamid);
-    }
-
-    static class PushedStream<T> extends Stream<T> {
-        final PushGroup<T> pushGroup;
-        // push streams need the response CF allocated up front as it is
-        // given directly to user via the multi handler callback function.
-        final CompletableFuture<Response> pushCF;
-        CompletableFuture<HttpResponse<T>> responseCF;
-        final HttpRequestImpl pushReq;
-        HttpResponse.BodyHandler<T> pushHandler;
-
-        PushedStream(PushGroup<T> pushGroup,
-                     Http2Connection connection,
-                     Exchange<T> pushReq) {
-            // ## no request body possible, null window controller
-            super(connection, pushReq, null);
-            this.pushGroup = pushGroup;
-            this.pushReq = pushReq.request();
-            this.pushCF = new MinimalFuture<>();
-            this.responseCF = new MinimalFuture<>();
-
-        }
-
-        CompletableFuture<HttpResponse<T>> responseCF() {
-            return responseCF;
-        }
-
-        synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
-            this.pushHandler = pushHandler;
-        }
-
-        synchronized HttpResponse.BodyHandler<T> getPushHandler() {
-            // ignored parameters to function can be used as BodyHandler
-            return this.pushHandler;
-        }
-
-        // Following methods call the super class but in case of
-        // error record it in the PushGroup. The error method is called
-        // with a null value when no error occurred (is a no-op)
-        @Override
-        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
-            return super.sendBodyAsync()
-                        .whenComplete((ExchangeImpl<T> v, Throwable t)
-                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
-        }
-
-        @Override
-        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
-            return super.sendHeadersAsync()
-                        .whenComplete((ExchangeImpl<T> ex, Throwable t)
-                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
-        }
-
-        @Override
-        CompletableFuture<Response> getResponseAsync(Executor executor) {
-            CompletableFuture<Response> cf = pushCF.whenComplete(
-                    (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
-            if(executor!=null && !cf.isDone()) {
-                cf  = cf.thenApplyAsync( r -> r, executor);
-            }
-            return cf;
-        }
-
-        @Override
-        CompletableFuture<T> readBodyAsync(
-                HttpResponse.BodyHandler<T> handler,
-                boolean returnConnectionToPool,
-                Executor executor)
-        {
-            return super.readBodyAsync(handler, returnConnectionToPool, executor)
-                        .whenComplete((v, t) -> pushGroup.pushError(t));
-        }
-
-        @Override
-        void completeResponse(Response r) {
-            Log.logResponse(r::toString);
-            pushCF.complete(r); // not strictly required for push API
-            // start reading the body using the obtained BodySubscriber
-            CompletableFuture<Void> start = new MinimalFuture<>();
-            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
-                .whenComplete((T body, Throwable t) -> {
-                    if (t != null) {
-                        responseCF.completeExceptionally(t);
-                    } else {
-                        HttpResponseImpl<T> resp =
-                                new HttpResponseImpl<>(r.request, r, null, body, getExchange());
-                        responseCF.complete(resp);
-                    }
-                });
-            start.completeAsync(() -> null, getExchange().executor());
-        }
-
-        @Override
-        void completeResponseExceptionally(Throwable t) {
-            pushCF.completeExceptionally(t);
-        }
-
-//        @Override
-//        synchronized void responseReceived() {
-//            super.responseReceived();
-//        }
-
-        // create and return the PushResponseImpl
-        @Override
-        protected void handleResponse() {
-            responseCode = (int)responseHeaders
-                .firstValueAsLong(":status")
-                .orElse(-1);
-
-            if (responseCode == -1) {
-                completeResponseExceptionally(new IOException("No status code"));
-            }
-
-            this.response = new Response(
-                pushReq, exchange, responseHeaders,
-                responseCode, HttpClient.Version.HTTP_2);
-
-            /* TODO: review if needs to be removed
-               the value is not used, but in case `content-length` doesn't parse
-               as long, there will be NumberFormatException. If left as is, make
-               sure code up the stack handles NFE correctly. */
-            responseHeaders.firstValueAsLong("content-length");
-
-            if (Log.headers()) {
-                StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
-                sb.append(" (streamid=").append(streamid).append("): ");
-                Log.dumpHeaders(sb, "    ", responseHeaders);
-                Log.logHeaders(sb.toString());
-            }
-
-            // different implementations for normal streams and pushed streams
-            completeResponse(response);
-        }
-    }
-
-    final class StreamWindowUpdateSender extends WindowUpdateSender {
-
-        StreamWindowUpdateSender(Http2Connection connection) {
-            super(connection);
-        }
-
-        @Override
-        int getStreamId() {
-            return streamid;
-        }
-    }
-
-    /**
-     * Returns true if this exchange was canceled.
-     * @return true if this exchange was canceled.
-     */
-    synchronized boolean isCanceled() {
-        return failed != null;
-    }
-
-    /**
-     * Returns the cause for which this exchange was canceled, if available.
-     * @return the cause for which this exchange was canceled, if available.
-     */
-    synchronized Throwable getCancelCause() {
-        return failed;
-    }
-
-    final String dbgString() {
-        return connection.dbgString() + "/Stream("+streamid+")";
-    }
-}