src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56071 3353cb42b1b4
child 56082 1da51fab3032
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.net.URI;
       
    31 import java.nio.ByteBuffer;
       
    32 import java.util.ArrayList;
       
    33 import java.util.Collections;
       
    34 import java.util.List;
       
    35 import java.util.concurrent.CompletableFuture;
       
    36 import java.util.concurrent.ConcurrentLinkedDeque;
       
    37 import java.util.concurrent.ConcurrentLinkedQueue;
       
    38 import java.util.concurrent.Executor;
       
    39 import java.util.concurrent.Flow;
       
    40 import java.util.concurrent.Flow.Subscription;
       
    41 import java.util.concurrent.atomic.AtomicReference;
       
    42 import java.util.function.BiPredicate;
       
    43 import jdk.incubator.http.HttpClient;
       
    44 import jdk.incubator.http.HttpHeaders;
       
    45 import jdk.incubator.http.HttpRequest;
       
    46 import jdk.incubator.http.HttpResponse;
       
    47 import jdk.incubator.http.HttpResponse.BodySubscriber;
       
    48 import jdk.incubator.http.internal.common.*;
       
    49 import jdk.incubator.http.internal.frame.*;
       
    50 import jdk.incubator.http.internal.hpack.DecodingCallback;
       
    51 
       
    52 /**
       
    53  * Http/2 Stream handling.
       
    54  *
       
    55  * REQUESTS
       
    56  *
       
    57  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
       
    58  *
       
    59  * sendRequest() -- sendHeadersOnly() + sendBody()
       
    60  *
       
    61  * sendBodyAsync() -- calls sendBody() in an executor thread.
       
    62  *
       
    63  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
       
    64  *
       
    65  * sendRequestAsync() -- calls sendRequest() in an executor thread
       
    66  *
       
    67  * RESPONSES
       
    68  *
       
    69  * Multiple responses can be received per request. Responses are queued up on
       
    70  * a LinkedList of CF<HttpResponse> and the first one on the list is completed
       
    71  * with the next response
       
    72  *
       
    73  * getResponseAsync() -- queries list of response CFs and returns first one
       
    74  *               if one exists. Otherwise, creates one and adds it to list
       
    75  *               and returns it. Completion is achieved through the
       
    76  *               incoming() upcall from connection reader thread.
       
    77  *
       
    78  * getResponse() -- calls getResponseAsync() and waits for CF to complete
       
    79  *
       
    80  * responseBodyAsync() -- calls responseBody() in an executor thread.
       
    81  *
       
    82  * incoming() -- entry point called from connection reader thread. Frames are
       
    83  *               either handled immediately without blocking or for data frames
       
    84  *               placed on the stream's inputQ which is consumed by the stream's
       
    85  *               reader thread.
       
    86  *
       
    87  * PushedStream sub class
       
    88  * ======================
       
    89  * Sending side methods are not used because the request comes from a PUSH_PROMISE
       
    90  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
       
    91  * is created. PushedStream does not use responseCF list as there can be only
       
    92  * one response. The CF is created when the object is created and when the response
       
    93  * HEADERS frame is received the object is completed.
       
    94  */
       
    95 class Stream<T> extends ExchangeImpl<T> {
       
    96 
       
    97     final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
       
    98     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    99 
       
   100     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
       
   101     final SequentialScheduler sched =
       
   102             SequentialScheduler.synchronizedScheduler(this::schedule);
       
   103     final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
       
   104 
       
   105     /**
       
   106      * This stream's identifier. Assigned lazily by the HTTP2Connection before
       
   107      * the stream's first frame is sent.
       
   108      */
       
   109     protected volatile int streamid;
       
   110 
       
   111     long requestContentLen;
       
   112 
       
   113     final Http2Connection connection;
       
   114     final HttpRequestImpl request;
       
   115     final DecodingCallback rspHeadersConsumer;
       
   116     HttpHeadersImpl responseHeaders;
       
   117     final HttpHeadersImpl requestPseudoHeaders;
       
   118     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
       
   119     final HttpRequest.BodyPublisher requestPublisher;
       
   120     volatile RequestSubscriber requestSubscriber;
       
   121     volatile int responseCode;
       
   122     volatile Response response;
       
   123     volatile Throwable failed; // The exception with which this stream was canceled.
       
   124     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
       
   125     volatile CompletableFuture<T> responseBodyCF;
       
   126 
       
   127     /** True if END_STREAM has been seen in a frame received on this stream. */
       
   128     private volatile boolean remotelyClosed;
       
   129     private volatile boolean closed;
       
   130     private volatile boolean endStreamSent;
       
   131 
       
   132     // state flags
       
   133     private boolean requestSent, responseReceived;
       
   134 
       
   135     /**
       
   136      * A reference to this Stream's connection Send Window controller. The
       
   137      * stream MUST acquire the appropriate amount of Send Window before
       
   138      * sending any data. Will be null for PushStreams, as they cannot send data.
       
   139      */
       
   140     private final WindowController windowController;
       
   141     private final WindowUpdateSender windowUpdater;
       
   142 
       
   143     @Override
       
   144     HttpConnection connection() {
       
   145         return connection.connection;
       
   146     }
       
   147 
       
   148     /**
       
   149      * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
       
   150      * of after user subscription window has re-opened, from SubscriptionBase.request()
       
   151      */
       
   152     private void schedule() {
       
   153         if (responseSubscriber == null)
       
   154             // can't process anything yet
       
   155             return;
       
   156 
       
   157         try {
       
   158             while (!inputQ.isEmpty()) {
       
   159                 Http2Frame frame = inputQ.peek();
       
   160                 if (frame instanceof ResetFrame) {
       
   161                     inputQ.remove();
       
   162                     handleReset((ResetFrame)frame);
       
   163                     return;
       
   164                 }
       
   165                 DataFrame df = (DataFrame)frame;
       
   166                 boolean finished = df.getFlag(DataFrame.END_STREAM);
       
   167 
       
   168                 List<ByteBuffer> buffers = df.getData();
       
   169                 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
       
   170                 int size = Utils.remaining(dsts, Integer.MAX_VALUE);
       
   171                 if (size == 0 && finished) {
       
   172                     inputQ.remove();
       
   173                     Log.logTrace("responseSubscriber.onComplete");
       
   174                     debug.log(Level.DEBUG, "incoming: onComplete");
       
   175                     sched.stop();
       
   176                     responseSubscriber.onComplete();
       
   177                     setEndStreamReceived();
       
   178                     return;
       
   179                 } else if (userSubscription.tryDecrement()) {
       
   180                     inputQ.remove();
       
   181                     Log.logTrace("responseSubscriber.onNext {0}", size);
       
   182                     debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
       
   183                     responseSubscriber.onNext(dsts);
       
   184                     if (consumed(df)) {
       
   185                         Log.logTrace("responseSubscriber.onComplete");
       
   186                         debug.log(Level.DEBUG, "incoming: onComplete");
       
   187                         sched.stop();
       
   188                         responseSubscriber.onComplete();
       
   189                         setEndStreamReceived();
       
   190                         return;
       
   191                     }
       
   192                 } else {
       
   193                     return;
       
   194                 }
       
   195             }
       
   196         } catch (Throwable throwable) {
       
   197             failed = throwable;
       
   198         }
       
   199 
       
   200         Throwable t = failed;
       
   201         if (t != null) {
       
   202             sched.stop();
       
   203             responseSubscriber.onError(t);
       
   204             close();
       
   205         }
       
   206     }
       
   207 
       
   208     // Callback invoked after the Response BodySubscriber has consumed the
       
   209     // buffers contained in a DataFrame.
       
   210     // Returns true if END_STREAM is reached, false otherwise.
       
   211     private boolean consumed(DataFrame df) {
       
   212         // RFC 7540 6.1:
       
   213         // The entire DATA frame payload is included in flow control,
       
   214         // including the Pad Length and Padding fields if present
       
   215         int len = df.payloadLength();
       
   216         connection.windowUpdater.update(len);
       
   217 
       
   218         if (!df.getFlag(DataFrame.END_STREAM)) {
       
   219             // Don't send window update on a stream which is
       
   220             // closed or half closed.
       
   221             windowUpdater.update(len);
       
   222             return false; // more data coming
       
   223         }
       
   224         return true; // end of stream
       
   225     }
       
   226 
       
   227     @Override
       
   228     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
       
   229                                        boolean returnConnectionToPool,
       
   230                                        Executor executor)
       
   231     {
       
   232         Log.logTrace("Reading body on stream {0}", streamid);
       
   233         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
       
   234         CompletableFuture<T> cf = receiveData(bodySubscriber);
       
   235 
       
   236         PushGroup<?> pg = exchange.getPushGroup();
       
   237         if (pg != null) {
       
   238             // if an error occurs make sure it is recorded in the PushGroup
       
   239             cf = cf.whenComplete((t,e) -> pg.pushError(e));
       
   240         }
       
   241         return cf;
       
   242     }
       
   243 
       
   244     @Override
       
   245     public String toString() {
       
   246         StringBuilder sb = new StringBuilder();
       
   247         sb.append("streamid: ")
       
   248                 .append(streamid);
       
   249         return sb.toString();
       
   250     }
       
   251 
       
   252     private void receiveDataFrame(DataFrame df) {
       
   253         inputQ.add(df);
       
   254         sched.runOrSchedule();
       
   255     }
       
   256 
       
   257     /** Handles a RESET frame. RESET is always handled inline in the queue. */
       
   258     private void receiveResetFrame(ResetFrame frame) {
       
   259         inputQ.add(frame);
       
   260         sched.runOrSchedule();
       
   261     }
       
   262 
       
   263     // pushes entire response body into response subscriber
       
   264     // blocking when required by local or remote flow control
       
   265     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
       
   266         responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
       
   267 
       
   268         if (isCanceled()) {
       
   269             Throwable t = getCancelCause();
       
   270             responseBodyCF.completeExceptionally(t);
       
   271         } else {
       
   272             bodySubscriber.onSubscribe(userSubscription);
       
   273         }
       
   274         // Set the responseSubscriber field now that onSubscribe has been called.
       
   275         // This effectively allows the scheduler to start invoking the callbacks.
       
   276         responseSubscriber = bodySubscriber;
       
   277         sched.runOrSchedule(); // in case data waiting already to be processed
       
   278         return responseBodyCF;
       
   279     }
       
   280 
       
   281     @Override
       
   282     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
       
   283         return sendBodyImpl().thenApply( v -> this);
       
   284     }
       
   285 
       
   286     @SuppressWarnings("unchecked")
       
   287     Stream(Http2Connection connection,
       
   288            Exchange<T> e,
       
   289            WindowController windowController)
       
   290     {
       
   291         super(e);
       
   292         this.connection = connection;
       
   293         this.windowController = windowController;
       
   294         this.request = e.request();
       
   295         this.requestPublisher = request.requestPublisher;  // may be null
       
   296         responseHeaders = new HttpHeadersImpl();
       
   297         rspHeadersConsumer = (name, value) -> {
       
   298             responseHeaders.addHeader(name.toString(), value.toString());
       
   299             if (Log.headers() && Log.trace()) {
       
   300                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
       
   301                              streamid, name, value);
       
   302             }
       
   303         };
       
   304         this.requestPseudoHeaders = new HttpHeadersImpl();
       
   305         // NEW
       
   306         this.windowUpdater = new StreamWindowUpdateSender(connection);
       
   307     }
       
   308 
       
   309     /**
       
   310      * Entry point from Http2Connection reader thread.
       
   311      *
       
   312      * Data frames will be removed by response body thread.
       
   313      */
       
   314     void incoming(Http2Frame frame) throws IOException {
       
   315         debug.log(Level.DEBUG, "incoming: %s", frame);
       
   316         if ((frame instanceof HeaderFrame)) {
       
   317             HeaderFrame hframe = (HeaderFrame)frame;
       
   318             if (hframe.endHeaders()) {
       
   319                 Log.logTrace("handling response (streamid={0})", streamid);
       
   320                 handleResponse();
       
   321                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
       
   322                     receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
       
   323                 }
       
   324             }
       
   325         } else if (frame instanceof DataFrame) {
       
   326             receiveDataFrame((DataFrame)frame);
       
   327         } else {
       
   328             otherFrame(frame);
       
   329         }
       
   330     }
       
   331 
       
   332     void otherFrame(Http2Frame frame) throws IOException {
       
   333         switch (frame.type()) {
       
   334             case WindowUpdateFrame.TYPE:
       
   335                 incoming_windowUpdate((WindowUpdateFrame) frame);
       
   336                 break;
       
   337             case ResetFrame.TYPE:
       
   338                 incoming_reset((ResetFrame) frame);
       
   339                 break;
       
   340             case PriorityFrame.TYPE:
       
   341                 incoming_priority((PriorityFrame) frame);
       
   342                 break;
       
   343             default:
       
   344                 String msg = "Unexpected frame: " + frame.toString();
       
   345                 throw new IOException(msg);
       
   346         }
       
   347     }
       
   348 
       
   349     // The Hpack decoder decodes into one of these consumers of name,value pairs
       
   350 
       
   351     DecodingCallback rspHeadersConsumer() {
       
   352         return rspHeadersConsumer;
       
   353     }
       
   354 
       
   355     protected void handleResponse() throws IOException {
       
   356         responseCode = (int)responseHeaders
       
   357                 .firstValueAsLong(":status")
       
   358                 .orElseThrow(() -> new IOException("no statuscode in response"));
       
   359 
       
   360         response = new Response(
       
   361                 request, exchange, responseHeaders,
       
   362                 responseCode, HttpClient.Version.HTTP_2);
       
   363 
       
   364         /* TODO: review if needs to be removed
       
   365            the value is not used, but in case `content-length` doesn't parse as
       
   366            long, there will be NumberFormatException. If left as is, make sure
       
   367            code up the stack handles NFE correctly. */
       
   368         responseHeaders.firstValueAsLong("content-length");
       
   369 
       
   370         if (Log.headers()) {
       
   371             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
       
   372             Log.dumpHeaders(sb, "    ", responseHeaders);
       
   373             Log.logHeaders(sb.toString());
       
   374         }
       
   375 
       
   376         completeResponse(response);
       
   377     }
       
   378 
       
   379     void incoming_reset(ResetFrame frame) {
       
   380         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
       
   381         if (endStreamReceived()) {
       
   382             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
       
   383         } else if (closed) {
       
   384             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
       
   385         } else {
       
   386             // put it in the input queue in order to read all
       
   387             // pending data frames first. Indeed, a server may send
       
   388             // RST_STREAM after sending END_STREAM, in which case we should
       
   389             // ignore it. However, we won't know if we have received END_STREAM
       
   390             // or not until all pending data frames are read.
       
   391             receiveResetFrame(frame);
       
   392             // RST_STREAM was pushed to the queue. It will be handled by
       
   393             // asyncReceive after all pending data frames have been
       
   394             // processed.
       
   395             Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
       
   396         }
       
   397     }
       
   398 
       
   399     void handleReset(ResetFrame frame) {
       
   400         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
       
   401         if (!closed) {
       
   402             close();
       
   403             int error = frame.getErrorCode();
       
   404             completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
       
   405         } else {
       
   406             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
       
   407         }
       
   408     }
       
   409 
       
   410     void incoming_priority(PriorityFrame frame) {
       
   411         // TODO: implement priority
       
   412         throw new UnsupportedOperationException("Not implemented");
       
   413     }
       
   414 
       
   415     private void incoming_windowUpdate(WindowUpdateFrame frame)
       
   416         throws IOException
       
   417     {
       
   418         int amount = frame.getUpdate();
       
   419         if (amount <= 0) {
       
   420             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
       
   421                          streamid, streamid, amount);
       
   422             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
       
   423         } else {
       
   424             assert streamid != 0;
       
   425             boolean success = windowController.increaseStreamWindow(amount, streamid);
       
   426             if (!success) {  // overflow
       
   427                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
       
   428             }
       
   429         }
       
   430     }
       
   431 
       
   432     void incoming_pushPromise(HttpRequestImpl pushRequest,
       
   433                               PushedStream<T> pushStream)
       
   434         throws IOException
       
   435     {
       
   436         if (Log.requests()) {
       
   437             Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
       
   438         }
       
   439         PushGroup<T> pushGroup = exchange.getPushGroup();
       
   440         if (pushGroup == null) {
       
   441             Log.logTrace("Rejecting push promise stream " + streamid);
       
   442             connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
       
   443             pushStream.close();
       
   444             return;
       
   445         }
       
   446 
       
   447         PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
       
   448 
       
   449         if (!acceptor.accepted()) {
       
   450             // cancel / reject
       
   451             IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
       
   452             if (Log.trace()) {
       
   453                 Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
       
   454                         ex.getMessage());
       
   455             }
       
   456             pushStream.cancelImpl(ex);
       
   457             return;
       
   458         }
       
   459 
       
   460         CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
       
   461         HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
       
   462         assert pushHandler != null;
       
   463 
       
   464         pushStream.requestSent();
       
   465         pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
       
   466         // setup housekeeping for when the push is received
       
   467         // TODO: deal with ignoring of CF anti-pattern
       
   468         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
       
   469         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
       
   470             t = Utils.getCompletionCause(t);
       
   471             if (Log.trace()) {
       
   472                 Log.logTrace("Push completed on stream {0} for {1}{2}",
       
   473                              pushStream.streamid, resp,
       
   474                              ((t==null) ? "": " with exception " + t));
       
   475             }
       
   476             if (t != null) {
       
   477                 pushGroup.pushError(t);
       
   478                 pushResponseCF.completeExceptionally(t);
       
   479             } else {
       
   480                 pushResponseCF.complete(resp);
       
   481             }
       
   482             pushGroup.pushCompleted();
       
   483         });
       
   484 
       
   485     }
       
   486 
       
   487     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
       
   488         HttpHeadersImpl h = request.getSystemHeaders();
       
   489         if (contentLength > 0) {
       
   490             h.setHeader("content-length", Long.toString(contentLength));
       
   491         }
       
   492         setPseudoHeaderFields();
       
   493         HttpHeaders sysh = filter(h);
       
   494         HttpHeaders userh = filter(request.getUserHeaders());
       
   495         OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
       
   496         if (contentLength == 0) {
       
   497             f.setFlag(HeadersFrame.END_STREAM);
       
   498             endStreamSent = true;
       
   499         }
       
   500         return f;
       
   501     }
       
   502 
       
   503     private boolean hasProxyAuthorization(HttpHeaders headers) {
       
   504         return headers.firstValue("proxy-authorization")
       
   505                       .isPresent();
       
   506     }
       
   507 
       
   508     // Determines whether we need to build a new HttpHeader object.
       
   509     //
       
   510     // Ideally we should pass the filter to OutgoingHeaders refactor the
       
   511     // code that creates the HeaderFrame to honor the filter.
       
   512     // We're not there yet - so depending on the filter we need to
       
   513     // apply and the content of the header we will try to determine
       
   514     //  whether anything might need to be filtered.
       
   515     // If nothing needs filtering then we can just use the
       
   516     // original headers.
       
   517     private boolean needsFiltering(HttpHeaders headers,
       
   518                                    BiPredicate<String, List<String>> filter) {
       
   519         if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
       
   520             // we're either connecting or proxying
       
   521             // slight optimization: we only need to filter out
       
   522             // disabled schemes, so if there are none just
       
   523             // pass through.
       
   524             return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
       
   525                     && hasProxyAuthorization(headers);
       
   526         } else {
       
   527             // we're talking to a server, either directly or through
       
   528             // a tunnel.
       
   529             // Slight optimization: we only need to filter out
       
   530             // proxy authorization headers, so if there are none just
       
   531             // pass through.
       
   532             return hasProxyAuthorization(headers);
       
   533         }
       
   534     }
       
   535 
       
   536     private HttpHeaders filter(HttpHeaders headers) {
       
   537         HttpConnection conn = connection();
       
   538         BiPredicate<String, List<String>> filter =
       
   539                 conn.headerFilter(request);
       
   540         if (needsFiltering(headers, filter)) {
       
   541             return ImmutableHeaders.of(headers.map(), filter);
       
   542         }
       
   543         return headers;
       
   544     }
       
   545 
       
   546     private void setPseudoHeaderFields() {
       
   547         HttpHeadersImpl hdrs = requestPseudoHeaders;
       
   548         String method = request.method();
       
   549         hdrs.setHeader(":method", method);
       
   550         URI uri = request.uri();
       
   551         hdrs.setHeader(":scheme", uri.getScheme());
       
   552         // TODO: userinfo deprecated. Needs to be removed
       
   553         hdrs.setHeader(":authority", uri.getAuthority());
       
   554         // TODO: ensure header names beginning with : not in user headers
       
   555         String query = uri.getQuery();
       
   556         String path = uri.getPath();
       
   557         if (path == null || path.isEmpty()) {
       
   558             if (method.equalsIgnoreCase("OPTIONS")) {
       
   559                 path = "*";
       
   560             } else {
       
   561                 path = "/";
       
   562             }
       
   563         }
       
   564         if (query != null) {
       
   565             path += "?" + query;
       
   566         }
       
   567         hdrs.setHeader(":path", path);
       
   568     }
       
   569 
       
   570     HttpHeadersImpl getRequestPseudoHeaders() {
       
   571         return requestPseudoHeaders;
       
   572     }
       
   573 
       
   574     /** Sets endStreamReceived. Should be called only once. */
       
   575     void setEndStreamReceived() {
       
   576         assert remotelyClosed == false: "Unexpected endStream already set";
       
   577         remotelyClosed = true;
       
   578         responseReceived();
       
   579     }
       
   580 
       
   581     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
       
   582      *  received on this stream. */
       
   583     private boolean endStreamReceived() {
       
   584         return remotelyClosed;
       
   585     }
       
   586 
       
   587     @Override
       
   588     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
       
   589         debug.log(Level.DEBUG, "sendHeadersOnly()");
       
   590         if (Log.requests() && request != null) {
       
   591             Log.logRequest(request.toString());
       
   592         }
       
   593         if (requestPublisher != null) {
       
   594             requestContentLen = requestPublisher.contentLength();
       
   595         } else {
       
   596             requestContentLen = 0;
       
   597         }
       
   598         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
       
   599         connection.sendFrame(f);
       
   600         CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
       
   601         cf.complete(this);  // #### good enough for now
       
   602         return cf;
       
   603     }
       
   604 
       
   605     @Override
       
   606     void released() {
       
   607         if (streamid > 0) {
       
   608             debug.log(Level.DEBUG, "Released stream %d", streamid);
       
   609             // remove this stream from the Http2Connection map.
       
   610             connection.closeStream(streamid);
       
   611         } else {
       
   612             debug.log(Level.DEBUG, "Can't release stream %d", streamid);
       
   613         }
       
   614     }
       
   615 
       
   616     @Override
       
   617     void completed() {
       
   618         // There should be nothing to do here: the stream should have
       
   619         // been already closed (or will be closed shortly after).
       
   620     }
       
   621 
       
   622     void registerStream(int id) {
       
   623         this.streamid = id;
       
   624         connection.putStream(this, streamid);
       
   625         debug.log(Level.DEBUG, "Registered stream %d", id);
       
   626     }
       
   627 
       
   628     void signalWindowUpdate() {
       
   629         RequestSubscriber subscriber = requestSubscriber;
       
   630         assert subscriber != null;
       
   631         debug.log(Level.DEBUG, "Signalling window update");
       
   632         subscriber.sendScheduler.runOrSchedule();
       
   633     }
       
   634 
       
    // Sentinel placed in RequestSubscriber.outgoing by onComplete(); it is
    // recognised by identity (==) in trySend(), never by content.
    static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);

    /**
     * Subscriber to the request body publisher. Buffers published by the
     * publisher are queued in {@code outgoing} and written to the connection
     * as DATA frames by {@link #trySend()}, which is always run through
     * {@code sendScheduler}. Errors and completion are funnelled through the
     * same scheduler so that they are handled within the send flow.
     */
    class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
        // can be < 0 if the actual length is not known.
        private final long contentLength;
        // Number of bytes still expected from the publisher; decremented in
        // trySend() (only when contentLength > 0).
        private volatile long remainingContentLength;
        private volatile Subscription subscription;

        // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
        //  1) The data that was published by the request body Publisher, and
        //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
        final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();

        // First error reported through onError(); consumed by trySend().
        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        // A scheduler used to honor window updates. Writing must be paused
        // when the window is exhausted, and resumed when the window acquires
        // some space. The sendScheduler makes it possible to implement this
        // behaviour in an asynchronous non-blocking way.
        // See RequestSubscriber::trySend below.
        final SequentialScheduler sendScheduler;

        /**
         * @param contentLen the expected request body length; may be
         *                   negative if the length is not known
         */
        RequestSubscriber(long contentLen) {
            this.contentLength = contentLen;
            this.remainingContentLength = contentLen;
            this.sendScheduler =
                    SequentialScheduler.synchronizedScheduler(this::trySend);
        }

        /**
         * Stores the subscription and requests the first item.
         *
         * @throws IllegalStateException if a subscription was already set
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription != null) {
                throw new IllegalStateException("already subscribed");
            }
            this.subscription = subscription;
            debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
            subscription.request(1);
        }

        /**
         * Receives the next chunk of the request body. Only one item is
         * requested at a time, so the queue is expected to be empty here.
         */
        @Override
        public void onNext(ByteBuffer item) {
            debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
            int size = outgoing.size();
            assert size == 0 : "non-zero size: " + size;
            onNextImpl(item);
        }

        // Queues the item (a data buffer or the COMPLETED sentinel) and
        // triggers the send scheduler, unless the request body future is
        // already done, in which case the flow is stopped and cancelled.
        private void onNextImpl(ByteBuffer item) {
            // Got some more request body bytes to send.
            if (requestBodyCF.isDone()) {
                // stream already cancelled, probably in timeout
                sendScheduler.stop();
                subscription.cancel();
                return;
            }
            outgoing.add(item);
            sendScheduler.runOrSchedule();
        }

        /**
         * Records the first error reported by the publisher and schedules
         * {@link #trySend()} to handle it. Later errors are ignored.
         */
        @Override
        public void onError(Throwable throwable) {
            debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
            // ensure that errors are handled within the flow.
            if (errorRef.compareAndSet(null, throwable)) {
                sendScheduler.runOrSchedule();
            }
        }

        /**
         * Signals the end of the request body by queueing the
         * {@code COMPLETED} sentinel behind any buffer still pending.
         */
        @Override
        public void onComplete() {
            debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
            int size = outgoing.size();
            assert size == 0 || size == 1 : "non-zero or one size: " + size;
            // last byte of request body has been obtained.
            // ensure that everything is completed within the flow.
            onNextImpl(COMPLETED);
        }

        // Attempts to send the data, if any.
        // Handles errors and completion state.
        // Pause writing if the send window is exhausted, resume it if the
        // send window has some bytes that can be acquired.
        // Any Throwable raised while sending stops the scheduler, cancels
        // the subscription and fails requestBodyCF.
        void trySend() {
            try {
                // handle errors raised by onError;
                Throwable t = errorRef.get();
                if (t != null) {
                    sendScheduler.stop();
                    if (requestBodyCF.isDone()) return;
                    subscription.cancel();
                    requestBodyCF.completeExceptionally(t);
                    return;
                }

                do {
                    // handle COMPLETED;
                    ByteBuffer item = outgoing.peekFirst();
                    if (item == null) return;
                    else if (item == COMPLETED) {
                        sendScheduler.stop();
                        complete();
                        return;
                    }

                    // handle bytes to send downstream
                    while (item.hasRemaining()) {
                        debug.log(Level.DEBUG, "trySend: %d", item.remaining());
                        assert !endStreamSent : "internal error, send data after END_STREAM flag";
                        DataFrame df = getDataFrame(item);
                        if (df == null) {
                            // The item stays at the head of the queue; the
                            // remainder is retried on the next run.
                            debug.log(Level.DEBUG, "trySend: can't send yet: %d",
                                    item.remaining());
                            return; // the send window is exhausted: come back later
                        }

                        // Length accounting only applies when a positive
                        // content length was announced.
                        if (contentLength > 0) {
                            remainingContentLength -= df.getDataLength();
                            if (remainingContentLength < 0) {
                                // The publisher produced more bytes than announced.
                                String msg = connection().getConnectionFlow()
                                        + " stream=" + streamid + " "
                                        + "[" + Thread.currentThread().getName() + "] "
                                        + "Too many bytes in request body. Expected: "
                                        + contentLength + ", got: "
                                        + (contentLength - remainingContentLength);
                                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                                throw new IOException(msg);
                            } else if (remainingContentLength == 0) {
                                // Last expected byte: flag this frame as the
                                // end of the stream.
                                df.setFlag(DataFrame.END_STREAM);
                                endStreamSent = true;
                            }
                        }
                        debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
                        connection.sendDataFrame(df);
                    }
                    assert !item.hasRemaining();
                    ByteBuffer b = outgoing.removeFirst();
                    assert b == item;
                } while (outgoing.peekFirst() != null);

                // Everything queued has been sent: ask for the next buffer.
                debug.log(Level.DEBUG, "trySend: request 1");
                subscription.request(1);
            } catch (Throwable ex) {
                debug.log(Level.DEBUG, "trySend: ", ex);
                sendScheduler.stop();
                subscription.cancel();
                requestBodyCF.completeExceptionally(ex);
            }
        }

        // Called from trySend() when the COMPLETED sentinel is reached.
        // Resets the stream and throws if fewer bytes than announced were
        // published; otherwise sends an empty END_STREAM frame if END_STREAM
        // has not been sent yet, and completes requestBodyCF.
        private void complete() throws IOException {
            long remaining = remainingContentLength;
            long written = contentLength - remaining;
            if (remaining > 0) {
                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                // let trySend() handle the exception
                throw new IOException(connection().getConnectionFlow()
                                     + " stream=" + streamid + " "
                                     + "[" + Thread.currentThread().getName() +"] "
                                     + "Too few bytes returned by the publisher ("
                                              + written + "/"
                                              + contentLength + ")");
            }
            if (!endStreamSent) {
                endStreamSent = true;
                connection.sendDataFrame(getEmptyEndStreamDataFrame());
            }
            requestBodyCF.complete(null);
        }
    }
       
   802 
       
   803     /**
       
   804      * Send a RESET frame to tell server to stop sending data on this stream
       
   805      */
       
   806     @Override
       
   807     public CompletableFuture<Void> ignoreBody() {
       
   808         try {
       
   809             connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
       
   810             return MinimalFuture.completedFuture(null);
       
   811         } catch (Throwable e) {
       
   812             Log.logTrace("Error resetting stream {0}", e.toString());
       
   813             return MinimalFuture.failedFuture(e);
       
   814         }
       
   815     }
       
   816 
       
   817     DataFrame getDataFrame(ByteBuffer buffer) {
       
   818         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
       
   819         // blocks waiting for stream send window, if exhausted
       
   820         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
       
   821         if (actualAmount <= 0) return null;
       
   822         ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer,  actualAmount);
       
   823         DataFrame df = new DataFrame(streamid, 0 , outBuf);
       
   824         return df;
       
   825     }
       
   826 
       
   827     private DataFrame getEmptyEndStreamDataFrame()  {
       
   828         return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
       
   829     }
       
   830 
       
   831     /**
       
   832      * A List of responses relating to this stream. Normally there is only
       
   833      * one response, but intermediate responses like 100 are allowed
       
   834      * and must be passed up to higher level before continuing. Deals with races
       
   835      * such as if responses are returned before the CFs get created by
       
   836      * getResponseAsync()
       
   837      */
       
   838 
       
   839     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
       
   840 
       
   841     @Override
       
   842     CompletableFuture<Response> getResponseAsync(Executor executor) {
       
   843         CompletableFuture<Response> cf;
       
   844         // The code below deals with race condition that can be caused when
       
   845         // completeResponse() is being called before getResponseAsync()
       
   846         synchronized (response_cfs) {
       
   847             if (!response_cfs.isEmpty()) {
       
   848                 // This CompletableFuture was created by completeResponse().
       
   849                 // it will be already completed.
       
   850                 cf = response_cfs.remove(0);
       
   851                 // if we find a cf here it should be already completed.
       
   852                 // finding a non completed cf should not happen. just assert it.
       
   853                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
       
   854             } else {
       
   855                 // getResponseAsync() is called first. Create a CompletableFuture
       
   856                 // that will be completed by completeResponse() when
       
   857                 // completeResponse() is called.
       
   858                 cf = new MinimalFuture<>();
       
   859                 response_cfs.add(cf);
       
   860             }
       
   861         }
       
   862         if (executor != null && !cf.isDone()) {
       
   863             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
       
   864             cf = cf.thenApplyAsync(r -> r, executor);
       
   865         }
       
   866         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
       
   867         PushGroup<?> pg = exchange.getPushGroup();
       
   868         if (pg != null) {
       
   869             // if an error occurs make sure it is recorded in the PushGroup
       
   870             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
       
   871         }
       
   872         return cf;
       
   873     }
       
   874 
       
   875     /**
       
   876      * Completes the first uncompleted CF on list, and removes it. If there is no
       
   877      * uncompleted CF then creates one (completes it) and adds to list
       
   878      */
       
   879     void completeResponse(Response resp) {
       
   880         synchronized (response_cfs) {
       
   881             CompletableFuture<Response> cf;
       
   882             int cfs_len = response_cfs.size();
       
   883             for (int i=0; i<cfs_len; i++) {
       
   884                 cf = response_cfs.get(i);
       
   885                 if (!cf.isDone()) {
       
   886                     Log.logTrace("Completing response (streamid={0}): {1}",
       
   887                                  streamid, cf);
       
   888                     cf.complete(resp);
       
   889                     response_cfs.remove(cf);
       
   890                     return;
       
   891                 } // else we found the previous response: just leave it alone.
       
   892             }
       
   893             cf = MinimalFuture.completedFuture(resp);
       
   894             Log.logTrace("Created completed future (streamid={0}): {1}",
       
   895                          streamid, cf);
       
   896             response_cfs.add(cf);
       
   897         }
       
   898     }
       
   899 
       
   900     // methods to update state and remove stream when finished
       
   901 
       
   902     synchronized void requestSent() {
       
   903         requestSent = true;
       
   904         if (responseReceived) {
       
   905             close();
       
   906         }
       
   907     }
       
   908 
       
   909     synchronized void responseReceived() {
       
   910         responseReceived = true;
       
   911         if (requestSent) {
       
   912             close();
       
   913         }
       
   914     }
       
   915 
       
   916     /**
       
   917      * same as above but for errors
       
   918      */
       
   919     void completeResponseExceptionally(Throwable t) {
       
   920         synchronized (response_cfs) {
       
   921             // use index to avoid ConcurrentModificationException
       
   922             // caused by removing the CF from within the loop.
       
   923             for (int i = 0; i < response_cfs.size(); i++) {
       
   924                 CompletableFuture<Response> cf = response_cfs.get(i);
       
   925                 if (!cf.isDone()) {
       
   926                     cf.completeExceptionally(t);
       
   927                     response_cfs.remove(i);
       
   928                     return;
       
   929                 }
       
   930             }
       
   931             response_cfs.add(MinimalFuture.failedFuture(t));
       
   932         }
       
   933     }
       
   934 
       
   935     CompletableFuture<Void> sendBodyImpl() {
       
   936         requestBodyCF.whenComplete((v, t) -> requestSent());
       
   937         if (requestPublisher != null) {
       
   938             final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
       
   939             requestPublisher.subscribe(requestSubscriber = subscriber);
       
   940         } else {
       
   941             // there is no request body, therefore the request is complete,
       
   942             // END_STREAM has already sent with outgoing headers
       
   943             requestBodyCF.complete(null);
       
   944         }
       
   945         return requestBodyCF;
       
   946     }
       
   947 
       
   948     @Override
       
   949     void cancel() {
       
   950         cancel(new IOException("Stream " + streamid + " cancelled"));
       
   951     }
       
   952 
       
   953     @Override
       
   954     void cancel(IOException cause) {
       
   955         cancelImpl(cause);
       
   956     }
       
   957 
       
   958     // This method sends a RST_STREAM frame
       
   959     void cancelImpl(Throwable e) {
       
   960         debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
       
   961         if (Log.trace()) {
       
   962             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
       
   963         }
       
   964         boolean closing;
       
   965         if (closing = !closed) { // assigning closing to !closed
       
   966             synchronized (this) {
       
   967                 failed = e;
       
   968                 if (closing = !closed) { // assigning closing to !closed
       
   969                     closed=true;
       
   970                 }
       
   971             }
       
   972         }
       
   973         if (closing) { // true if the stream has not been closed yet
       
   974             if (responseSubscriber != null)
       
   975                 sched.runOrSchedule();
       
   976         }
       
   977         completeResponseExceptionally(e);
       
   978         if (!requestBodyCF.isDone()) {
       
   979             requestBodyCF.completeExceptionally(e); // we may be sending the body..
       
   980         }
       
   981         if (responseBodyCF != null) {
       
   982             responseBodyCF.completeExceptionally(e);
       
   983         }
       
   984         try {
       
   985             // will send a RST_STREAM frame
       
   986             if (streamid != 0) {
       
   987                 connection.resetStream(streamid, ResetFrame.CANCEL);
       
   988             }
       
   989         } catch (IOException ex) {
       
   990             Log.logError(ex);
       
   991         }
       
   992     }
       
   993 
       
   994     // This method doesn't send any frame
       
   995     void close() {
       
   996         if (closed) return;
       
   997         synchronized(this) {
       
   998             if (closed) return;
       
   999             closed = true;
       
  1000         }
       
  1001         Log.logTrace("Closing stream {0}", streamid);
       
  1002         connection.closeStream(streamid);
       
  1003         Log.logTrace("Stream {0} closed", streamid);
       
  1004     }
       
  1005 
       
    /**
     * A stream carrying a pushed response. The overrides of the exchange
     * operations delegate to {@code Stream} (or to {@code pushCF}) and
     * additionally report the outcome to the {@code PushGroup}.
     */
    static class PushedStream<T> extends Stream<T> {
        final PushGroup<T> pushGroup;
        // push streams need the response CF allocated up front as it is
        // given directly to user via the multi handler callback function.
        final CompletableFuture<Response> pushCF;
        // Completed in completeResponse() with the full response (headers and
        // body), or exceptionally if reading the body fails.
        CompletableFuture<HttpResponse<T>> responseCF;
        final HttpRequestImpl pushReq;
        HttpResponse.BodyHandler<T> pushHandler;

        PushedStream(PushGroup<T> pushGroup,
                     Http2Connection connection,
                     Exchange<T> pushReq) {
            // ## no request body possible, null window controller
            super(connection, pushReq, null);
            this.pushGroup = pushGroup;
            this.pushReq = pushReq.request();
            this.pushCF = new MinimalFuture<>();
            this.responseCF = new MinimalFuture<>();

        }

        /** Returns the future that will hold the complete pushed response. */
        CompletableFuture<HttpResponse<T>> responseCF() {
            return responseCF;
        }

        /** Sets the body handler used to read the pushed response body. */
        synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
            this.pushHandler = pushHandler;
        }

        /** Returns the body handler set by {@code setPushHandler}, if any. */
        synchronized HttpResponse.BodyHandler<T> getPushHandler() {
            // ignored parameters to function can be used as BodyHandler
            return this.pushHandler;
        }

        // Following methods call the super class but in case of
        // error record it in the PushGroup. The error method is called
        // with a null value when no error occurred (is a no-op)
        @Override
        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
            return super.sendBodyAsync()
                        .whenComplete((ExchangeImpl<T> v, Throwable t)
                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
        }

        @Override
        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
            return super.sendHeadersAsync()
                        .whenComplete((ExchangeImpl<T> ex, Throwable t)
                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
        }

        // Unlike Stream.getResponseAsync(), this is backed by the single
        // pre-allocated pushCF rather than by the response_cfs list.
        @Override
        CompletableFuture<Response> getResponseAsync(Executor executor) {
            CompletableFuture<Response> cf = pushCF.whenComplete(
                    (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
            if(executor!=null && !cf.isDone()) {
                cf  = cf.thenApplyAsync( r -> r, executor);
            }
            return cf;
        }

        // NOTE(review): unlike the overrides above, the throwable is passed
        // to pushError() without Utils.getCompletionCause() - confirm whether
        // that difference is intended.
        @Override
        CompletableFuture<T> readBodyAsync(
                HttpResponse.BodyHandler<T> handler,
                boolean returnConnectionToPool,
                Executor executor)
        {
            return super.readBodyAsync(handler, returnConnectionToPool, executor)
                        .whenComplete((v, t) -> pushGroup.pushError(t));
        }

        // Completes pushCF with the response, then reads the body with the
        // push handler and completes responseCF with the outcome. Reading is
        // kicked off by completing 'start' asynchronously on the exchange's
        // executor.
        @Override
        void completeResponse(Response r) {
            Log.logResponse(r::toString);
            pushCF.complete(r); // not strictly required for push API
            // start reading the body using the obtained BodySubscriber
            CompletableFuture<Void> start = new MinimalFuture<>();
            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
                .whenComplete((T body, Throwable t) -> {
                    if (t != null) {
                        responseCF.completeExceptionally(t);
                    } else {
                        HttpResponseImpl<T> resp =
                                new HttpResponseImpl<>(r.request, r, null, body, getExchange());
                        responseCF.complete(resp);
                    }
                });
            start.completeAsync(() -> null, getExchange().executor());
        }

        // Only pushCF is failed here; responseCF is not touched.
        @Override
        void completeResponseExceptionally(Throwable t) {
            pushCF.completeExceptionally(t);
        }

//        @Override
//        synchronized void responseReceived() {
//            super.responseReceived();
//        }

        // create and return the PushResponseImpl
        @Override
        protected void handleResponse() {
            // -1 stands for "no :status header present".
            responseCode = (int)responseHeaders
                .firstValueAsLong(":status")
                .orElse(-1);

            if (responseCode == -1) {
                // NOTE(review): there is no return after this call, so a
                // Response with code -1 is still built and passed to
                // completeResponse() below - confirm this is intended.
                completeResponseExceptionally(new IOException("No status code"));
            }

            this.response = new Response(
                pushReq, exchange, responseHeaders,
                responseCode, HttpClient.Version.HTTP_2);

            /* TODO: review if needs to be removed
               the value is not used, but in case `content-length` doesn't parse
               as long, there will be NumberFormatException. If left as is, make
               sure code up the stack handles NFE correctly. */
            responseHeaders.firstValueAsLong("content-length");

            if (Log.headers()) {
                StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
                sb.append(" (streamid=").append(streamid).append("): ");
                Log.dumpHeaders(sb, "    ", responseHeaders);
                Log.logHeaders(sb.toString());
            }

            // different implementations for normal streams and pushed streams
            completeResponse(response);
        }
    }
       
  1138 
       
  1139     final class StreamWindowUpdateSender extends WindowUpdateSender {
       
  1140 
       
  1141         StreamWindowUpdateSender(Http2Connection connection) {
       
  1142             super(connection);
       
  1143         }
       
  1144 
       
  1145         @Override
       
  1146         int getStreamId() {
       
  1147             return streamid;
       
  1148         }
       
  1149     }
       
  1150 
       
  1151     /**
       
  1152      * Returns true if this exchange was canceled.
       
  1153      * @return true if this exchange was canceled.
       
  1154      */
       
  1155     synchronized boolean isCanceled() {
       
  1156         return failed != null;
       
  1157     }
       
  1158 
       
  1159     /**
       
  1160      * Returns the cause for which this exchange was canceled, if available.
       
  1161      * @return the cause for which this exchange was canceled, if available.
       
  1162      */
       
  1163     synchronized Throwable getCancelCause() {
       
  1164         return failed;
       
  1165     }
       
  1166 
       
  1167     final String dbgString() {
       
  1168         return connection.dbgString() + "/Stream("+streamid+")";
       
  1169     }
       
  1170 }