src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
changeset 49765 ee6f7a61f3a5
parent 48703 3eae36c6baa5
child 49944 4690a2871b44
child 56451 9585061fdb04
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Tue Apr 17 08:54:17 2018 -0700
@@ -0,0 +1,1272 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.System.Logger.Level;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodySubscriber;
+import jdk.internal.net.http.common.*;
+import jdk.internal.net.http.frame.*;
+import jdk.internal.net.http.hpack.DecodingCallback;
+
+/**
+ * Http/2 Stream handling.
+ *
+ * REQUESTS
+ *
+ * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
+ *
+ * sendRequest() -- sendHeadersOnly() + sendBody()
+ *
+ * sendBodyAsync() -- calls sendBody() in an executor thread.
+ *
+ * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
+ *
+ * sendRequestAsync() -- calls sendRequest() in an executor thread
+ *
+ * RESPONSES
+ *
+ * Multiple responses can be received per request. Responses are queued up on
+ * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
+ * with the next response
+ *
+ * getResponseAsync() -- queries list of response CFs and returns first one
+ *               if one exists. Otherwise, creates one and adds it to list
+ *               and returns it. Completion is achieved through the
+ *               incoming() upcall from connection reader thread.
+ *
+ * getResponse() -- calls getResponseAsync() and waits for CF to complete
+ *
+ * responseBodyAsync() -- calls responseBody() in an executor thread.
+ *
+ * incoming() -- entry point called from connection reader thread. Frames are
+ *               either handled immediately without blocking or for data frames
+ *               placed on the stream's inputQ which is consumed by the stream's
+ *               reader thread.
+ *
+ * PushedStream sub class
+ * ======================
+ * Sending side methods are not used because the request comes from a PUSH_PROMISE
+ * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
+ * is created. PushedStream does not use responseCF list as there can be only
+ * one response. The CF is created when the object created and when the response
+ * HEADERS frame is received the object is completed.
+ */
+class Stream<T> extends ExchangeImpl<T> {
+
+    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
+
+    final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
+    final SequentialScheduler sched =
+            SequentialScheduler.synchronizedScheduler(this::schedule);
+    final SubscriptionBase userSubscription =
+            new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
+
+    /**
+     * This stream's identifier. Assigned lazily by the HTTP2Connection before
+     * the stream's first frame is sent.
+     */
+    protected volatile int streamid;
+
+    long requestContentLen;
+
+    final Http2Connection connection;
+    final HttpRequestImpl request;
+    final HeadersConsumer rspHeadersConsumer;
+    final HttpHeadersImpl responseHeaders;
+    final HttpHeadersImpl requestPseudoHeaders;
+    volatile HttpResponse.BodySubscriber<T> responseSubscriber;
+    final HttpRequest.BodyPublisher requestPublisher;
+    volatile RequestSubscriber requestSubscriber;
+    volatile int responseCode;
+    volatile Response response;
+    // The exception with which this stream was canceled.
+    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+    final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
+    volatile CompletableFuture<T> responseBodyCF;
+    volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
+    volatile boolean stopRequested;
+
+    /** True if END_STREAM has been seen in a frame received on this stream. */
+    private volatile boolean remotelyClosed;
+    private volatile boolean closed;
+    private volatile boolean endStreamSent;
+
+    // state flags
+    private boolean requestSent, responseReceived;
+
+    /**
+     * A reference to this Stream's connection Send Window controller. The
+     * stream MUST acquire the appropriate amount of Send Window before
+     * sending any data. Will be null for PushStreams, as they cannot send data.
+     */
+    private final WindowController windowController;
+    private final WindowUpdateSender windowUpdater;
+
+    @Override
+    HttpConnection connection() {
+        return connection.connection;
+    }
+
+    /**
+     * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
+     * of after user subscription window has re-opened, from SubscriptionBase.request()
+     */
+    private void schedule() {
+        boolean onCompleteCalled = false;
+        HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
+        try {
+            if (subscriber == null) {
+                subscriber = responseSubscriber = pendingResponseSubscriber;
+                if (subscriber == null) {
+                    // can't process anything yet
+                    return;
+                } else {
+                    if (debug.on()) debug.log("subscribing user subscriber");
+                    subscriber.onSubscribe(userSubscription);
+                }
+            }
+            while (!inputQ.isEmpty()) {
+                Http2Frame frame = inputQ.peek();
+                if (frame instanceof ResetFrame) {
+                    inputQ.remove();
+                    handleReset((ResetFrame)frame);
+                    return;
+                }
+                DataFrame df = (DataFrame)frame;
+                boolean finished = df.getFlag(DataFrame.END_STREAM);
+
+                List<ByteBuffer> buffers = df.getData();
+                List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
+                int size = Utils.remaining(dsts, Integer.MAX_VALUE);
+                if (size == 0 && finished) {
+                    inputQ.remove();
+                    Log.logTrace("responseSubscriber.onComplete");
+                    if (debug.on()) debug.log("incoming: onComplete");
+                    sched.stop();
+                    subscriber.onComplete();
+                    onCompleteCalled = true;
+                    setEndStreamReceived();
+                    return;
+                } else if (userSubscription.tryDecrement()) {
+                    inputQ.remove();
+                    Log.logTrace("responseSubscriber.onNext {0}", size);
+                    if (debug.on()) debug.log("incoming: onNext(%d)", size);
+                    subscriber.onNext(dsts);
+                    if (consumed(df)) {
+                        Log.logTrace("responseSubscriber.onComplete");
+                        if (debug.on()) debug.log("incoming: onComplete");
+                        sched.stop();
+                        subscriber.onComplete();
+                        onCompleteCalled = true;
+                        setEndStreamReceived();
+                        return;
+                    }
+                } else {
+                    if (stopRequested) break;
+                    return;
+                }
+            }
+        } catch (Throwable throwable) {
+            errorRef.compareAndSet(null, throwable);
+        }
+
+        Throwable t = errorRef.get();
+        if (t != null) {
+            sched.stop();
+            try {
+                if (!onCompleteCalled) {
+                    if (debug.on())
+                        debug.log("calling subscriber.onError: %s", (Object)t);
+                    subscriber.onError(t);
+                } else {
+                    if (debug.on())
+                        debug.log("already completed: dropping error %s", (Object)t);
+                }
+            } catch (Throwable x) {
+                Log.logError("Subscriber::onError threw exception: {0}", (Object)t);
+            } finally {
+                cancelImpl(t);
+            }
+        }
+    }
+
+    // Callback invoked after the Response BodySubscriber has consumed the
+    // buffers contained in a DataFrame.
+    // Returns true if END_STREAM is reached, false otherwise.
+    private boolean consumed(DataFrame df) {
+        // RFC 7540 6.1:
+        // The entire DATA frame payload is included in flow control,
+        // including the Pad Length and Padding fields if present
+        int len = df.payloadLength();
+        connection.windowUpdater.update(len);
+
+        if (!df.getFlag(DataFrame.END_STREAM)) {
+            // Don't send window update on a stream which is
+            // closed or half closed.
+            windowUpdater.update(len);
+            return false; // more data coming
+        }
+        return true; // end of stream
+    }
+
+    @Override
+    CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
+                                       boolean returnConnectionToPool,
+                                       Executor executor)
+    {
+        try {
+            Log.logTrace("Reading body on stream {0}", streamid);
+            BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
+            CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
+
+            PushGroup<?> pg = exchange.getPushGroup();
+            if (pg != null) {
+                // if an error occurs make sure it is recorded in the PushGroup
+                cf = cf.whenComplete((t, e) -> pg.pushError(e));
+            }
+            return cf;
+        } catch (Throwable t) {
+            // may be thrown by handler.apply
+            cancelImpl(t);
+            return MinimalFuture.failedFuture(t);
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("streamid: ")
+                .append(streamid);
+        return sb.toString();
+    }
+
+    private void receiveDataFrame(DataFrame df) {
+        inputQ.add(df);
+        sched.runOrSchedule();
+    }
+
+    /** Handles a RESET frame. RESET is always handled inline in the queue. */
+    private void receiveResetFrame(ResetFrame frame) {
+        inputQ.add(frame);
+        sched.runOrSchedule();
+    }
+
+    // pushes entire response body into response subscriber
+    // blocking when required by local or remote flow control
+    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
+        responseBodyCF = new MinimalFuture<>();
+        // We want to allow the subscriber's getBody() method to block so it
+        // can work with InputStreams. So, we offload execution.
+        executor.execute(() -> {
+            try {
+                bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+                    if (t == null)
+                        responseBodyCF.complete(body);
+                    else
+                        responseBodyCF.completeExceptionally(t);
+                });
+            } catch(Throwable t) {
+                cancelImpl(t);
+            }
+        });
+
+        if (isCanceled()) {
+            Throwable t = getCancelCause();
+            responseBodyCF.completeExceptionally(t);
+        } else {
+            pendingResponseSubscriber = bodySubscriber;
+            sched.runOrSchedule(); // in case data waiting already to be processed
+        }
+        return responseBodyCF;
+    }
+
+    @Override
+    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+        return sendBodyImpl().thenApply( v -> this);
+    }
+
+    @SuppressWarnings("unchecked")
+    Stream(Http2Connection connection,
+           Exchange<T> e,
+           WindowController windowController)
+    {
+        super(e);
+        this.connection = connection;
+        this.windowController = windowController;
+        this.request = e.request();
+        this.requestPublisher = request.requestPublisher;  // may be null
+        responseHeaders = new HttpHeadersImpl();
+        rspHeadersConsumer = new HeadersConsumer();
+        this.requestPseudoHeaders = new HttpHeadersImpl();
+        // NEW
+        this.windowUpdater = new StreamWindowUpdateSender(connection);
+    }
+
+    /**
+     * Entry point from Http2Connection reader thread.
+     *
+     * Data frames will be removed by response body thread.
+     */
+    void incoming(Http2Frame frame) throws IOException {
+        if (debug.on()) debug.log("incoming: %s", frame);
+        if ((frame instanceof HeaderFrame)) {
+            HeaderFrame hframe = (HeaderFrame)frame;
+            if (hframe.endHeaders()) {
+                Log.logTrace("handling response (streamid={0})", streamid);
+                handleResponse();
+                if (hframe.getFlag(HeaderFrame.END_STREAM)) {
+                    receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
+                }
+            }
+        } else if (frame instanceof DataFrame) {
+            receiveDataFrame((DataFrame)frame);
+        } else {
+            otherFrame(frame);
+        }
+    }
+
+    void otherFrame(Http2Frame frame) throws IOException {
+        switch (frame.type()) {
+            case WindowUpdateFrame.TYPE:
+                incoming_windowUpdate((WindowUpdateFrame) frame);
+                break;
+            case ResetFrame.TYPE:
+                incoming_reset((ResetFrame) frame);
+                break;
+            case PriorityFrame.TYPE:
+                incoming_priority((PriorityFrame) frame);
+                break;
+            default:
+                String msg = "Unexpected frame: " + frame.toString();
+                throw new IOException(msg);
+        }
+    }
+
+    // The Hpack decoder decodes into one of these consumers of name,value pairs
+
+    DecodingCallback rspHeadersConsumer() {
+        return rspHeadersConsumer;
+    }
+
+    protected void handleResponse() throws IOException {
+        responseCode = (int)responseHeaders
+                .firstValueAsLong(":status")
+                .orElseThrow(() -> new IOException("no statuscode in response"));
+
+        response = new Response(
+                request, exchange, responseHeaders, connection(),
+                responseCode, HttpClient.Version.HTTP_2);
+
+        /* TODO: review if needs to be removed
+           the value is not used, but in case `content-length` doesn't parse as
+           long, there will be NumberFormatException. If left as is, make sure
+           code up the stack handles NFE correctly. */
+        responseHeaders.firstValueAsLong("content-length");
+
+        if (Log.headers()) {
+            StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
+            Log.dumpHeaders(sb, "    ", responseHeaders);
+            Log.logHeaders(sb.toString());
+        }
+
+        // this will clear the response headers
+        rspHeadersConsumer.reset();
+
+        completeResponse(response);
+    }
+
+    void incoming_reset(ResetFrame frame) {
+        Log.logTrace("Received RST_STREAM on stream {0}", streamid);
+        if (endStreamReceived()) {
+            Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
+        } else if (closed) {
+            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
+        } else {
+            // put it in the input queue in order to read all
+            // pending data frames first. Indeed, a server may send
+            // RST_STREAM after sending END_STREAM, in which case we should
+            // ignore it. However, we won't know if we have received END_STREAM
+            // or not until all pending data frames are read.
+            receiveResetFrame(frame);
+            // RST_STREAM was pushed to the queue. It will be handled by
+            // asyncReceive after all pending data frames have been
+            // processed.
+            Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
+        }
+    }
+
+    void handleReset(ResetFrame frame) {
+        Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
+        if (!closed) {
+            close();
+            int error = frame.getErrorCode();
+            completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
+        } else {
+            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
+        }
+    }
+
+    void incoming_priority(PriorityFrame frame) {
+        // TODO: implement priority
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    private void incoming_windowUpdate(WindowUpdateFrame frame)
+        throws IOException
+    {
+        int amount = frame.getUpdate();
+        if (amount <= 0) {
+            Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
+                         streamid, streamid, amount);
+            connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
+        } else {
+            assert streamid != 0;
+            boolean success = windowController.increaseStreamWindow(amount, streamid);
+            if (!success) {  // overflow
+                connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
+            }
+        }
+    }
+
+    void incoming_pushPromise(HttpRequestImpl pushRequest,
+                              PushedStream<T> pushStream)
+        throws IOException
+    {
+        if (Log.requests()) {
+            Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
+        }
+        PushGroup<T> pushGroup = exchange.getPushGroup();
+        if (pushGroup == null) {
+            Log.logTrace("Rejecting push promise stream " + streamid);
+            connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
+            pushStream.close();
+            return;
+        }
+
+        PushGroup.Acceptor<T> acceptor = null;
+        boolean accepted = false;
+        try {
+            acceptor = pushGroup.acceptPushRequest(pushRequest);
+            accepted = acceptor.accepted();
+        } catch (Throwable t) {
+            if (debug.on())
+                debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
+                          (Object)t);
+        }
+        if (!accepted) {
+            // cancel / reject
+            IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
+            if (Log.trace()) {
+                Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
+                        ex.getMessage());
+            }
+            pushStream.cancelImpl(ex);
+            return;
+        }
+
+        assert accepted && acceptor != null;
+        CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
+        HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
+        assert pushHandler != null;
+
+        pushStream.requestSent();
+        pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
+        // setup housekeeping for when the push is received
+        // TODO: deal with ignoring of CF anti-pattern
+        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
+        cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
+            t = Utils.getCompletionCause(t);
+            if (Log.trace()) {
+                Log.logTrace("Push completed on stream {0} for {1}{2}",
+                             pushStream.streamid, resp,
+                             ((t==null) ? "": " with exception " + t));
+            }
+            if (t != null) {
+                pushGroup.pushError(t);
+                pushResponseCF.completeExceptionally(t);
+            } else {
+                pushResponseCF.complete(resp);
+            }
+            pushGroup.pushCompleted();
+        });
+
+    }
+
+    private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
+        HttpHeadersImpl h = request.getSystemHeaders();
+        if (contentLength > 0) {
+            h.setHeader("content-length", Long.toString(contentLength));
+        }
+        setPseudoHeaderFields();
+        HttpHeaders sysh = filter(h);
+        HttpHeaders userh = filter(request.getUserHeaders());
+        OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
+        if (contentLength == 0) {
+            f.setFlag(HeadersFrame.END_STREAM);
+            endStreamSent = true;
+        }
+        return f;
+    }
+
+    private boolean hasProxyAuthorization(HttpHeaders headers) {
+        return headers.firstValue("proxy-authorization")
+                      .isPresent();
+    }
+
+    // Determines whether we need to build a new HttpHeader object.
+    //
+    // Ideally we should pass the filter to OutgoingHeaders refactor the
+    // code that creates the HeaderFrame to honor the filter.
+    // We're not there yet - so depending on the filter we need to
+    // apply and the content of the header we will try to determine
+    //  whether anything might need to be filtered.
+    // If nothing needs filtering then we can just use the
+    // original headers.
+    private boolean needsFiltering(HttpHeaders headers,
+                                   BiPredicate<String, List<String>> filter) {
+        if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
+            // we're either connecting or proxying
+            // slight optimization: we only need to filter out
+            // disabled schemes, so if there are none just
+            // pass through.
+            return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
+                    && hasProxyAuthorization(headers);
+        } else {
+            // we're talking to a server, either directly or through
+            // a tunnel.
+            // Slight optimization: we only need to filter out
+            // proxy authorization headers, so if there are none just
+            // pass through.
+            return hasProxyAuthorization(headers);
+        }
+    }
+
+    private HttpHeaders filter(HttpHeaders headers) {
+        HttpConnection conn = connection();
+        BiPredicate<String, List<String>> filter =
+                conn.headerFilter(request);
+        if (needsFiltering(headers, filter)) {
+            return ImmutableHeaders.of(headers.map(), filter);
+        }
+        return headers;
+    }
+
+    private void setPseudoHeaderFields() {
+        HttpHeadersImpl hdrs = requestPseudoHeaders;
+        String method = request.method();
+        hdrs.setHeader(":method", method);
+        URI uri = request.uri();
+        hdrs.setHeader(":scheme", uri.getScheme());
+        // TODO: userinfo deprecated. Needs to be removed
+        hdrs.setHeader(":authority", uri.getAuthority());
+        // TODO: ensure header names beginning with : not in user headers
+        String query = uri.getRawQuery();
+        String path = uri.getRawPath();
+        if (path == null || path.isEmpty()) {
+            if (method.equalsIgnoreCase("OPTIONS")) {
+                path = "*";
+            } else {
+                path = "/";
+            }
+        }
+        if (query != null) {
+            path += "?" + query;
+        }
+        hdrs.setHeader(":path", path);
+    }
+
+    HttpHeadersImpl getRequestPseudoHeaders() {
+        return requestPseudoHeaders;
+    }
+
+    /** Sets endStreamReceived. Should be called only once. */
+    void setEndStreamReceived() {
+        assert remotelyClosed == false: "Unexpected endStream already set";
+        remotelyClosed = true;
+        responseReceived();
+    }
+
+    /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
+     *  received on this stream. */
+    private boolean endStreamReceived() {
+        return remotelyClosed;
+    }
+
+    @Override
+    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
+        if (debug.on()) debug.log("sendHeadersOnly()");
+        if (Log.requests() && request != null) {
+            Log.logRequest(request.toString());
+        }
+        if (requestPublisher != null) {
+            requestContentLen = requestPublisher.contentLength();
+        } else {
+            requestContentLen = 0;
+        }
+        OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
+        connection.sendFrame(f);
+        CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
+        cf.complete(this);  // #### good enough for now
+        return cf;
+    }
+
+    @Override
+    void released() {
+        if (streamid > 0) {
+            if (debug.on()) debug.log("Released stream %d", streamid);
+            // remove this stream from the Http2Connection map.
+            connection.closeStream(streamid);
+        } else {
+            if (debug.on()) debug.log("Can't release stream %d", streamid);
+        }
+    }
+
+    @Override
+    void completed() {
+        // There should be nothing to do here: the stream should have
+        // been already closed (or will be closed shortly after).
+    }
+
+    void registerStream(int id) {
+        this.streamid = id;
+        connection.putStream(this, streamid);
+        if (debug.on()) debug.log("Registered stream %d", id);
+    }
+
+    void signalWindowUpdate() {
+        RequestSubscriber subscriber = requestSubscriber;
+        assert subscriber != null;
+        if (debug.on()) debug.log("Signalling window update");
+        subscriber.sendScheduler.runOrSchedule();
+    }
+
+    static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
+    class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
+        // can be < 0 if the actual length is not known.
+        private final long contentLength;
+        private volatile long remainingContentLength;
+        private volatile Subscription subscription;
+
+        // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
+        //  1) The data that was published by the request body Publisher, and
+        //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
+        final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
+
+        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        // A scheduler used to honor window updates. Writing must be paused
+        // when the window is exhausted, and resumed when the window acquires
+        // some space. The sendScheduler makes it possible to implement this
+        // behaviour in an asynchronous non-blocking way.
+        // See RequestSubscriber::trySend below.
+        final SequentialScheduler sendScheduler;
+
+        RequestSubscriber(long contentLen) {
+            this.contentLength = contentLen;
+            this.remainingContentLength = contentLen;
+            this.sendScheduler =
+                    SequentialScheduler.synchronizedScheduler(this::trySend);
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            if (this.subscription != null) {
+                throw new IllegalStateException("already subscribed");
+            }
+            this.subscription = subscription;
+            if (debug.on())
+                debug.log("RequestSubscriber: onSubscribe, request 1");
+            subscription.request(1);
+        }
+
+        @Override
+        public void onNext(ByteBuffer item) {
+            if (debug.on())
+                debug.log("RequestSubscriber: onNext(%d)", item.remaining());
+            int size = outgoing.size();
+            assert size == 0 : "non-zero size: " + size;
+            onNextImpl(item);
+        }
+
+        private void onNextImpl(ByteBuffer item) {
+            // Got some more request body bytes to send.
+            if (requestBodyCF.isDone()) {
+                // stream already cancelled, probably in timeout
+                sendScheduler.stop();
+                subscription.cancel();
+                return;
+            }
+            outgoing.add(item);
+            sendScheduler.runOrSchedule();
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (debug.on())
+                debug.log(() -> "RequestSubscriber: onError: " + throwable);
+            // ensure that errors are handled within the flow.
+            if (errorRef.compareAndSet(null, throwable)) {
+                sendScheduler.runOrSchedule();
+            }
+        }
+
+        @Override
+        public void onComplete() {
+            if (debug.on()) debug.log("RequestSubscriber: onComplete");
+            int size = outgoing.size();
+            assert size == 0 || size == 1 : "non-zero or one size: " + size;
+            // last byte of request body has been obtained.
+            // ensure that everything is completed within the flow.
+            onNextImpl(COMPLETED);
+        }
+
+        // Attempts to send the data, if any.
+        // Handles errors and completion state.
+        // Pause writing if the send window is exhausted, resume it if the
+        // send window has some bytes that can be acquired.
+        void trySend() {
+            try {
+                // handle errors raised by onError;
+                Throwable t = errorRef.get();
+                if (t != null) {
+                    sendScheduler.stop();
+                    if (requestBodyCF.isDone()) return;
+                    subscription.cancel();
+                    requestBodyCF.completeExceptionally(t);
+                    cancelImpl(t);
+                    return;
+                }
+
+                do {
+                    // handle COMPLETED;
+                    ByteBuffer item = outgoing.peekFirst();
+                    if (item == null) return;
+                    else if (item == COMPLETED) {
+                        sendScheduler.stop();
+                        complete();
+                        return;
+                    }
+
+                    // handle bytes to send downstream
+                    while (item.hasRemaining()) {
+                        if (debug.on()) debug.log("trySend: %d", item.remaining());
+                        assert !endStreamSent : "internal error, send data after END_STREAM flag";
+                        DataFrame df = getDataFrame(item);
+                        if (df == null) {
+                            if (debug.on())
+                                debug.log("trySend: can't send yet: %d", item.remaining());
+                            return; // the send window is exhausted: come back later
+                        }
+
+                        if (contentLength > 0) {
+                            remainingContentLength -= df.getDataLength();
+                            if (remainingContentLength < 0) {
+                                String msg = connection().getConnectionFlow()
+                                        + " stream=" + streamid + " "
+                                        + "[" + Thread.currentThread().getName() + "] "
+                                        + "Too many bytes in request body. Expected: "
+                                        + contentLength + ", got: "
+                                        + (contentLength - remainingContentLength);
+                                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+                                throw new IOException(msg);
+                            } else if (remainingContentLength == 0) {
+                                df.setFlag(DataFrame.END_STREAM);
+                                endStreamSent = true;
+                            }
+                        }
+                        if (debug.on())
+                            debug.log("trySend: sending: %d", df.getDataLength());
+                        connection.sendDataFrame(df);
+                    }
+                    assert !item.hasRemaining();
+                    ByteBuffer b = outgoing.removeFirst();
+                    assert b == item;
+                } while (outgoing.peekFirst() != null);
+
+                if (debug.on()) debug.log("trySend: request 1");
+                subscription.request(1);
+            } catch (Throwable ex) {
+                if (debug.on()) debug.log("trySend: ", ex);
+                sendScheduler.stop();
+                subscription.cancel();
+                requestBodyCF.completeExceptionally(ex);
+                // need to cancel the stream to 1. tell the server
+                // we don't want to receive any more data and
+                // 2. ensure that the operation ref count will be
+                // decremented on the HttpClient.
+                cancelImpl(ex);
+            }
+        }
+
+        private void complete() throws IOException {
+            long remaining = remainingContentLength;
+            long written = contentLength - remaining;
+            if (remaining > 0) {
+                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+                // let trySend() handle the exception
+                throw new IOException(connection().getConnectionFlow()
+                                     + " stream=" + streamid + " "
+                                     + "[" + Thread.currentThread().getName() +"] "
+                                     + "Too few bytes returned by the publisher ("
+                                              + written + "/"
+                                              + contentLength + ")");
+            }
+            if (!endStreamSent) {
+                endStreamSent = true;
+                connection.sendDataFrame(getEmptyEndStreamDataFrame());
+            }
+            requestBodyCF.complete(null);
+        }
+    }
+
+    /**
+     * Send a RESET frame to tell server to stop sending data on this stream
+     */
+    @Override
+    public CompletableFuture<Void> ignoreBody() {
+        try {
+            connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
+            return MinimalFuture.completedFuture(null);
+        } catch (Throwable e) {
+            Log.logTrace("Error resetting stream {0}", e.toString());
+            return MinimalFuture.failedFuture(e);
+        }
+    }
+
+    DataFrame getDataFrame(ByteBuffer buffer) {
+        int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
+        // blocks waiting for stream send window, if exhausted
+        int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
+        if (actualAmount <= 0) return null;
+        ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer,  actualAmount);
+        DataFrame df = new DataFrame(streamid, 0 , outBuf);
+        return df;
+    }
+
+    private DataFrame getEmptyEndStreamDataFrame()  {
+        return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
+    }
+
+    /**
+     * A List of responses relating to this stream. Normally there is only
+     * one response, but intermediate responses like 100 are allowed
+     * and must be passed up to higher level before continuing. Deals with races
+     * such as if responses are returned before the CFs get created by
+     * getResponseAsync()
+     */
+
+    final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
+
+    @Override
+    CompletableFuture<Response> getResponseAsync(Executor executor) {
+        CompletableFuture<Response> cf;
+        // The code below deals with race condition that can be caused when
+        // completeResponse() is being called before getResponseAsync()
+        synchronized (response_cfs) {
+            if (!response_cfs.isEmpty()) {
+                // This CompletableFuture was created by completeResponse().
+                // it will be already completed.
+                cf = response_cfs.remove(0);
+                // if we find a cf here it should be already completed.
+                // finding a non completed cf should not happen. just assert it.
+                assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
+            } else {
+                // getResponseAsync() is called first. Create a CompletableFuture
+                // that will be completed by completeResponse() when
+                // completeResponse() is called.
+                cf = new MinimalFuture<>();
+                response_cfs.add(cf);
+            }
+        }
+        if (executor != null && !cf.isDone()) {
+            // protect from executing later chain of CompletableFuture operations from SelectorManager thread
+            cf = cf.thenApplyAsync(r -> r, executor);
+        }
+        Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
+        PushGroup<?> pg = exchange.getPushGroup();
+        if (pg != null) {
+            // if an error occurs make sure it is recorded in the PushGroup
+            cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
+        }
+        return cf;
+    }
+
+    /**
+     * Completes the first uncompleted CF on list, and removes it. If there is no
+     * uncompleted CF then creates one (completes it) and adds to list
+     */
+    void completeResponse(Response resp) {
+        synchronized (response_cfs) {
+            CompletableFuture<Response> cf;
+            int cfs_len = response_cfs.size();
+            for (int i=0; i<cfs_len; i++) {
+                cf = response_cfs.get(i);
+                if (!cf.isDone()) {
+                    Log.logTrace("Completing response (streamid={0}): {1}",
+                                 streamid, cf);
+                    cf.complete(resp);
+                    response_cfs.remove(cf);
+                    return;
+                } // else we found the previous response: just leave it alone.
+            }
+            cf = MinimalFuture.completedFuture(resp);
+            Log.logTrace("Created completed future (streamid={0}): {1}",
+                         streamid, cf);
+            response_cfs.add(cf);
+        }
+    }
+
+    // methods to update state and remove stream when finished
+
+    synchronized void requestSent() {
+        requestSent = true;
+        if (responseReceived) {
+            close();
+        }
+    }
+
+    synchronized void responseReceived() {
+        responseReceived = true;
+        if (requestSent) {
+            close();
+        }
+    }
+
+    /**
+     * same as above but for errors
+     */
+    void completeResponseExceptionally(Throwable t) {
+        synchronized (response_cfs) {
+            // use index to avoid ConcurrentModificationException
+            // caused by removing the CF from within the loop.
+            for (int i = 0; i < response_cfs.size(); i++) {
+                CompletableFuture<Response> cf = response_cfs.get(i);
+                if (!cf.isDone()) {
+                    cf.completeExceptionally(t);
+                    response_cfs.remove(i);
+                    return;
+                }
+            }
+            response_cfs.add(MinimalFuture.failedFuture(t));
+        }
+    }
+
+    CompletableFuture<Void> sendBodyImpl() {
+        requestBodyCF.whenComplete((v, t) -> requestSent());
+        try {
+            if (requestPublisher != null) {
+                final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
+                requestPublisher.subscribe(requestSubscriber = subscriber);
+            } else {
+                // there is no request body, therefore the request is complete,
+                // END_STREAM has already sent with outgoing headers
+                requestBodyCF.complete(null);
+            }
+        } catch (Throwable t) {
+            cancelImpl(t);
+            requestBodyCF.completeExceptionally(t);
+        }
+        return requestBodyCF;
+    }
+
+    @Override
+    void cancel() {
+        cancel(new IOException("Stream " + streamid + " cancelled"));
+    }
+
+    void onSubscriptionError(Throwable t) {
+        errorRef.compareAndSet(null, t);
+        if (debug.on()) debug.log("Got subscription error: %s", (Object)t);
+        // This is the special case where the subscriber
+        // has requested an illegal number of items.
+        // In this case, the error doesn't come from
+        // upstream, but from downstream, and we need to
+        // handle the error without waiting for the inputQ
+        // to be exhausted.
+        stopRequested = true;
+        sched.runOrSchedule();
+    }
+
+    @Override
+    void cancel(IOException cause) {
+        cancelImpl(cause);
+    }
+
+    // This method sends a RST_STREAM frame
+    void cancelImpl(Throwable e) {
+        errorRef.compareAndSet(null, e);
+        if (debug.on()) debug.log("cancelling stream {0}: {1}", streamid, e);
+        if (Log.trace()) {
+            Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
+        }
+        boolean closing;
+        if (closing = !closed) { // assigning closing to !closed
+            synchronized (this) {
+                if (closing = !closed) { // assigning closing to !closed
+                    closed=true;
+                }
+            }
+        }
+        if (closing) { // true if the stream has not been closed yet
+            if (responseSubscriber != null || pendingResponseSubscriber != null)
+                sched.runOrSchedule();
+        }
+        completeResponseExceptionally(e);
+        if (!requestBodyCF.isDone()) {
+            requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
+        }
+        if (responseBodyCF != null) {
+            responseBodyCF.completeExceptionally(errorRef.get());
+        }
+        try {
+            // will send a RST_STREAM frame
+            if (streamid != 0) {
+                connection.resetStream(streamid, ResetFrame.CANCEL);
+            }
+        } catch (IOException ex) {
+            Log.logError(ex);
+        }
+    }
+
+    // This method doesn't send any frame
+    void close() {
+        if (closed) return;
+        synchronized(this) {
+            if (closed) return;
+            closed = true;
+        }
+        Log.logTrace("Closing stream {0}", streamid);
+        connection.closeStream(streamid);
+        Log.logTrace("Stream {0} closed", streamid);
+    }
+
+    static class PushedStream<T> extends Stream<T> {
+        final PushGroup<T> pushGroup;
+        // push streams need the response CF allocated up front as it is
+        // given directly to user via the multi handler callback function.
+        final CompletableFuture<Response> pushCF;
+        CompletableFuture<HttpResponse<T>> responseCF;
+        final HttpRequestImpl pushReq;
+        HttpResponse.BodyHandler<T> pushHandler;
+
+        PushedStream(PushGroup<T> pushGroup,
+                     Http2Connection connection,
+                     Exchange<T> pushReq) {
+            // ## no request body possible, null window controller
+            super(connection, pushReq, null);
+            this.pushGroup = pushGroup;
+            this.pushReq = pushReq.request();
+            this.pushCF = new MinimalFuture<>();
+            this.responseCF = new MinimalFuture<>();
+
+        }
+
+        CompletableFuture<HttpResponse<T>> responseCF() {
+            return responseCF;
+        }
+
+        synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
+            this.pushHandler = pushHandler;
+        }
+
+        synchronized HttpResponse.BodyHandler<T> getPushHandler() {
+            // ignored parameters to function can be used as BodyHandler
+            return this.pushHandler;
+        }
+
+        // Following methods call the super class but in case of
+        // error record it in the PushGroup. The error method is called
+        // with a null value when no error occurred (is a no-op)
+        @Override
+        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+            return super.sendBodyAsync()
+                        .whenComplete((ExchangeImpl<T> v, Throwable t)
+                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
+        }
+
+        @Override
+        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
+            return super.sendHeadersAsync()
+                        .whenComplete((ExchangeImpl<T> ex, Throwable t)
+                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
+        }
+
+        @Override
+        CompletableFuture<Response> getResponseAsync(Executor executor) {
+            CompletableFuture<Response> cf = pushCF.whenComplete(
+                    (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
+            if(executor!=null && !cf.isDone()) {
+                cf  = cf.thenApplyAsync( r -> r, executor);
+            }
+            return cf;
+        }
+
+        @Override
+        CompletableFuture<T> readBodyAsync(
+                HttpResponse.BodyHandler<T> handler,
+                boolean returnConnectionToPool,
+                Executor executor)
+        {
+            return super.readBodyAsync(handler, returnConnectionToPool, executor)
+                        .whenComplete((v, t) -> pushGroup.pushError(t));
+        }
+
+        @Override
+        void completeResponse(Response r) {
+            Log.logResponse(r::toString);
+            pushCF.complete(r); // not strictly required for push API
+            // start reading the body using the obtained BodySubscriber
+            CompletableFuture<Void> start = new MinimalFuture<>();
+            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
+                .whenComplete((T body, Throwable t) -> {
+                    if (t != null) {
+                        responseCF.completeExceptionally(t);
+                    } else {
+                        HttpResponseImpl<T> resp =
+                                new HttpResponseImpl<>(r.request, r, null, body, getExchange());
+                        responseCF.complete(resp);
+                    }
+                });
+            start.completeAsync(() -> null, getExchange().executor());
+        }
+
+        @Override
+        void completeResponseExceptionally(Throwable t) {
+            pushCF.completeExceptionally(t);
+        }
+
+//        @Override
+//        synchronized void responseReceived() {
+//            super.responseReceived();
+//        }
+
+        // create and return the PushResponseImpl
+        @Override
+        protected void handleResponse() {
+            responseCode = (int)responseHeaders
+                .firstValueAsLong(":status")
+                .orElse(-1);
+
+            if (responseCode == -1) {
+                completeResponseExceptionally(new IOException("No status code"));
+            }
+
+            this.response = new Response(
+                pushReq, exchange, responseHeaders, connection(),
+                responseCode, HttpClient.Version.HTTP_2);
+
+            /* TODO: review if needs to be removed
+               the value is not used, but in case `content-length` doesn't parse
+               as long, there will be NumberFormatException. If left as is, make
+               sure code up the stack handles NFE correctly. */
+            responseHeaders.firstValueAsLong("content-length");
+
+            if (Log.headers()) {
+                StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
+                sb.append(" (streamid=").append(streamid).append("):\n");
+                Log.dumpHeaders(sb, "    ", responseHeaders);
+                Log.logHeaders(sb.toString());
+            }
+
+            rspHeadersConsumer.reset();
+
+            // different implementations for normal streams and pushed streams
+            completeResponse(response);
+        }
+    }
+
+    final class StreamWindowUpdateSender extends WindowUpdateSender {
+
+        StreamWindowUpdateSender(Http2Connection connection) {
+            super(connection);
+        }
+
+        @Override
+        int getStreamId() {
+            return streamid;
+        }
+    }
+
+    /**
+     * Returns true if this exchange was canceled.
+     * @return true if this exchange was canceled.
+     */
+    synchronized boolean isCanceled() {
+        return errorRef.get() != null;
+    }
+
+    /**
+     * Returns the cause for which this exchange was canceled, if available.
+     * @return the cause for which this exchange was canceled, if available.
+     */
+    synchronized Throwable getCancelCause() {
+        return errorRef.get();
+    }
+
+    final String dbgString() {
+        return connection.dbgString() + "/Stream("+streamid+")";
+    }
+
+    private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {
+
+        void reset() {
+            super.reset();
+            responseHeaders.clear();
+        }
+
+        @Override
+        public void onDecoded(CharSequence name, CharSequence value)
+                throws UncheckedIOException
+        {
+            String n = name.toString();
+            String v = value.toString();
+            super.onDecoded(n, v);
+            responseHeaders.addHeader(n, v);
+            if (Log.headers() && Log.trace()) {
+                Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
+                             streamid, n, v);
+            }
+        }
+    }
+}