src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
branch http-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55768 8674257c75ce
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
    29 import java.net.URI;
    30 import java.net.URI;
    30 import java.nio.ByteBuffer;
    31 import java.nio.ByteBuffer;
    31 import java.util.ArrayList;
    32 import java.util.ArrayList;
       
    33 import java.util.Arrays;
       
    34 import java.util.Collections;
    32 import java.util.List;
    35 import java.util.List;
    33 import java.util.Optional;
    36 import java.util.Optional;
    34 import java.util.concurrent.CompletableFuture;
    37 import java.util.concurrent.CompletableFuture;
    35 import java.util.concurrent.CompletionException;
    38 import java.util.concurrent.ConcurrentLinkedQueue;
    36 import java.util.concurrent.ExecutionException;
       
    37 import java.util.concurrent.Executor;
    39 import java.util.concurrent.Executor;
    38 import java.util.concurrent.Flow;
    40 import java.util.concurrent.Flow;
    39 import java.util.concurrent.Flow.Subscription;
    41 import java.util.concurrent.Flow.Subscription;
    40 import java.util.concurrent.TimeUnit;
    42 import java.util.concurrent.atomic.AtomicReference;
    41 import java.util.concurrent.TimeoutException;
    43 import java.util.stream.Collectors;
    42 import java.util.function.Consumer;
       
    43 
    44 
    44 import jdk.incubator.http.internal.common.*;
    45 import jdk.incubator.http.internal.common.*;
       
    46 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    47 import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
    45 import jdk.incubator.http.internal.frame.*;
    48 import jdk.incubator.http.internal.frame.*;
    46 import jdk.incubator.http.internal.hpack.DecodingCallback;
    49 import jdk.incubator.http.internal.hpack.DecodingCallback;
       
    50 import static java.util.stream.Collectors.toList;
    47 
    51 
    48 /**
    52 /**
    49  * Http/2 Stream handling.
    53  * Http/2 Stream handling.
    50  *
    54  *
    51  * REQUESTS
    55  * REQUESTS
    52  *
    56  *
    53  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
    57  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
    54  *
    58  *
    55  * sendRequest() -- sendHeadersOnly() + sendBody()
    59  * sendRequest() -- sendHeadersOnly() + sendBody()
    56  *
       
    57  * sendBody() -- in calling thread: obeys all flow control (so may block)
       
    58  *               obtains data from request body processor and places on connection
       
    59  *               outbound Q.
       
    60  *
    60  *
    61  * sendBodyAsync() -- calls sendBody() in an executor thread.
    61  * sendBodyAsync() -- calls sendBody() in an executor thread.
    62  *
    62  *
    63  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
    63  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
    64  *
    64  *
    74  *               if one exists. Otherwise, creates one and adds it to list
    74  *               if one exists. Otherwise, creates one and adds it to list
    75  *               and returns it. Completion is achieved through the
    75  *               and returns it. Completion is achieved through the
    76  *               incoming() upcall from connection reader thread.
    76  *               incoming() upcall from connection reader thread.
    77  *
    77  *
    78  * getResponse() -- calls getResponseAsync() and waits for CF to complete
    78  * getResponse() -- calls getResponseAsync() and waits for CF to complete
    79  *
       
    80  * responseBody() -- in calling thread: blocks for incoming DATA frames on
       
    81  *               stream inputQ. Obeys remote and local flow control so may block.
       
    82  *               Calls user response body processor with data buffers.
       
    83  *
    79  *
    84  * responseBodyAsync() -- calls responseBody() in an executor thread.
    80  * responseBodyAsync() -- calls responseBody() in an executor thread.
    85  *
    81  *
    86  * incoming() -- entry point called from connection reader thread. Frames are
    82  * incoming() -- entry point called from connection reader thread. Frames are
    87  *               either handled immediately without blocking or for data frames
    83  *               either handled immediately without blocking or for data frames
    96  * one response. The CF is created when the object is created and when the response
    92  * one response. The CF is created when the object is created and when the response
    97  * HEADERS frame is received the object is completed.
    93  * HEADERS frame is received the object is completed.
    98  */
    94  */
    99 class Stream<T> extends ExchangeImpl<T> {
    95 class Stream<T> extends ExchangeImpl<T> {
   100 
    96 
   101     final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
    97     final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
       
    98     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    99 
       
   100     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
       
   101     final SequentialScheduler sched =
       
   102             new SequentialScheduler(new SynchronizedRestartableTask(this::schedule));
       
   103     final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
   102 
   104 
   103     /**
   105     /**
   104      * This stream's identifier. Assigned lazily by the HTTP2Connection before
   106      * This stream's identifier. Assigned lazily by the HTTP2Connection before
   105      * the stream's first frame is sent.
   107      * the stream's first frame is sent.
   106      */
   108      */
   115     final HttpRequestImpl request;
   117     final HttpRequestImpl request;
   116     final DecodingCallback rspHeadersConsumer;
   118     final DecodingCallback rspHeadersConsumer;
   117     HttpHeadersImpl responseHeaders;
   119     HttpHeadersImpl responseHeaders;
   118     final HttpHeadersImpl requestHeaders;
   120     final HttpHeadersImpl requestHeaders;
   119     final HttpHeadersImpl requestPseudoHeaders;
   121     final HttpHeadersImpl requestPseudoHeaders;
   120     HttpResponse.BodyProcessor<T> responseProcessor;
   122     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
   121     final HttpRequest.BodyProcessor requestProcessor;
   123     final HttpRequest.BodyPublisher requestPublisher;
       
   124     volatile RequestSubscriber requestSubscriber;
   122     volatile int responseCode;
   125     volatile int responseCode;
   123     volatile Response response;
   126     volatile Response response;
   124     volatile CompletableFuture<Response> responseCF;
   127     volatile CompletableFuture<Response> responseCF;
   125     final AbstractPushPublisher<ByteBuffer> publisher;
   128     volatile Throwable failed; // The exception with which this stream was canceled.
   126     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
   129     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
       
   130     volatile CompletableFuture<T> responseBodyCF;
   127 
   131 
   128     /** True if END_STREAM has been seen in a frame received on this stream. */
   132     /** True if END_STREAM has been seen in a frame received on this stream. */
   129     private volatile boolean remotelyClosed;
   133     private volatile boolean remotelyClosed;
   130     private volatile boolean closed;
   134     private volatile boolean closed;
   131     private volatile boolean endStreamSent;
   135     private volatile boolean endStreamSent;
   144     @Override
   148     @Override
   145     HttpConnection connection() {
   149     HttpConnection connection() {
   146         return connection.connection;
   150         return connection.connection;
   147     }
   151     }
   148 
   152 
       
   153     /**
       
   154      * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
       
   155      * or after user subscription window has re-opened, from SubscriptionBase.request()
       
   156      */
       
   157     private void schedule() {
       
   158         if (responseSubscriber == null)
       
   159             // can't process anything yet
       
   160             return;
       
   161 
       
   162         while (!inputQ.isEmpty()) {
       
   163             Http2Frame frame  = inputQ.peek();
       
   164             if (frame instanceof ResetFrame) {
       
   165                 inputQ.remove();
       
   166                 handleReset((ResetFrame)frame);
       
   167                 return;
       
   168             }
       
   169             DataFrame df = (DataFrame)frame;
       
   170             boolean finished = df.getFlag(DataFrame.END_STREAM);
       
   171 
       
   172             ByteBufferReference[] buffers = df.getData();
       
   173             List<ByteBuffer> dsts = Arrays.stream(buffers)
       
   174                 .map(ByteBufferReference::get)
       
   175                 .filter(ByteBuffer::hasRemaining)
       
   176                 .collect(Collectors.collectingAndThen(toList(), Collections::unmodifiableList));
       
   177             int size = (int)Utils.remaining(dsts);
       
   178             if (size == 0 && finished) {
       
   179                 inputQ.remove();
       
   180                 Log.logTrace("responseSubscriber.onComplete");
       
   181                 debug.log(Level.DEBUG, "incoming: onComplete");
       
   182                 sched.stop();
       
   183                 responseSubscriber.onComplete();
       
   184                 setEndStreamReceived();
       
   185                 return;
       
   186             } else if (userSubscription.tryDecrement()) {
       
   187                 inputQ.remove();
       
   188                 Log.logTrace("responseSubscriber.onNext {0}", size);
       
   189                 debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
       
   190                 responseSubscriber.onNext(dsts);
       
   191                 if (consumed(df)) {
       
   192                     Log.logTrace("responseSubscriber.onComplete");
       
   193                     debug.log(Level.DEBUG, "incoming: onComplete");
       
   194                     sched.stop();
       
   195                     responseSubscriber.onComplete();
       
   196                     setEndStreamReceived();
       
   197                     return;
       
   198                 }
       
   199             } else {
       
   200                 return;
       
   201             }
       
   202         }
       
   203         Throwable t = failed;
       
   204         if (t != null) {
       
   205             sched.stop();
       
   206             responseSubscriber.onError(t);
       
   207             close();
       
   208         }
       
   209     }
       
   210 
       
   211     // Callback invoked after the response BodySubscriber has consumed the
       
   212     // buffers contained in a DataFrame.
       
   213     // Returns true if END_STREAM is reached, false otherwise.
       
   214     private boolean consumed(DataFrame df) {
       
   215         // RFC 7540 6.1:
       
   216         // The entire DATA frame payload is included in flow control,
       
   217         // including the Pad Length and Padding fields if present
       
   218         int len = df.payloadLength();
       
   219         connection.windowUpdater.update(len);
       
   220 
       
   221         if (!df.getFlag(DataFrame.END_STREAM)) {
       
   222             // Don't send window update on a stream which is
       
   223             // closed or half closed.
       
   224             windowUpdater.update(len);
       
   225             return false; // more data coming
       
   226         }
       
   227         return true; // end of stream
       
   228     }
       
   229 
   149     @Override
   230     @Override
   150     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
   231     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
   151                                        boolean returnConnectionToPool,
   232                                        boolean returnConnectionToPool,
   152                                        Executor executor)
   233                                        Executor executor)
   153     {
   234     {
   154         Log.logTrace("Reading body on stream {0}", streamid);
   235         Log.logTrace("Reading body on stream {0}", streamid);
   155         responseProcessor = handler.apply(responseCode, responseHeaders);
   236         responseSubscriber = handler.apply(responseCode, responseHeaders);
   156         publisher.subscribe(responseProcessor);
   237         CompletableFuture<T> cf = receiveData();
   157         CompletableFuture<T> cf = receiveData(executor);
       
   158 
   238 
   159         PushGroup<?,?> pg = exchange.getPushGroup();
   239         PushGroup<?,?> pg = exchange.getPushGroup();
   160         if (pg != null) {
   240         if (pg != null) {
   161             // if an error occurs make sure it is recorded in the PushGroup
   241             // if an error occurs make sure it is recorded in the PushGroup
   162             cf = cf.whenComplete((t,e) -> pg.pushError(e));
   242             cf = cf.whenComplete((t,e) -> pg.pushError(e));
   163         }
   243         }
   164         return cf;
   244         return cf;
   165     }
       
   166 
       
   167     @Override
       
   168     T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
       
   169         throws IOException
       
   170     {
       
   171         CompletableFuture<T> cf = readBodyAsync(handler,
       
   172                                                 returnConnectionToPool,
       
   173                                                 null);
       
   174         try {
       
   175             return cf.join();
       
   176         } catch (CompletionException e) {
       
   177             throw Utils.getIOException(e);
       
   178         }
       
   179     }
   245     }
   180 
   246 
   181     @Override
   247     @Override
   182     public String toString() {
   248     public String toString() {
   183         StringBuilder sb = new StringBuilder();
   249         StringBuilder sb = new StringBuilder();
   184         sb.append("streamid: ")
   250         sb.append("streamid: ")
   185                 .append(streamid);
   251                 .append(streamid);
   186         return sb.toString();
   252         return sb.toString();
   187     }
   253     }
   188 
   254 
   189     private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
   255     private void receiveDataFrame(DataFrame df) {
   190         if (frame instanceof ResetFrame) {
   256         inputQ.add(df);
   191             handleReset((ResetFrame) frame);
   257         sched.runOrSchedule();
   192             return true;
   258     }
   193         } else if (!(frame instanceof DataFrame)) {
   259 
   194             assert false;
   260     /**
   195             return true;
   261      * RESET always handled inline in queue
   196         }
   262      */
   197         DataFrame df = (DataFrame) frame;
   263     private void receiveResetFrame(ResetFrame frame) {
   198         // RFC 7540 6.1:
   264         inputQ.add(frame);
   199         // The entire DATA frame payload is included in flow control,
   265         sched.runOrSchedule();
   200         // including the Pad Length and Padding fields if present
   266     }
   201         int len = df.payloadLength();
   267 
   202         ByteBufferReference[] buffers = df.getData();
   268     // pushes entire response body into response subscriber
   203         for (ByteBufferReference b : buffers) {
       
   204             ByteBuffer buf = b.get();
       
   205             if (buf.hasRemaining()) {
       
   206                 publisher.acceptData(Optional.of(buf));
       
   207             }
       
   208         }
       
   209         connection.windowUpdater.update(len);
       
   210         if (df.getFlag(DataFrame.END_STREAM)) {
       
   211             setEndStreamReceived();
       
   212             publisher.acceptData(Optional.empty());
       
   213             return false;
       
   214         }
       
   215         // Don't send window update on a stream which is
       
   216         // closed or half closed.
       
   217         windowUpdater.update(len);
       
   218         return true;
       
   219     }
       
   220 
       
   221     // pushes entire response body into response processor
       
   222     // blocking when required by local or remote flow control
   269     // blocking when required by local or remote flow control
   223     CompletableFuture<T> receiveData(Executor executor) {
   270     CompletableFuture<T> receiveData() {
   224         CompletableFuture<T> cf = responseProcessor
   271         responseBodyCF = responseSubscriber
   225                 .getBody()
   272                 .getBody()
   226                 .toCompletableFuture();
   273                 .toCompletableFuture();
   227         Consumer<Throwable> onError = e -> {
   274 
   228             Log.logTrace("receiveData: {0}", e.toString());
   275         if (isCanceled()) {
   229             e.printStackTrace();
   276             Throwable t = getCancelCause();
   230             cf.completeExceptionally(e);
   277             responseBodyCF.completeExceptionally(t);
   231             publisher.acceptError(e);
   278             sched.runOrSchedule();
   232         };
       
   233         if (executor == null) {
       
   234             inputQ.blockingReceive(this::receiveDataFrame, onError);
       
   235         } else {
   279         } else {
   236             inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
   280             responseSubscriber.onSubscribe(userSubscription);
   237         }
   281             sched.runOrSchedule(); // in case data waiting already to be processed
   238         return cf;
   282         }
       
   283         return responseBodyCF;
   239     }
   284     }
   240 
   285 
   241     @Override
   286     @Override
   242     void sendBody() throws IOException {
       
   243         try {
       
   244             sendBodyImpl().join();
       
   245         } catch (CompletionException e) {
       
   246             throw Utils.getIOException(e);
       
   247         }
       
   248     }
       
   249 
       
   250     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
   287     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
   251         return sendBodyImpl().thenApply( v -> this);
   288         return sendBodyImpl().thenApply( v -> this);
   252     }
   289     }
   253 
   290 
   254     @SuppressWarnings("unchecked")
   291     @SuppressWarnings("unchecked")
   260         super(e);
   297         super(e);
   261         this.client = client;
   298         this.client = client;
   262         this.connection = connection;
   299         this.connection = connection;
   263         this.windowController = windowController;
   300         this.windowController = windowController;
   264         this.request = e.request();
   301         this.request = e.request();
   265         this.requestProcessor = request.requestProcessor;
   302         this.requestPublisher = request.requestPublisher;  // may be null
   266         responseHeaders = new HttpHeadersImpl();
   303         responseHeaders = new HttpHeadersImpl();
   267         requestHeaders = new HttpHeadersImpl();
   304         requestHeaders = new HttpHeadersImpl();
   268         rspHeadersConsumer = (name, value) -> {
   305         rspHeadersConsumer = (name, value) -> {
   269             responseHeaders.addHeader(name.toString(), value.toString());
   306             responseHeaders.addHeader(name.toString(), value.toString());
   270             if (Log.headers() && Log.trace()) {
   307             if (Log.headers() && Log.trace()) {
   272                              streamid, name, value);
   309                              streamid, name, value);
   273             }
   310             }
   274         };
   311         };
   275         this.requestPseudoHeaders = new HttpHeadersImpl();
   312         this.requestPseudoHeaders = new HttpHeadersImpl();
   276         // NEW
   313         // NEW
   277         this.publisher = new BlockingPushPublisher<>();
       
   278         this.windowUpdater = new StreamWindowUpdateSender(connection);
   314         this.windowUpdater = new StreamWindowUpdateSender(connection);
   279     }
   315     }
   280 
   316 
   281     /**
   317     /**
   282      * Entry point from Http2Connection reader thread.
   318      * Entry point from Http2Connection reader thread.
   283      *
   319      *
   284      * Data frames will be removed by response body thread.
   320      * Data frames will be removed by response body thread.
   285      */
   321      */
   286     void incoming(Http2Frame frame) throws IOException {
   322     void incoming(Http2Frame frame) throws IOException {
       
   323         debug.log(Level.DEBUG, "incoming: %s", frame);
   287         if ((frame instanceof HeaderFrame)) {
   324         if ((frame instanceof HeaderFrame)) {
   288             HeaderFrame hframe = (HeaderFrame)frame;
   325             HeaderFrame hframe = (HeaderFrame)frame;
   289             if (hframe.endHeaders()) {
   326             if (hframe.endHeaders()) {
   290                 Log.logTrace("handling response (streamid={0})", streamid);
   327                 Log.logTrace("handling response (streamid={0})", streamid);
   291                 handleResponse();
   328                 handleResponse();
   292                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
   329                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
   293                     inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
   330                     receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
   294                 }
   331                 }
   295             }
   332             }
   296         } else if (frame instanceof DataFrame) {
   333         } else if (frame instanceof DataFrame) {
   297             inputQ.put(frame);
   334             receiveDataFrame((DataFrame)frame);
   298         } else {
   335         } else {
   299             otherFrame(frame);
   336             otherFrame(frame);
   300         }
   337         }
   301     }
   338     }
   302 
   339 
   347         }
   384         }
   348 
   385 
   349         completeResponse(response);
   386         completeResponse(response);
   350     }
   387     }
   351 
   388 
   352     void incoming_reset(ResetFrame frame) throws IOException {
   389     void incoming_reset(ResetFrame frame) {
   353         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
   390         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
   354         if (endStreamReceived()) {
   391         if (endStreamReceived()) {
   355             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
   392             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
   356         } else if (closed) {
   393         } else if (closed) {
   357             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
   394             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
   358         } else {
   395         } else {
   359             boolean pushedToQueue = false;
   396             // put it in the input queue in order to read all
   360             synchronized(this) {
   397             // pending data frames first. Indeed, a server may send
   361                 // if the response headers are not yet
   398             // RST_STREAM after sending END_STREAM, in which case we should
   362                 // received, or the inputQueue is closed, handle reset directly.
   399             // ignore it. However, we won't know if we have received END_STREAM
   363                 // Otherwise, put it in the input queue in order to read all
   400             // or not until all pending data frames are read.
   364                 // pending data frames first. Indeed, a server may send
   401             receiveResetFrame(frame);
   365                 // RST_STREAM after sending END_STREAM, in which case we should
   402             // RST_STREAM was pushed to the queue. It will be handled by
   366                 // ignore it. However, we won't know if we have received END_STREAM
   403             // schedule() after all pending data frames have been
   367                 // or not until all pending data frames are read.
   404             // processed.
   368                 // Because the inputQ will not be read until the response
   405             Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
   369                 // headers are received, and because response headers won't be
   406         }
   370                 // sent if the server sent RST_STREAM, then we must handle
   407     }
   371                 // reset here directly unless responseHeadersReceived is true.
   408 
   372                 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
   409     void handleReset(ResetFrame frame) {
   373             }
       
   374             if (!pushedToQueue) {
       
   375                 // RST_STREAM was not pushed to the queue: handle it.
       
   376                 try {
       
   377                     handleReset(frame);
       
   378                 } catch (IOException io) {
       
   379                     completeResponseExceptionally(io);
       
   380                 }
       
   381             } else {
       
   382                 // RST_STREAM was pushed to the queue. It will be handled by
       
   383                 // asyncReceive after all pending data frames have been
       
   384                 // processed.
       
   385                 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
       
   386             }
       
   387         }
       
   388     }
       
   389 
       
   390     void handleReset(ResetFrame frame) throws IOException {
       
   391         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
   410         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
   392         if (!closed) {
   411         if (!closed) {
   393             close();
   412             close();
   394             int error = frame.getErrorCode();
   413             int error = frame.getErrorCode();
   395             throw new IOException(ErrorFrame.stringForCode(error));
   414             completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
   396         } else {
   415         } else {
   397             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
   416             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
   398         }
   417         }
   399     }
   418     }
   400 
   419 
   429         }
   448         }
   430         PushGroup<?,T> pushGroup = exchange.getPushGroup();
   449         PushGroup<?,T> pushGroup = exchange.getPushGroup();
   431         if (pushGroup == null || pushGroup.noMorePushes()) {
   450         if (pushGroup == null || pushGroup.noMorePushes()) {
   432             cancelImpl(new IllegalStateException("unexpected push promise"
   451             cancelImpl(new IllegalStateException("unexpected push promise"
   433                 + " on stream " + streamid));
   452                 + " on stream " + streamid));
   434         }
   453             return;
   435 
   454         }
   436         HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
   455 
       
   456         HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
   437 
   457 
   438         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
   458         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
   439 
   459 
   440         Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
   460         Optional<HttpResponse.BodyHandler<T>> bpOpt =
   441                 pushReq);
   461                 pushGroup.handlerForPushRequest(pushReq);
   442 
   462 
   443         if (!bpOpt.isPresent()) {
   463         if (!bpOpt.isPresent()) {
   444             IOException ex = new IOException("Stream "
   464             IOException ex = new IOException("Stream "
   445                  + streamid + " cancelled by user");
   465                  + streamid + " cancelled by user");
   446             if (Log.trace()) {
   466             if (Log.trace()) {
   447                 Log.logTrace("No body processor for {0}: {1}", pushReq,
   467                 Log.logTrace("No body subscriber for {0}: {1}", pushReq,
   448                             ex.getMessage());
   468                             ex.getMessage());
   449             }
   469             }
   450             pushStream.cancelImpl(ex);
   470             pushStream.cancelImpl(ex);
   451             cf.completeExceptionally(ex);
   471             cf.completeExceptionally(ex);
   452             return;
   472             return;
   456         pushStream.requestSent();
   476         pushStream.requestSent();
   457         pushStream.setPushHandler(bpOpt.get());
   477         pushStream.setPushHandler(bpOpt.get());
   458         // setup housekeeping for when the push is received
   478         // setup housekeeping for when the push is received
   459         // TODO: deal with ignoring of CF anti-pattern
   479         // TODO: deal with ignoring of CF anti-pattern
   460         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
   480         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
       
   481             t = Utils.getCompletionCause(t);
   461             if (Log.trace()) {
   482             if (Log.trace()) {
   462                 Log.logTrace("Push completed on stream {0} for {1}{2}",
   483                 Log.logTrace("Push completed on stream {0} for {1}{2}",
   463                              pushStream.streamid, resp,
   484                              pushStream.streamid, resp,
   464                              ((t==null) ? "": " with exception " + t));
   485                              ((t==null) ? "": " with exception " + t));
   465             }
   486             }
   514 
   535 
   515     HttpHeadersImpl getRequestPseudoHeaders() {
   536     HttpHeadersImpl getRequestPseudoHeaders() {
   516         return requestPseudoHeaders;
   537         return requestPseudoHeaders;
   517     }
   538     }
   518 
   539 
   519     @Override
       
   520     Response getResponse() throws IOException {
       
   521         try {
       
   522             if (request.duration() != null) {
       
   523                 Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
       
   524                              streamid,
       
   525                              request.duration().toMillis());
       
   526                 return getResponseAsync(null).get(
       
   527                         request.duration().toMillis(), TimeUnit.MILLISECONDS);
       
   528             } else {
       
   529                 Log.logTrace("Waiting for response (streamid={0})", streamid);
       
   530                 return getResponseAsync(null).join();
       
   531             }
       
   532         } catch (TimeoutException e) {
       
   533             Log.logTrace("Response timeout (streamid={0})", streamid);
       
   534             throw new HttpTimeoutException("Response timed out");
       
   535         } catch (InterruptedException | ExecutionException | CompletionException e) {
       
   536             Throwable t = e.getCause();
       
   537             Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
       
   538             if (t instanceof IOException) {
       
   539                 throw (IOException)t;
       
   540             }
       
   541             throw new IOException(e);
       
   542         } finally {
       
   543             Log.logTrace("Got response or failed (streamid={0})", streamid);
       
   544         }
       
   545     }
       
   546 
       
   547     /** Sets endStreamReceived. Should be called only once. */
   540     /** Sets endStreamReceived. Should be called only once. */
   548     void setEndStreamReceived() {
   541     void setEndStreamReceived() {
   549         assert remotelyClosed == false: "Unexpected endStream already set";
   542         assert remotelyClosed == false: "Unexpected endStream already set";
   550         remotelyClosed = true;
   543         remotelyClosed = true;
   551         responseReceived();
   544         responseReceived();
   556     private boolean endStreamReceived() {
   549     private boolean endStreamReceived() {
   557         return remotelyClosed;
   550         return remotelyClosed;
   558     }
   551     }
   559 
   552 
   560     @Override
   553     @Override
   561     void sendHeadersOnly() throws IOException, InterruptedException {
   554     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
       
   555         debug.log(Level.DEBUG, "sendHeadersOnly()");
   562         if (Log.requests() && request != null) {
   556         if (Log.requests() && request != null) {
   563             Log.logRequest(request.toString());
   557             Log.logRequest(request.toString());
   564         }
   558         }
   565         requestContentLen = requestProcessor.contentLength();
   559         if (requestPublisher != null) {
       
   560             requestContentLen = requestPublisher.contentLength();
       
   561         } else {
       
   562             requestContentLen = 0;
       
   563         }
   566         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
   564         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
   567         connection.sendFrame(f);
   565         connection.sendFrame(f);
       
   566         CompletableFuture<ExchangeImpl<T>> cf = new CompletableFuture<ExchangeImpl<T>>();
       
   567         cf.complete(this);  // #### good enough for now
       
   568         return cf;
       
   569     }
       
   570 
       
   571     @Override
       
   572     void released() {
       
   573         if (streamid > 0) {
       
   574             debug.log(Level.DEBUG, "Released stream %d", streamid);
       
   575             // remove this stream from the Http2Connection map.
       
   576             connection.closeStream(streamid);
       
   577         } else {
       
   578             debug.log(Level.DEBUG, "Can't release stream %d", streamid);
       
   579         }
       
   580     }
       
   581 
       
   582     @Override
       
   583     void completed() {
       
   584         // There should be nothing to do here: the stream should have
       
   585         // been already closed (or will be closed shortly after).
   568     }
   586     }
   569 
   587 
   570     void registerStream(int id) {
   588     void registerStream(int id) {
   571         this.streamid = id;
   589         this.streamid = id;
   572         connection.putStream(this, streamid);
   590         connection.putStream(this, streamid);
   573     }
   591         debug.log(Level.DEBUG, "Registered stream %d", id);
   574 
   592     }
       
   593 
       
   594     void signalWindowUpdate() {
       
   595         RequestSubscriber subscriber = requestSubscriber;
       
   596         assert subscriber != null;
       
   597         debug.log(Level.DEBUG, "Signalling window update");
       
   598         subscriber.sendScheduler.runOrSchedule();
       
   599     }
       
   600 
       
   601     static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
   575     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
   602     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
   576         // can be < 0 if the actual length is not known.
   603         // can be < 0 if the actual length is not known.
       
   604         private final long contentLength;
   577         private volatile long remainingContentLength;
   605         private volatile long remainingContentLength;
   578         private volatile Subscription subscription;
   606         private volatile Subscription subscription;
       
   607         private volatile ByteBuffer current;
       
   608         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   609         // A scheduler used to honor window updates. Writing must be paused
       
   610         // when the window is exhausted, and resumed when the window acquires
       
   611         // some space. The sendScheduler makes it possible to implement this
       
   612         // behaviour in an asynchronous non-blocking way.
       
   613         // See RequestSubscriber::trySend below.
       
   614         final SequentialScheduler sendScheduler;
   579 
   615 
   580         RequestSubscriber(long contentLen) {
   616         RequestSubscriber(long contentLen) {
       
   617             this.contentLength = contentLen;
   581             this.remainingContentLength = contentLen;
   618             this.remainingContentLength = contentLen;
       
   619             this.sendScheduler = new SequentialScheduler(
       
   620                     new SynchronizedRestartableTask(this::trySend));
   582         }
   621         }
   583 
   622 
   584         @Override
   623         @Override
   585         public void onSubscribe(Flow.Subscription subscription) {
   624         public void onSubscribe(Flow.Subscription subscription) {
   586             if (this.subscription != null) {
   625             if (this.subscription != null) {
   587                 throw new IllegalStateException();
   626                 throw new IllegalStateException("already subscribed");
   588             }
   627             }
   589             this.subscription = subscription;
   628             this.subscription = subscription;
       
   629             debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
   590             subscription.request(1);
   630             subscription.request(1);
   591         }
   631         }
   592 
   632 
   593         @Override
   633         @Override
   594         public void onNext(ByteBuffer item) {
   634         public void onNext(ByteBuffer item) {
       
   635             debug.log(Level.DEBUG,
       
   636                     "RequestSubscriber: onNext(%d)", item.remaining());
       
   637             // Got some more request body bytes to send.
   595             if (requestBodyCF.isDone()) {
   638             if (requestBodyCF.isDone()) {
   596                 throw new IllegalStateException();
   639                 // stream already cancelled, probably in timeout
   597             }
   640                 sendScheduler.stop();
   598 
   641                 subscription.cancel();
       
   642                 return;
       
   643             }
       
   644             ByteBuffer prev = current;
       
   645             assert prev == null;
       
   646             current = item;
       
   647             sendScheduler.runOrSchedule();
       
   648         }
       
   649 
       
   650         @Override
       
   651         public void onError(Throwable throwable) {
       
   652             debug.log(Level.DEBUG,
       
   653                       () -> "RequestSubscriber: onError: " + throwable);
       
   654             // ensure that errors are handled within the flow.
       
   655             if (errorRef.compareAndSet(null, throwable)) {
       
   656                 sendScheduler.runOrSchedule();
       
   657             }
       
   658         }
       
   659 
       
   660         @Override
       
   661         public void onComplete() {
       
   662             debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
       
   663             // last byte of request body has been obtained.
       
   664             // ensure that everything is completed within the flow.
       
   665             onNext(COMPLETED);
       
   666         }
       
   667 
       
   668         // Attempts to send the data, if any.
       
   669         // Handles errors and completion state.
       
   670         // Pause writing if the send window is exhausted, resume it if the
       
   671         // send window has some bytes that can be acquired.
       
   672         void trySend() {
   599             try {
   673             try {
       
   674                 // handle errors raised by onError;
       
   675                 Throwable t = errorRef.get();
       
   676                 if (t != null) {
       
   677                     sendScheduler.stop();
       
   678                     if (requestBodyCF.isDone()) return;
       
   679                     subscription.cancel();
       
   680                     requestBodyCF.completeExceptionally(t);
       
   681                     return;
       
   682                 }
       
   683 
       
   684                 // handle COMPLETED;
       
   685                 ByteBuffer item = current;
       
   686                 if (item == null) return;
       
   687                 else if (item == COMPLETED) {
       
   688                     sendScheduler.stop();
       
   689                     complete();
       
   690                     return;
       
   691                 }
       
   692 
       
   693                 // handle bytes to send downstream
   600                 while (item.hasRemaining()) {
   694                 while (item.hasRemaining()) {
       
   695                     debug.log(Level.DEBUG, "trySend: %d", item.remaining());
   601                     assert !endStreamSent : "internal error, send data after END_STREAM flag";
   696                     assert !endStreamSent : "internal error, send data after END_STREAM flag";
   602                     DataFrame df = getDataFrame(item);
   697                     DataFrame df = getDataFrame(item);
   603                     if (remainingContentLength > 0) {
   698                     if (df == null) {
       
   699                         debug.log(Level.DEBUG, "trySend: can't send yet: %d",
       
   700                                   item.remaining());
       
   701                         return; // the send window is exhausted: come back later
       
   702                     }
       
   703 
       
   704                     if (contentLength > 0) {
   604                         remainingContentLength -= df.getDataLength();
   705                         remainingContentLength -= df.getDataLength();
   605                         assert remainingContentLength >= 0;
   706                         if (remainingContentLength < 0) {
   606                         if (remainingContentLength == 0) {
   707                             String msg = connection().getConnectionFlow()
       
   708                                          + " stream=" + streamid + " "
       
   709                                          + "[" + Thread.currentThread().getName() +"] "
       
   710                                          + "Too many bytes in request body. Expected: "
       
   711                                          + contentLength + ", got: "
       
   712                                          + (contentLength - remainingContentLength);
       
   713                             connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
       
   714                             throw new IOException(msg);
       
   715                         } else if (remainingContentLength == 0) {
   607                             df.setFlag(DataFrame.END_STREAM);
   716                             df.setFlag(DataFrame.END_STREAM);
   608                             endStreamSent = true;
   717                             endStreamSent = true;
   609                         }
   718                         }
   610                     }
   719                     }
       
   720                     debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
   611                     connection.sendDataFrame(df);
   721                     connection.sendDataFrame(df);
   612                 }
   722                 }
       
   723                 assert !item.hasRemaining();
       
   724                 current = null;
       
   725                 debug.log(Level.DEBUG, "trySend: request 1");
   613                 subscription.request(1);
   726                 subscription.request(1);
   614             } catch (InterruptedException ex) {
   727             } catch (Throwable ex) {
       
   728                 debug.log(Level.DEBUG, "trySend: ", ex);
       
   729                 sendScheduler.stop();
   615                 subscription.cancel();
   730                 subscription.cancel();
   616                 requestBodyCF.completeExceptionally(ex);
   731                 requestBodyCF.completeExceptionally(ex);
   617             }
   732             }
   618         }
   733         }
   619 
   734 
   620         @Override
   735         private void complete() throws IOException {
   621         public void onError(Throwable throwable) {
   736             long remaining = remainingContentLength;
   622             if (requestBodyCF.isDone()) {
   737             long written = contentLength - remaining;
   623                 return;
   738             if (remaining > 0) {
   624             }
   739                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
   625             subscription.cancel();
   740                 // let trySend() handle the exception
   626             requestBodyCF.completeExceptionally(throwable);
   741                 throw new IOException(connection().getConnectionFlow()
   627         }
   742                                      + " stream=" + streamid + " "
   628 
   743                                      + "[" + Thread.currentThread().getName() +"] "
   629         @Override
   744                                      + "Too few bytes returned by the publisher ("
   630         public void onComplete() {
   745                                               + written + "/"
   631             assert endStreamSent || remainingContentLength < 0;
   746                                               + contentLength + ")");
   632             try {
   747             }
   633                 if (!endStreamSent) {
   748             if (!endStreamSent) {
   634                     endStreamSent = true;
   749                 endStreamSent = true;
   635                     connection.sendDataFrame(getEmptyEndStreamDataFrame());
   750                 connection.sendDataFrame(getEmptyEndStreamDataFrame());
   636                 }
   751             }
   637                 requestBodyCF.complete(null);
   752             requestBodyCF.complete(null);
   638             } catch (InterruptedException ex) {
   753         }
   639                 requestBodyCF.completeExceptionally(ex);
   754     }
   640             }
   755 
   641         }
   756     DataFrame getDataFrame(ByteBuffer buffer) {
   642     }
       
   643 
       
   644     DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
       
   645         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
   757         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
   646         // blocks waiting for stream send window, if exhausted
   758         // blocks waiting for stream send window, if exhausted
   647         int actualAmount = windowController.tryAcquire(requestAmount, streamid);
   759         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
       
   760         if (actualAmount <= 0) return null;
   648         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
   761         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
   649         DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
   762         DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
   650         return df;
   763         return df;
   651     }
   764     }
   652 
   765 
   653     private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
   766     private DataFrame getEmptyEndStreamDataFrame()  {
   654         return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
   767         return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
   655     }
   768     }
   656 
   769 
   657     /**
   770     /**
   658      * A List of responses relating to this stream. Normally there is only
   771      * A List of responses relating to this stream. Normally there is only
   664 
   777 
   665     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
   778     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
   666 
   779 
   667     @Override
   780     @Override
   668     CompletableFuture<Response> getResponseAsync(Executor executor) {
   781     CompletableFuture<Response> getResponseAsync(Executor executor) {
   669         CompletableFuture<Response> cf = null;
   782         CompletableFuture<Response> cf;
   670         // The code below deals with race condition that can be caused when
   783         // The code below deals with race condition that can be caused when
   671         // completeResponse() is being called before getResponseAsync()
   784         // completeResponse() is being called before getResponseAsync()
   672         synchronized (response_cfs) {
   785         synchronized (response_cfs) {
   673             if (!response_cfs.isEmpty()) {
   786             if (!response_cfs.isEmpty()) {
   674                 // This CompletableFuture was created by completeResponse().
   787                 // This CompletableFuture was created by completeResponse().
   691         }
   804         }
   692         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
   805         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
   693         PushGroup<?,?> pg = exchange.getPushGroup();
   806         PushGroup<?,?> pg = exchange.getPushGroup();
   694         if (pg != null) {
   807         if (pg != null) {
   695             // if an error occurs make sure it is recorded in the PushGroup
   808             // if an error occurs make sure it is recorded in the PushGroup
   696             cf = cf.whenComplete((t,e) -> pg.pushError(e));
   809             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
   697         }
   810         }
   698         return cf;
   811         return cf;
   699     }
   812     }
   700 
   813 
   701     /**
   814     /**
   761             response_cfs.add(MinimalFuture.failedFuture(t));
   874             response_cfs.add(MinimalFuture.failedFuture(t));
   762         }
   875         }
   763     }
   876     }
   764 
   877 
   765     CompletableFuture<Void> sendBodyImpl() {
   878     CompletableFuture<Void> sendBodyImpl() {
   766         RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
   879         requestBodyCF.whenComplete((v, t) -> requestSent());
   767         requestProcessor.subscribe(subscriber);
   880         if (requestPublisher != null) {
   768         requestBodyCF.whenComplete((v,t) -> requestSent());
   881             final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
       
   882             requestPublisher.subscribe(requestSubscriber = subscriber);
       
   883         } else {
       
   884             // there is no request body, therefore the request is complete,
       
   885             // END_STREAM has already sent with outgoing headers
       
   886             requestBodyCF.complete(null);
       
   887         }
   769         return requestBodyCF;
   888         return requestBodyCF;
   770     }
   889     }
   771 
   890 
   772     @Override
   891     @Override
   773     void cancel() {
   892     void cancel() {
   785             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
   904             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
   786         }
   905         }
   787         boolean closing;
   906         boolean closing;
   788         if (closing = !closed) { // assigning closing to !closed
   907         if (closing = !closed) { // assigning closing to !closed
   789             synchronized (this) {
   908             synchronized (this) {
       
   909                 failed = e;
   790                 if (closing = !closed) { // assigning closing to !closed
   910                 if (closing = !closed) { // assigning closing to !closed
   791                     closed=true;
   911                     closed=true;
   792                 }
   912                 }
   793             }
   913             }
   794         }
   914         }
   795         if (closing) { // true if the stream has not been closed yet
   915         if (closing) { // true if the stream has not been closed yet
   796             inputQ.close();
   916             if (responseSubscriber != null)
       
   917                 sched.runOrSchedule();
   797         }
   918         }
   798         completeResponseExceptionally(e);
   919         completeResponseExceptionally(e);
       
   920         if (!requestBodyCF.isDone()) {
       
   921             requestBodyCF.completeExceptionally(e); // we may be sending the body..
       
   922         }
       
   923         if (responseBodyCF != null) {
       
   924             responseBodyCF.completeExceptionally(e);
       
   925         }
   799         try {
   926         try {
   800             // will send a RST_STREAM frame
   927             // will send a RST_STREAM frame
   801             if (streamid != 0) {
   928             if (streamid != 0) {
   802                 connection.resetStream(streamid, ResetFrame.CANCEL);
   929                 connection.resetStream(streamid, ResetFrame.CANCEL);
   803             }
   930             }
   812         synchronized(this) {
   939         synchronized(this) {
   813             if (closed) return;
   940             if (closed) return;
   814             closed = true;
   941             closed = true;
   815         }
   942         }
   816         Log.logTrace("Closing stream {0}", streamid);
   943         Log.logTrace("Closing stream {0}", streamid);
   817         inputQ.close();
       
   818         connection.closeStream(streamid);
   944         connection.closeStream(streamid);
   819         Log.logTrace("Stream {0} closed", streamid);
   945         Log.logTrace("Stream {0} closed", streamid);
   820     }
   946     }
   821 
   947 
   822     static class PushedStream<U,T> extends Stream<T> {
   948     static class PushedStream<U,T> extends Stream<T> {
   858         // error record it in the PushGroup. The error method is called
   984         // error record it in the PushGroup. The error method is called
   859         // with a null value when no error occurred (is a no-op)
   985         // with a null value when no error occurred (is a no-op)
   860         @Override
   986         @Override
   861         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
   987         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
   862             return super.sendBodyAsync()
   988             return super.sendBodyAsync()
   863                         .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
   989                         .whenComplete((ExchangeImpl<T> v, Throwable t)
       
   990                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
   864         }
   991         }
   865 
   992 
   866         @Override
   993         @Override
   867         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
   994         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
   868             return super.sendHeadersAsync()
   995             return super.sendHeadersAsync()
   869                         .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
   996                         .whenComplete((ExchangeImpl<T> ex, Throwable t)
       
   997                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
   870         }
   998         }
   871 
   999 
   872         @Override
  1000         @Override
   873         CompletableFuture<Response> getResponseAsync(Executor executor) {
  1001         CompletableFuture<Response> getResponseAsync(Executor executor) {
   874             CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
  1002             CompletableFuture<Response> cf = pushCF.whenComplete(
       
  1003                     (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
   875             if(executor!=null && !cf.isDone()) {
  1004             if(executor!=null && !cf.isDone()) {
   876                 cf  = cf.thenApplyAsync( r -> r, executor);
  1005                 cf  = cf.thenApplyAsync( r -> r, executor);
   877             }
  1006             }
   878             return cf;
  1007             return cf;
   879         }
  1008         }
   888                         .whenComplete((v, t) -> pushGroup.pushError(t));
  1017                         .whenComplete((v, t) -> pushGroup.pushError(t));
   889         }
  1018         }
   890 
  1019 
   891         @Override
  1020         @Override
   892         void completeResponse(Response r) {
  1021         void completeResponse(Response r) {
   893             HttpResponseImpl.logResponse(r);
  1022             Log.logResponse(r::toString);
   894             pushCF.complete(r); // not strictly required for push API
  1023             pushCF.complete(r); // not strictly required for push API
   895             // start reading the body using the obtained BodyProcessor
  1024             // start reading the body using the obtained BodyProcessor
   896             CompletableFuture<Void> start = new MinimalFuture<>();
  1025             CompletableFuture<Void> start = new MinimalFuture<>();
   897             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
  1026             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
   898                 .whenComplete((T body, Throwable t) -> {
  1027                 .whenComplete((T body, Throwable t) -> {
   899                     if (t != null) {
  1028                     if (t != null) {
   900                         responseCF.completeExceptionally(t);
  1029                         responseCF.completeExceptionally(t);
   901                     } else {
  1030                     } else {
   902                         HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
  1031                         HttpResponseImpl<T> resp =
   903                         responseCF.complete(response);
  1032                                 new HttpResponseImpl<>(r.request, r, body, getExchange());
       
  1033                         responseCF.complete(resp);
   904                     }
  1034                     }
   905                 });
  1035                 });
   906             start.completeAsync(() -> null, getExchange().executor());
  1036             start.completeAsync(() -> null, getExchange().executor());
   907         }
  1037         }
   908 
  1038 
   958         int getStreamId() {
  1088         int getStreamId() {
   959             return streamid;
  1089             return streamid;
   960         }
  1090         }
   961     }
  1091     }
   962 
  1092 
       
  1093     /**
       
  1094      * Returns true if this exchange was canceled.
       
  1095      * @return true if this exchange was canceled.
       
  1096      */
       
  1097     synchronized boolean isCanceled() {
       
  1098         return failed != null;
       
  1099     }
       
  1100 
       
  1101     /**
       
  1102      * Returns the cause for which this exchange was canceled, if available.
       
  1103      * @return the cause for which this exchange was canceled, if available.
       
  1104      */
       
  1105     synchronized Throwable getCancelCause() {
       
  1106         return failed;
       
  1107     }
       
  1108 
       
  1109     final String dbgString() {
       
  1110         return connection.dbgString() + "/Stream("+streamid+")";
       
  1111     }
   963 }
  1112 }