src/java.net.http/share/classes/java/net/http/internal/Stream.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56082 1da51fab3032
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.net.URI;
       
    31 import java.nio.ByteBuffer;
       
    32 import java.util.ArrayList;
       
    33 import java.util.Collections;
       
    34 import java.util.List;
       
    35 import java.util.concurrent.CompletableFuture;
       
    36 import java.util.concurrent.ConcurrentLinkedDeque;
       
    37 import java.util.concurrent.ConcurrentLinkedQueue;
       
    38 import java.util.concurrent.Executor;
       
    39 import java.util.concurrent.Flow;
       
    40 import java.util.concurrent.Flow.Subscription;
       
    41 import java.util.concurrent.atomic.AtomicReference;
       
    42 import java.util.function.BiPredicate;
       
    43 import java.net.http.HttpClient;
       
    44 import java.net.http.HttpHeaders;
       
    45 import java.net.http.HttpRequest;
       
    46 import java.net.http.HttpResponse;
       
    47 import java.net.http.HttpResponse.BodySubscriber;
       
    48 import java.net.http.internal.common.*;
       
    49 import java.net.http.internal.frame.*;
       
    50 import java.net.http.internal.hpack.DecodingCallback;
       
    51 
       
    52 /**
       
    53  * Http/2 Stream handling.
       
    54  *
       
    55  * REQUESTS
       
    56  *
       
    57  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
       
    58  *
       
    59  * sendRequest() -- sendHeadersOnly() + sendBody()
       
    60  *
       
    61  * sendBodyAsync() -- calls sendBody() in an executor thread.
       
    62  *
       
    63  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
       
    64  *
       
    65  * sendRequestAsync() -- calls sendRequest() in an executor thread
       
    66  *
       
    67  * RESPONSES
       
    68  *
       
    69  * Multiple responses can be received per request. Responses are queued up on
       
     70  * a LinkedList of CF<HttpResponse> and the first one on the list is completed
       
    71  * with the next response
       
    72  *
       
    73  * getResponseAsync() -- queries list of response CFs and returns first one
       
    74  *               if one exists. Otherwise, creates one and adds it to list
       
    75  *               and returns it. Completion is achieved through the
       
    76  *               incoming() upcall from connection reader thread.
       
    77  *
       
    78  * getResponse() -- calls getResponseAsync() and waits for CF to complete
       
    79  *
       
    80  * responseBodyAsync() -- calls responseBody() in an executor thread.
       
    81  *
       
    82  * incoming() -- entry point called from connection reader thread. Frames are
       
    83  *               either handled immediately without blocking or for data frames
       
    84  *               placed on the stream's inputQ which is consumed by the stream's
       
    85  *               reader thread.
       
    86  *
       
    87  * PushedStream sub class
       
    88  * ======================
       
    89  * Sending side methods are not used because the request comes from a PUSH_PROMISE
       
    90  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
       
    91  * is created. PushedStream does not use responseCF list as there can be only
       
     92  * one response. The CF is created when the object is created, and when the response
       
    93  * HEADERS frame is received the object is completed.
       
    94  */
       
    95 class Stream<T> extends ExchangeImpl<T> {
       
    96 
       
    97     final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
       
    98     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    99 
       
   100     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
       
   101     final SequentialScheduler sched =
       
   102             SequentialScheduler.synchronizedScheduler(this::schedule);
       
   103     final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
       
   104 
       
   105     /**
       
   106      * This stream's identifier. Assigned lazily by the HTTP2Connection before
       
   107      * the stream's first frame is sent.
       
   108      */
       
   109     protected volatile int streamid;
       
   110 
       
   111     long requestContentLen;
       
   112 
       
   113     final Http2Connection connection;
       
   114     final HttpRequestImpl request;
       
   115     final DecodingCallback rspHeadersConsumer;
       
   116     HttpHeadersImpl responseHeaders;
       
   117     final HttpHeadersImpl requestPseudoHeaders;
       
   118     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
       
   119     final HttpRequest.BodyPublisher requestPublisher;
       
   120     volatile RequestSubscriber requestSubscriber;
       
   121     volatile int responseCode;
       
   122     volatile Response response;
       
   123     volatile Throwable failed; // The exception with which this stream was canceled.
       
   124     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
       
   125     volatile CompletableFuture<T> responseBodyCF;
       
   126 
       
   127     /** True if END_STREAM has been seen in a frame received on this stream. */
       
   128     private volatile boolean remotelyClosed;
       
   129     private volatile boolean closed;
       
   130     private volatile boolean endStreamSent;
       
   131 
       
   132     // state flags
       
   133     private boolean requestSent, responseReceived;
       
   134 
       
   135     /**
       
   136      * A reference to this Stream's connection Send Window controller. The
       
   137      * stream MUST acquire the appropriate amount of Send Window before
       
   138      * sending any data. Will be null for PushStreams, as they cannot send data.
       
   139      */
       
   140     private final WindowController windowController;
       
   141     private final WindowUpdateSender windowUpdater;
       
   142 
       
   143     @Override
       
   144     HttpConnection connection() {
       
   145         return connection.connection;
       
   146     }
       
   147 
       
   148     /**
       
   149      * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
       
   150      * of after user subscription window has re-opened, from SubscriptionBase.request()
       
   151      */
       
   152     private void schedule() {
       
   153         if (responseSubscriber == null)
       
   154             // can't process anything yet
       
   155             return;
       
   156 
       
   157         try {
       
   158             while (!inputQ.isEmpty()) {
       
   159                 Http2Frame frame = inputQ.peek();
       
   160                 if (frame instanceof ResetFrame) {
       
   161                     inputQ.remove();
       
   162                     handleReset((ResetFrame)frame);
       
   163                     return;
       
   164                 }
       
   165                 DataFrame df = (DataFrame)frame;
       
   166                 boolean finished = df.getFlag(DataFrame.END_STREAM);
       
   167 
       
   168                 List<ByteBuffer> buffers = df.getData();
       
   169                 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
       
   170                 int size = Utils.remaining(dsts, Integer.MAX_VALUE);
       
   171                 if (size == 0 && finished) {
       
   172                     inputQ.remove();
       
   173                     Log.logTrace("responseSubscriber.onComplete");
       
   174                     debug.log(Level.DEBUG, "incoming: onComplete");
       
   175                     sched.stop();
       
   176                     responseSubscriber.onComplete();
       
   177                     setEndStreamReceived();
       
   178                     return;
       
   179                 } else if (userSubscription.tryDecrement()) {
       
   180                     inputQ.remove();
       
   181                     Log.logTrace("responseSubscriber.onNext {0}", size);
       
   182                     debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
       
   183                     responseSubscriber.onNext(dsts);
       
   184                     if (consumed(df)) {
       
   185                         Log.logTrace("responseSubscriber.onComplete");
       
   186                         debug.log(Level.DEBUG, "incoming: onComplete");
       
   187                         sched.stop();
       
   188                         responseSubscriber.onComplete();
       
   189                         setEndStreamReceived();
       
   190                         return;
       
   191                     }
       
   192                 } else {
       
   193                     return;
       
   194                 }
       
   195             }
       
   196         } catch (Throwable throwable) {
       
   197             failed = throwable;
       
   198         }
       
   199 
       
   200         Throwable t = failed;
       
   201         if (t != null) {
       
   202             sched.stop();
       
   203             responseSubscriber.onError(t);
       
   204             close();
       
   205         }
       
   206     }
       
   207 
       
   208     // Callback invoked after the Response BodySubscriber has consumed the
       
   209     // buffers contained in a DataFrame.
       
   210     // Returns true if END_STREAM is reached, false otherwise.
       
   211     private boolean consumed(DataFrame df) {
       
   212         // RFC 7540 6.1:
       
   213         // The entire DATA frame payload is included in flow control,
       
   214         // including the Pad Length and Padding fields if present
       
   215         int len = df.payloadLength();
       
   216         connection.windowUpdater.update(len);
       
   217 
       
   218         if (!df.getFlag(DataFrame.END_STREAM)) {
       
   219             // Don't send window update on a stream which is
       
   220             // closed or half closed.
       
   221             windowUpdater.update(len);
       
   222             return false; // more data coming
       
   223         }
       
   224         return true; // end of stream
       
   225     }
       
   226 
       
   227     @Override
       
   228     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
       
   229                                        boolean returnConnectionToPool,
       
   230                                        Executor executor)
       
   231     {
       
   232         Log.logTrace("Reading body on stream {0}", streamid);
       
   233         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
       
   234         CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
       
   235 
       
   236         PushGroup<?> pg = exchange.getPushGroup();
       
   237         if (pg != null) {
       
   238             // if an error occurs make sure it is recorded in the PushGroup
       
   239             cf = cf.whenComplete((t,e) -> pg.pushError(e));
       
   240         }
       
   241         return cf;
       
   242     }
       
   243 
       
   244     @Override
       
   245     public String toString() {
       
   246         StringBuilder sb = new StringBuilder();
       
   247         sb.append("streamid: ")
       
   248                 .append(streamid);
       
   249         return sb.toString();
       
   250     }
       
   251 
       
   252     private void receiveDataFrame(DataFrame df) {
       
   253         inputQ.add(df);
       
   254         sched.runOrSchedule();
       
   255     }
       
   256 
       
   257     /** Handles a RESET frame. RESET is always handled inline in the queue. */
       
   258     private void receiveResetFrame(ResetFrame frame) {
       
   259         inputQ.add(frame);
       
   260         sched.runOrSchedule();
       
   261     }
       
   262 
       
   263     // pushes entire response body into response subscriber
       
   264     // blocking when required by local or remote flow control
       
   265     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
       
   266         responseBodyCF = new MinimalFuture<>();
       
   267         // We want to allow the subscriber's getBody() method to block so it
       
   268         // can work with InputStreams. So, we offload execution.
       
   269         executor.execute(() -> {
       
   270             bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
       
   271                 if (t == null)
       
   272                     responseBodyCF.complete(body);
       
   273                 else
       
   274                     responseBodyCF.completeExceptionally(t);
       
   275             });
       
   276         });
       
   277 
       
   278         if (isCanceled()) {
       
   279             Throwable t = getCancelCause();
       
   280             responseBodyCF.completeExceptionally(t);
       
   281         } else {
       
   282             bodySubscriber.onSubscribe(userSubscription);
       
   283         }
       
   284         // Set the responseSubscriber field now that onSubscribe has been called.
       
   285         // This effectively allows the scheduler to start invoking the callbacks.
       
   286         responseSubscriber = bodySubscriber;
       
   287         sched.runOrSchedule(); // in case data waiting already to be processed
       
   288         return responseBodyCF;
       
   289     }
       
   290 
       
   291     @Override
       
   292     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
       
   293         return sendBodyImpl().thenApply( v -> this);
       
   294     }
       
   295 
       
   296     @SuppressWarnings("unchecked")
       
   297     Stream(Http2Connection connection,
       
   298            Exchange<T> e,
       
   299            WindowController windowController)
       
   300     {
       
   301         super(e);
       
   302         this.connection = connection;
       
   303         this.windowController = windowController;
       
   304         this.request = e.request();
       
   305         this.requestPublisher = request.requestPublisher;  // may be null
       
   306         responseHeaders = new HttpHeadersImpl();
       
   307         rspHeadersConsumer = (name, value) -> {
       
   308             responseHeaders.addHeader(name.toString(), value.toString());
       
   309             if (Log.headers() && Log.trace()) {
       
   310                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
       
   311                              streamid, name, value);
       
   312             }
       
   313         };
       
   314         this.requestPseudoHeaders = new HttpHeadersImpl();
       
   315         // NEW
       
   316         this.windowUpdater = new StreamWindowUpdateSender(connection);
       
   317     }
       
   318 
       
   319     /**
       
   320      * Entry point from Http2Connection reader thread.
       
   321      *
       
   322      * Data frames will be removed by response body thread.
       
   323      */
       
   324     void incoming(Http2Frame frame) throws IOException {
       
   325         debug.log(Level.DEBUG, "incoming: %s", frame);
       
   326         if ((frame instanceof HeaderFrame)) {
       
   327             HeaderFrame hframe = (HeaderFrame)frame;
       
   328             if (hframe.endHeaders()) {
       
   329                 Log.logTrace("handling response (streamid={0})", streamid);
       
   330                 handleResponse();
       
   331                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
       
   332                     receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
       
   333                 }
       
   334             }
       
   335         } else if (frame instanceof DataFrame) {
       
   336             receiveDataFrame((DataFrame)frame);
       
   337         } else {
       
   338             otherFrame(frame);
       
   339         }
       
   340     }
       
   341 
       
   342     void otherFrame(Http2Frame frame) throws IOException {
       
   343         switch (frame.type()) {
       
   344             case WindowUpdateFrame.TYPE:
       
   345                 incoming_windowUpdate((WindowUpdateFrame) frame);
       
   346                 break;
       
   347             case ResetFrame.TYPE:
       
   348                 incoming_reset((ResetFrame) frame);
       
   349                 break;
       
   350             case PriorityFrame.TYPE:
       
   351                 incoming_priority((PriorityFrame) frame);
       
   352                 break;
       
   353             default:
       
   354                 String msg = "Unexpected frame: " + frame.toString();
       
   355                 throw new IOException(msg);
       
   356         }
       
   357     }
       
   358 
       
   359     // The Hpack decoder decodes into one of these consumers of name,value pairs
       
   360 
       
   361     DecodingCallback rspHeadersConsumer() {
       
   362         return rspHeadersConsumer;
       
   363     }
       
   364 
       
   365     protected void handleResponse() throws IOException {
       
   366         responseCode = (int)responseHeaders
       
   367                 .firstValueAsLong(":status")
       
   368                 .orElseThrow(() -> new IOException("no statuscode in response"));
       
   369 
       
   370         response = new Response(
       
   371                 request, exchange, responseHeaders,
       
   372                 responseCode, HttpClient.Version.HTTP_2);
       
   373 
       
   374         /* TODO: review if needs to be removed
       
   375            the value is not used, but in case `content-length` doesn't parse as
       
   376            long, there will be NumberFormatException. If left as is, make sure
       
   377            code up the stack handles NFE correctly. */
       
   378         responseHeaders.firstValueAsLong("content-length");
       
   379 
       
   380         if (Log.headers()) {
       
   381             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
       
   382             Log.dumpHeaders(sb, "    ", responseHeaders);
       
   383             Log.logHeaders(sb.toString());
       
   384         }
       
   385 
       
   386         completeResponse(response);
       
   387     }
       
   388 
       
   389     void incoming_reset(ResetFrame frame) {
       
   390         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
       
   391         if (endStreamReceived()) {
       
   392             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
       
   393         } else if (closed) {
       
   394             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
       
   395         } else {
       
   396             // put it in the input queue in order to read all
       
   397             // pending data frames first. Indeed, a server may send
       
   398             // RST_STREAM after sending END_STREAM, in which case we should
       
   399             // ignore it. However, we won't know if we have received END_STREAM
       
   400             // or not until all pending data frames are read.
       
   401             receiveResetFrame(frame);
       
   402             // RST_STREAM was pushed to the queue. It will be handled by
       
   403             // asyncReceive after all pending data frames have been
       
   404             // processed.
       
   405             Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
       
   406         }
       
   407     }
       
   408 
       
   409     void handleReset(ResetFrame frame) {
       
   410         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
       
   411         if (!closed) {
       
   412             close();
       
   413             int error = frame.getErrorCode();
       
   414             completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
       
   415         } else {
       
   416             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
       
   417         }
       
   418     }
       
   419 
       
   420     void incoming_priority(PriorityFrame frame) {
       
   421         // TODO: implement priority
       
   422         throw new UnsupportedOperationException("Not implemented");
       
   423     }
       
   424 
       
   425     private void incoming_windowUpdate(WindowUpdateFrame frame)
       
   426         throws IOException
       
   427     {
       
   428         int amount = frame.getUpdate();
       
   429         if (amount <= 0) {
       
   430             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
       
   431                          streamid, streamid, amount);
       
   432             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
       
   433         } else {
       
   434             assert streamid != 0;
       
   435             boolean success = windowController.increaseStreamWindow(amount, streamid);
       
   436             if (!success) {  // overflow
       
   437                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
       
   438             }
       
   439         }
       
   440     }
       
   441 
       
   442     void incoming_pushPromise(HttpRequestImpl pushRequest,
       
   443                               PushedStream<T> pushStream)
       
   444         throws IOException
       
   445     {
       
   446         if (Log.requests()) {
       
   447             Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
       
   448         }
       
   449         PushGroup<T> pushGroup = exchange.getPushGroup();
       
   450         if (pushGroup == null) {
       
   451             Log.logTrace("Rejecting push promise stream " + streamid);
       
   452             connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
       
   453             pushStream.close();
       
   454             return;
       
   455         }
       
   456 
       
   457         PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
       
   458 
       
   459         if (!acceptor.accepted()) {
       
   460             // cancel / reject
       
   461             IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
       
   462             if (Log.trace()) {
       
   463                 Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
       
   464                         ex.getMessage());
       
   465             }
       
   466             pushStream.cancelImpl(ex);
       
   467             return;
       
   468         }
       
   469 
       
   470         CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
       
   471         HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
       
   472         assert pushHandler != null;
       
   473 
       
   474         pushStream.requestSent();
       
   475         pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
       
   476         // setup housekeeping for when the push is received
       
   477         // TODO: deal with ignoring of CF anti-pattern
       
   478         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
       
   479         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
       
   480             t = Utils.getCompletionCause(t);
       
   481             if (Log.trace()) {
       
   482                 Log.logTrace("Push completed on stream {0} for {1}{2}",
       
   483                              pushStream.streamid, resp,
       
   484                              ((t==null) ? "": " with exception " + t));
       
   485             }
       
   486             if (t != null) {
       
   487                 pushGroup.pushError(t);
       
   488                 pushResponseCF.completeExceptionally(t);
       
   489             } else {
       
   490                 pushResponseCF.complete(resp);
       
   491             }
       
   492             pushGroup.pushCompleted();
       
   493         });
       
   494 
       
   495     }
       
   496 
       
   497     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
       
   498         HttpHeadersImpl h = request.getSystemHeaders();
       
   499         if (contentLength > 0) {
       
   500             h.setHeader("content-length", Long.toString(contentLength));
       
   501         }
       
   502         setPseudoHeaderFields();
       
   503         HttpHeaders sysh = filter(h);
       
   504         HttpHeaders userh = filter(request.getUserHeaders());
       
   505         OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
       
   506         if (contentLength == 0) {
       
   507             f.setFlag(HeadersFrame.END_STREAM);
       
   508             endStreamSent = true;
       
   509         }
       
   510         return f;
       
   511     }
       
   512 
       
   513     private boolean hasProxyAuthorization(HttpHeaders headers) {
       
   514         return headers.firstValue("proxy-authorization")
       
   515                       .isPresent();
       
   516     }
       
   517 
       
   518     // Determines whether we need to build a new HttpHeader object.
       
   519     //
       
   520     // Ideally we should pass the filter to OutgoingHeaders refactor the
       
   521     // code that creates the HeaderFrame to honor the filter.
       
   522     // We're not there yet - so depending on the filter we need to
       
   523     // apply and the content of the header we will try to determine
       
   524     //  whether anything might need to be filtered.
       
   525     // If nothing needs filtering then we can just use the
       
   526     // original headers.
       
   527     private boolean needsFiltering(HttpHeaders headers,
       
   528                                    BiPredicate<String, List<String>> filter) {
       
   529         if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
       
   530             // we're either connecting or proxying
       
   531             // slight optimization: we only need to filter out
       
   532             // disabled schemes, so if there are none just
       
   533             // pass through.
       
   534             return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
       
   535                     && hasProxyAuthorization(headers);
       
   536         } else {
       
   537             // we're talking to a server, either directly or through
       
   538             // a tunnel.
       
   539             // Slight optimization: we only need to filter out
       
   540             // proxy authorization headers, so if there are none just
       
   541             // pass through.
       
   542             return hasProxyAuthorization(headers);
       
   543         }
       
   544     }
       
   545 
       
   546     private HttpHeaders filter(HttpHeaders headers) {
       
   547         HttpConnection conn = connection();
       
   548         BiPredicate<String, List<String>> filter =
       
   549                 conn.headerFilter(request);
       
   550         if (needsFiltering(headers, filter)) {
       
   551             return ImmutableHeaders.of(headers.map(), filter);
       
   552         }
       
   553         return headers;
       
   554     }
       
   555 
       
   556     private void setPseudoHeaderFields() {
       
   557         HttpHeadersImpl hdrs = requestPseudoHeaders;
       
   558         String method = request.method();
       
   559         hdrs.setHeader(":method", method);
       
   560         URI uri = request.uri();
       
   561         hdrs.setHeader(":scheme", uri.getScheme());
       
   562         // TODO: userinfo deprecated. Needs to be removed
       
   563         hdrs.setHeader(":authority", uri.getAuthority());
       
   564         // TODO: ensure header names beginning with : not in user headers
       
   565         String query = uri.getQuery();
       
   566         String path = uri.getPath();
       
   567         if (path == null || path.isEmpty()) {
       
   568             if (method.equalsIgnoreCase("OPTIONS")) {
       
   569                 path = "*";
       
   570             } else {
       
   571                 path = "/";
       
   572             }
       
   573         }
       
   574         if (query != null) {
       
   575             path += "?" + query;
       
   576         }
       
   577         hdrs.setHeader(":path", path);
       
   578     }
       
   579 
       
   580     HttpHeadersImpl getRequestPseudoHeaders() {
       
   581         return requestPseudoHeaders;
       
   582     }
       
   583 
       
   584     /** Sets endStreamReceived. Should be called only once. */
       
   585     void setEndStreamReceived() {
       
   586         assert remotelyClosed == false: "Unexpected endStream already set";
       
   587         remotelyClosed = true;
       
   588         responseReceived();
       
   589     }
       
   590 
       
   591     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
       
   592      *  received on this stream. */
       
   593     private boolean endStreamReceived() {
       
   594         return remotelyClosed;
       
   595     }
       
   596 
       
   597     @Override
       
   598     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
       
   599         debug.log(Level.DEBUG, "sendHeadersOnly()");
       
   600         if (Log.requests() && request != null) {
       
   601             Log.logRequest(request.toString());
       
   602         }
       
   603         if (requestPublisher != null) {
       
   604             requestContentLen = requestPublisher.contentLength();
       
   605         } else {
       
   606             requestContentLen = 0;
       
   607         }
       
   608         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
       
   609         connection.sendFrame(f);
       
   610         CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
       
   611         cf.complete(this);  // #### good enough for now
       
   612         return cf;
       
   613     }
       
   614 
       
   615     @Override
       
   616     void released() {
       
   617         if (streamid > 0) {
       
   618             debug.log(Level.DEBUG, "Released stream %d", streamid);
       
   619             // remove this stream from the Http2Connection map.
       
   620             connection.closeStream(streamid);
       
   621         } else {
       
   622             debug.log(Level.DEBUG, "Can't release stream %d", streamid);
       
   623         }
       
   624     }
       
   625 
       
   626     @Override
       
   627     void completed() {
       
   628         // There should be nothing to do here: the stream should have
       
   629         // been already closed (or will be closed shortly after).
       
   630     }
       
   631 
       
   632     void registerStream(int id) {
       
   633         this.streamid = id;
       
   634         connection.putStream(this, streamid);
       
   635         debug.log(Level.DEBUG, "Registered stream %d", id);
       
   636     }
       
   637 
       
   638     void signalWindowUpdate() {
       
   639         RequestSubscriber subscriber = requestSubscriber;
       
   640         assert subscriber != null;
       
   641         debug.log(Level.DEBUG, "Signalling window update");
       
   642         subscriber.sendScheduler.runOrSchedule();
       
   643     }
       
   644 
       
   645     static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
       
   646     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
       
   647         // can be < 0 if the actual length is not known.
       
   648         private final long contentLength;
       
   649         private volatile long remainingContentLength;
       
   650         private volatile Subscription subscription;
       
   651 
       
   652         // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
       
   653         //  1) The data that was published by the request body Publisher, and
       
   654         //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
       
   655         final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
       
   656 
       
   657         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   658         // A scheduler used to honor window updates. Writing must be paused
       
   659         // when the window is exhausted, and resumed when the window acquires
       
   660         // some space. The sendScheduler makes it possible to implement this
       
   661         // behaviour in an asynchronous non-blocking way.
       
   662         // See RequestSubscriber::trySend below.
       
   663         final SequentialScheduler sendScheduler;
       
   664 
       
   665         RequestSubscriber(long contentLen) {
       
   666             this.contentLength = contentLen;
       
   667             this.remainingContentLength = contentLen;
       
   668             this.sendScheduler =
       
   669                     SequentialScheduler.synchronizedScheduler(this::trySend);
       
   670         }
       
   671 
       
   672         @Override
       
   673         public void onSubscribe(Flow.Subscription subscription) {
       
   674             if (this.subscription != null) {
       
   675                 throw new IllegalStateException("already subscribed");
       
   676             }
       
   677             this.subscription = subscription;
       
   678             debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
       
   679             subscription.request(1);
       
   680         }
       
   681 
       
   682         @Override
       
   683         public void onNext(ByteBuffer item) {
       
   684             debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
       
   685             int size = outgoing.size();
       
   686             assert size == 0 : "non-zero size: " + size;
       
   687             onNextImpl(item);
       
   688         }
       
   689 
       
   690         private void onNextImpl(ByteBuffer item) {
       
   691             // Got some more request body bytes to send.
       
   692             if (requestBodyCF.isDone()) {
       
   693                 // stream already cancelled, probably in timeout
       
   694                 sendScheduler.stop();
       
   695                 subscription.cancel();
       
   696                 return;
       
   697             }
       
   698             outgoing.add(item);
       
   699             sendScheduler.runOrSchedule();
       
   700         }
       
   701 
       
   702         @Override
       
   703         public void onError(Throwable throwable) {
       
   704             debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
       
   705             // ensure that errors are handled within the flow.
       
   706             if (errorRef.compareAndSet(null, throwable)) {
       
   707                 sendScheduler.runOrSchedule();
       
   708             }
       
   709         }
       
   710 
       
   711         @Override
       
   712         public void onComplete() {
       
   713             debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
       
   714             int size = outgoing.size();
       
   715             assert size == 0 || size == 1 : "non-zero or one size: " + size;
       
   716             // last byte of request body has been obtained.
       
   717             // ensure that everything is completed within the flow.
       
   718             onNextImpl(COMPLETED);
       
   719         }
       
   720 
       
   721         // Attempts to send the data, if any.
       
   722         // Handles errors and completion state.
       
   723         // Pause writing if the send window is exhausted, resume it if the
       
   724         // send window has some bytes that can be acquired.
       
   725         void trySend() {
       
   726             try {
       
   727                 // handle errors raised by onError;
       
   728                 Throwable t = errorRef.get();
       
   729                 if (t != null) {
       
   730                     sendScheduler.stop();
       
   731                     if (requestBodyCF.isDone()) return;
       
   732                     subscription.cancel();
       
   733                     requestBodyCF.completeExceptionally(t);
       
   734                     return;
       
   735                 }
       
   736 
       
   737                 do {
       
   738                     // handle COMPLETED;
       
   739                     ByteBuffer item = outgoing.peekFirst();
       
   740                     if (item == null) return;
       
   741                     else if (item == COMPLETED) {
       
   742                         sendScheduler.stop();
       
   743                         complete();
       
   744                         return;
       
   745                     }
       
   746 
       
   747                     // handle bytes to send downstream
       
   748                     while (item.hasRemaining()) {
       
   749                         debug.log(Level.DEBUG, "trySend: %d", item.remaining());
       
   750                         assert !endStreamSent : "internal error, send data after END_STREAM flag";
       
   751                         DataFrame df = getDataFrame(item);
       
   752                         if (df == null) {
       
   753                             debug.log(Level.DEBUG, "trySend: can't send yet: %d",
       
   754                                     item.remaining());
       
   755                             return; // the send window is exhausted: come back later
       
   756                         }
       
   757 
       
   758                         if (contentLength > 0) {
       
   759                             remainingContentLength -= df.getDataLength();
       
   760                             if (remainingContentLength < 0) {
       
   761                                 String msg = connection().getConnectionFlow()
       
   762                                         + " stream=" + streamid + " "
       
   763                                         + "[" + Thread.currentThread().getName() + "] "
       
   764                                         + "Too many bytes in request body. Expected: "
       
   765                                         + contentLength + ", got: "
       
   766                                         + (contentLength - remainingContentLength);
       
   767                                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
       
   768                                 throw new IOException(msg);
       
   769                             } else if (remainingContentLength == 0) {
       
   770                                 df.setFlag(DataFrame.END_STREAM);
       
   771                                 endStreamSent = true;
       
   772                             }
       
   773                         }
       
   774                         debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
       
   775                         connection.sendDataFrame(df);
       
   776                     }
       
   777                     assert !item.hasRemaining();
       
   778                     ByteBuffer b = outgoing.removeFirst();
       
   779                     assert b == item;
       
   780                 } while (outgoing.peekFirst() != null);
       
   781 
       
   782                 debug.log(Level.DEBUG, "trySend: request 1");
       
   783                 subscription.request(1);
       
   784             } catch (Throwable ex) {
       
   785                 debug.log(Level.DEBUG, "trySend: ", ex);
       
   786                 sendScheduler.stop();
       
   787                 subscription.cancel();
       
   788                 requestBodyCF.completeExceptionally(ex);
       
   789             }
       
   790         }
       
   791 
       
   792         private void complete() throws IOException {
       
   793             long remaining = remainingContentLength;
       
   794             long written = contentLength - remaining;
       
   795             if (remaining > 0) {
       
   796                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
       
   797                 // let trySend() handle the exception
       
   798                 throw new IOException(connection().getConnectionFlow()
       
   799                                      + " stream=" + streamid + " "
       
   800                                      + "[" + Thread.currentThread().getName() +"] "
       
   801                                      + "Too few bytes returned by the publisher ("
       
   802                                               + written + "/"
       
   803                                               + contentLength + ")");
       
   804             }
       
   805             if (!endStreamSent) {
       
   806                 endStreamSent = true;
       
   807                 connection.sendDataFrame(getEmptyEndStreamDataFrame());
       
   808             }
       
   809             requestBodyCF.complete(null);
       
   810         }
       
   811     }
       
   812 
       
   813     /**
       
   814      * Send a RESET frame to tell server to stop sending data on this stream
       
   815      */
       
   816     @Override
       
   817     public CompletableFuture<Void> ignoreBody() {
       
   818         try {
       
   819             connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
       
   820             return MinimalFuture.completedFuture(null);
       
   821         } catch (Throwable e) {
       
   822             Log.logTrace("Error resetting stream {0}", e.toString());
       
   823             return MinimalFuture.failedFuture(e);
       
   824         }
       
   825     }
       
   826 
       
   827     DataFrame getDataFrame(ByteBuffer buffer) {
       
   828         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
       
   829         // blocks waiting for stream send window, if exhausted
       
   830         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
       
   831         if (actualAmount <= 0) return null;
       
   832         ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer,  actualAmount);
       
   833         DataFrame df = new DataFrame(streamid, 0 , outBuf);
       
   834         return df;
       
   835     }
       
   836 
       
   837     private DataFrame getEmptyEndStreamDataFrame()  {
       
   838         return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
       
   839     }
       
   840 
       
   841     /**
       
   842      * A List of responses relating to this stream. Normally there is only
       
   843      * one response, but intermediate responses like 100 are allowed
       
   844      * and must be passed up to higher level before continuing. Deals with races
       
   845      * such as if responses are returned before the CFs get created by
       
   846      * getResponseAsync()
       
   847      */
       
   848 
       
   849     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
       
   850 
       
   851     @Override
       
   852     CompletableFuture<Response> getResponseAsync(Executor executor) {
       
   853         CompletableFuture<Response> cf;
       
   854         // The code below deals with race condition that can be caused when
       
   855         // completeResponse() is being called before getResponseAsync()
       
   856         synchronized (response_cfs) {
       
   857             if (!response_cfs.isEmpty()) {
       
   858                 // This CompletableFuture was created by completeResponse().
       
   859                 // it will be already completed.
       
   860                 cf = response_cfs.remove(0);
       
   861                 // if we find a cf here it should be already completed.
       
   862                 // finding a non completed cf should not happen. just assert it.
       
   863                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
       
   864             } else {
       
   865                 // getResponseAsync() is called first. Create a CompletableFuture
       
   866                 // that will be completed by completeResponse() when
       
   867                 // completeResponse() is called.
       
   868                 cf = new MinimalFuture<>();
       
   869                 response_cfs.add(cf);
       
   870             }
       
   871         }
       
   872         if (executor != null && !cf.isDone()) {
       
   873             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
       
   874             cf = cf.thenApplyAsync(r -> r, executor);
       
   875         }
       
   876         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
       
   877         PushGroup<?> pg = exchange.getPushGroup();
       
   878         if (pg != null) {
       
   879             // if an error occurs make sure it is recorded in the PushGroup
       
   880             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
       
   881         }
       
   882         return cf;
       
   883     }
       
   884 
       
   885     /**
       
   886      * Completes the first uncompleted CF on list, and removes it. If there is no
       
   887      * uncompleted CF then creates one (completes it) and adds to list
       
   888      */
       
   889     void completeResponse(Response resp) {
       
   890         synchronized (response_cfs) {
       
   891             CompletableFuture<Response> cf;
       
   892             int cfs_len = response_cfs.size();
       
   893             for (int i=0; i<cfs_len; i++) {
       
   894                 cf = response_cfs.get(i);
       
   895                 if (!cf.isDone()) {
       
   896                     Log.logTrace("Completing response (streamid={0}): {1}",
       
   897                                  streamid, cf);
       
   898                     cf.complete(resp);
       
   899                     response_cfs.remove(cf);
       
   900                     return;
       
   901                 } // else we found the previous response: just leave it alone.
       
   902             }
       
   903             cf = MinimalFuture.completedFuture(resp);
       
   904             Log.logTrace("Created completed future (streamid={0}): {1}",
       
   905                          streamid, cf);
       
   906             response_cfs.add(cf);
       
   907         }
       
   908     }
       
   909 
       
   910     // methods to update state and remove stream when finished
       
   911 
       
   912     synchronized void requestSent() {
       
   913         requestSent = true;
       
   914         if (responseReceived) {
       
   915             close();
       
   916         }
       
   917     }
       
   918 
       
   919     synchronized void responseReceived() {
       
   920         responseReceived = true;
       
   921         if (requestSent) {
       
   922             close();
       
   923         }
       
   924     }
       
   925 
       
   926     /**
       
   927      * same as above but for errors
       
   928      */
       
   929     void completeResponseExceptionally(Throwable t) {
       
   930         synchronized (response_cfs) {
       
   931             // use index to avoid ConcurrentModificationException
       
   932             // caused by removing the CF from within the loop.
       
   933             for (int i = 0; i < response_cfs.size(); i++) {
       
   934                 CompletableFuture<Response> cf = response_cfs.get(i);
       
   935                 if (!cf.isDone()) {
       
   936                     cf.completeExceptionally(t);
       
   937                     response_cfs.remove(i);
       
   938                     return;
       
   939                 }
       
   940             }
       
   941             response_cfs.add(MinimalFuture.failedFuture(t));
       
   942         }
       
   943     }
       
   944 
       
   945     CompletableFuture<Void> sendBodyImpl() {
       
   946         requestBodyCF.whenComplete((v, t) -> requestSent());
       
   947         if (requestPublisher != null) {
       
   948             final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
       
   949             requestPublisher.subscribe(requestSubscriber = subscriber);
       
   950         } else {
       
   951             // there is no request body, therefore the request is complete,
       
   952             // END_STREAM has already sent with outgoing headers
       
   953             requestBodyCF.complete(null);
       
   954         }
       
   955         return requestBodyCF;
       
   956     }
       
   957 
       
   958     @Override
       
   959     void cancel() {
       
   960         cancel(new IOException("Stream " + streamid + " cancelled"));
       
   961     }
       
   962 
       
   963     @Override
       
   964     void cancel(IOException cause) {
       
   965         cancelImpl(cause);
       
   966     }
       
   967 
       
   968     // This method sends a RST_STREAM frame
       
   969     void cancelImpl(Throwable e) {
       
   970         debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
       
   971         if (Log.trace()) {
       
   972             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
       
   973         }
       
   974         boolean closing;
       
   975         if (closing = !closed) { // assigning closing to !closed
       
   976             synchronized (this) {
       
   977                 failed = e;
       
   978                 if (closing = !closed) { // assigning closing to !closed
       
   979                     closed=true;
       
   980                 }
       
   981             }
       
   982         }
       
   983         if (closing) { // true if the stream has not been closed yet
       
   984             if (responseSubscriber != null)
       
   985                 sched.runOrSchedule();
       
   986         }
       
   987         completeResponseExceptionally(e);
       
   988         if (!requestBodyCF.isDone()) {
       
   989             requestBodyCF.completeExceptionally(e); // we may be sending the body..
       
   990         }
       
   991         if (responseBodyCF != null) {
       
   992             responseBodyCF.completeExceptionally(e);
       
   993         }
       
   994         try {
       
   995             // will send a RST_STREAM frame
       
   996             if (streamid != 0) {
       
   997                 connection.resetStream(streamid, ResetFrame.CANCEL);
       
   998             }
       
   999         } catch (IOException ex) {
       
  1000             Log.logError(ex);
       
  1001         }
       
  1002     }
       
  1003 
       
  1004     // This method doesn't send any frame
       
  1005     void close() {
       
  1006         if (closed) return;
       
  1007         synchronized(this) {
       
  1008             if (closed) return;
       
  1009             closed = true;
       
  1010         }
       
  1011         Log.logTrace("Closing stream {0}", streamid);
       
  1012         connection.closeStream(streamid);
       
  1013         Log.logTrace("Stream {0} closed", streamid);
       
  1014     }
       
  1015 
       
  1016     static class PushedStream<T> extends Stream<T> {
       
  1017         final PushGroup<T> pushGroup;
       
  1018         // push streams need the response CF allocated up front as it is
       
  1019         // given directly to user via the multi handler callback function.
       
  1020         final CompletableFuture<Response> pushCF;
       
  1021         CompletableFuture<HttpResponse<T>> responseCF;
       
  1022         final HttpRequestImpl pushReq;
       
  1023         HttpResponse.BodyHandler<T> pushHandler;
       
  1024 
       
  1025         PushedStream(PushGroup<T> pushGroup,
       
  1026                      Http2Connection connection,
       
  1027                      Exchange<T> pushReq) {
       
  1028             // ## no request body possible, null window controller
       
  1029             super(connection, pushReq, null);
       
  1030             this.pushGroup = pushGroup;
       
  1031             this.pushReq = pushReq.request();
       
  1032             this.pushCF = new MinimalFuture<>();
       
  1033             this.responseCF = new MinimalFuture<>();
       
  1034 
       
  1035         }
       
  1036 
       
  1037         CompletableFuture<HttpResponse<T>> responseCF() {
       
  1038             return responseCF;
       
  1039         }
       
  1040 
       
  1041         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
       
  1042             this.pushHandler = pushHandler;
       
  1043         }
       
  1044 
       
  1045         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
       
  1046             // ignored parameters to function can be used as BodyHandler
       
  1047             return this.pushHandler;
       
  1048         }
       
  1049 
       
  1050         // Following methods call the super class but in case of
       
  1051         // error record it in the PushGroup. The error method is called
       
  1052         // with a null value when no error occurred (is a no-op)
       
  1053         @Override
       
  1054         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
       
  1055             return super.sendBodyAsync()
       
  1056                         .whenComplete((ExchangeImpl<T> v, Throwable t)
       
  1057                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
       
  1058         }
       
  1059 
       
  1060         @Override
       
  1061         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
       
  1062             return super.sendHeadersAsync()
       
  1063                         .whenComplete((ExchangeImpl<T> ex, Throwable t)
       
  1064                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
       
  1065         }
       
  1066 
       
  1067         @Override
       
  1068         CompletableFuture<Response> getResponseAsync(Executor executor) {
       
  1069             CompletableFuture<Response> cf = pushCF.whenComplete(
       
  1070                     (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
       
  1071             if(executor!=null && !cf.isDone()) {
       
  1072                 cf  = cf.thenApplyAsync( r -> r, executor);
       
  1073             }
       
  1074             return cf;
       
  1075         }
       
  1076 
       
  1077         @Override
       
  1078         CompletableFuture<T> readBodyAsync(
       
  1079                 HttpResponse.BodyHandler<T> handler,
       
  1080                 boolean returnConnectionToPool,
       
  1081                 Executor executor)
       
  1082         {
       
  1083             return super.readBodyAsync(handler, returnConnectionToPool, executor)
       
  1084                         .whenComplete((v, t) -> pushGroup.pushError(t));
       
  1085         }
       
  1086 
       
  1087         @Override
       
  1088         void completeResponse(Response r) {
       
  1089             Log.logResponse(r::toString);
       
  1090             pushCF.complete(r); // not strictly required for push API
       
  1091             // start reading the body using the obtained BodySubscriber
       
  1092             CompletableFuture<Void> start = new MinimalFuture<>();
       
  1093             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
       
  1094                 .whenComplete((T body, Throwable t) -> {
       
  1095                     if (t != null) {
       
  1096                         responseCF.completeExceptionally(t);
       
  1097                     } else {
       
  1098                         HttpResponseImpl<T> resp =
       
  1099                                 new HttpResponseImpl<>(r.request, r, null, body, getExchange());
       
  1100                         responseCF.complete(resp);
       
  1101                     }
       
  1102                 });
       
  1103             start.completeAsync(() -> null, getExchange().executor());
       
  1104         }
       
  1105 
       
  1106         @Override
       
  1107         void completeResponseExceptionally(Throwable t) {
       
  1108             pushCF.completeExceptionally(t);
       
  1109         }
       
  1110 
       
  1111 //        @Override
       
  1112 //        synchronized void responseReceived() {
       
  1113 //            super.responseReceived();
       
  1114 //        }
       
  1115 
       
  1116         // create and return the PushResponseImpl
       
  1117         @Override
       
  1118         protected void handleResponse() {
       
  1119             responseCode = (int)responseHeaders
       
  1120                 .firstValueAsLong(":status")
       
  1121                 .orElse(-1);
       
  1122 
       
  1123             if (responseCode == -1) {
       
  1124                 completeResponseExceptionally(new IOException("No status code"));
       
  1125             }
       
  1126 
       
  1127             this.response = new Response(
       
  1128                 pushReq, exchange, responseHeaders,
       
  1129                 responseCode, HttpClient.Version.HTTP_2);
       
  1130 
       
  1131             /* TODO: review if needs to be removed
       
  1132                the value is not used, but in case `content-length` doesn't parse
       
  1133                as long, there will be NumberFormatException. If left as is, make
       
  1134                sure code up the stack handles NFE correctly. */
       
  1135             responseHeaders.firstValueAsLong("content-length");
       
  1136 
       
  1137             if (Log.headers()) {
       
  1138                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
       
  1139                 sb.append(" (streamid=").append(streamid).append("): ");
       
  1140                 Log.dumpHeaders(sb, "    ", responseHeaders);
       
  1141                 Log.logHeaders(sb.toString());
       
  1142             }
       
  1143 
       
  1144             // different implementations for normal streams and pushed streams
       
  1145             completeResponse(response);
       
  1146         }
       
  1147     }
       
  1148 
       
  1149     final class StreamWindowUpdateSender extends WindowUpdateSender {
       
  1150 
       
  1151         StreamWindowUpdateSender(Http2Connection connection) {
       
  1152             super(connection);
       
  1153         }
       
  1154 
       
  1155         @Override
       
  1156         int getStreamId() {
       
  1157             return streamid;
       
  1158         }
       
  1159     }
       
  1160 
       
  1161     /**
       
  1162      * Returns true if this exchange was canceled.
       
  1163      * @return true if this exchange was canceled.
       
  1164      */
       
  1165     synchronized boolean isCanceled() {
       
  1166         return failed != null;
       
  1167     }
       
  1168 
       
  1169     /**
       
  1170      * Returns the cause for which this exchange was canceled, if available.
       
  1171      * @return the cause for which this exchange was canceled, if available.
       
  1172      */
       
  1173     synchronized Throwable getCancelCause() {
       
  1174         return failed;
       
  1175     }
       
  1176 
       
  1177     final String dbgString() {
       
  1178         return connection.dbgString() + "/Stream("+streamid+")";
       
  1179     }
       
  1180 }