src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
branch http-client-branch
changeset 56268 481d8c9acc7f
parent 56261 a339fe1aab55
child 56282 10cebcd18d47
equal deleted inserted replaced
56267:fe6f17faa23a 56268:481d8c9acc7f
   471             connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
   471             connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
   472             pushStream.close();
   472             pushStream.close();
   473             return;
   473             return;
   474         }
   474         }
   475 
   475 
   476         PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
   476         PushGroup.Acceptor<T> acceptor = null;
   477 
   477         boolean accepted = false;
   478         if (!acceptor.accepted()) {
   478         try {
       
   479             acceptor = pushGroup.acceptPushRequest(pushRequest,
       
   480                     connection.client().theExecutor());
       
   481             accepted = acceptor.accepted();
       
   482         } catch (Throwable t) {
       
   483             debug.log(Level.DEBUG,
       
   484                     "PushPromiseHandler::applyPushPromise threw exception %s",
       
   485                     (Object)t);
       
   486         }
       
   487         if (!accepted) {
   479             // cancel / reject
   488             // cancel / reject
   480             IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
   489             IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
   481             if (Log.trace()) {
   490             if (Log.trace()) {
   482                 Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
   491                 Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
   483                         ex.getMessage());
   492                         ex.getMessage());
   484             }
   493             }
   485             pushStream.cancelImpl(ex);
   494             pushStream.cancelImpl(ex);
   486             return;
   495             return;
   487         }
   496         }
   488 
   497 
       
   498         assert accepted && acceptor != null;
   489         CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
   499         CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
   490         HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
   500         HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
   491         assert pushHandler != null;
   501         assert pushHandler != null;
   492 
   502 
   493         pushStream.requestSent();
   503         pushStream.requestSent();
   494         pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
   504         pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
   495         // setup housekeeping for when the push is received
   505         // setup housekeeping for when the push is received
   496         // TODO: deal with ignoring of CF anti-pattern
   506         // TODO: deal with ignoring of CF anti-pattern
   497         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
   507         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
   498         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
   508         cf.whenCompleteAsync((HttpResponse<T> resp, Throwable t) -> {
   499             t = Utils.getCompletionCause(t);
   509             t = Utils.getCompletionCause(t);
   500             if (Log.trace()) {
   510             if (Log.trace()) {
   501                 Log.logTrace("Push completed on stream {0} for {1}{2}",
   511                 Log.logTrace("Push completed on stream {0} for {1}{2}",
   502                              pushStream.streamid, resp,
   512                              pushStream.streamid, resp,
   503                              ((t==null) ? "": " with exception " + t));
   513                              ((t==null) ? "": " with exception " + t));
   507                 pushResponseCF.completeExceptionally(t);
   517                 pushResponseCF.completeExceptionally(t);
   508             } else {
   518             } else {
   509                 pushResponseCF.complete(resp);
   519                 pushResponseCF.complete(resp);
   510             }
   520             }
   511             pushGroup.pushCompleted();
   521             pushGroup.pushCompleted();
   512         });
   522         }, connection.client().theExecutor());
   513 
   523 
   514     }
   524     }
   515 
   525 
   516     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
   526     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
   517         HttpHeadersImpl h = request.getSystemHeaders();
   527         HttpHeadersImpl h = request.getSystemHeaders();