471 connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM); |
471 connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM); |
472 pushStream.close(); |
472 pushStream.close(); |
473 return; |
473 return; |
474 } |
474 } |
475 |
475 |
476 PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest); |
476 PushGroup.Acceptor<T> acceptor = null; |
477 |
477 boolean accepted = false; |
478 if (!acceptor.accepted()) { |
478 try { |
|
479 acceptor = pushGroup.acceptPushRequest(pushRequest, |
|
480 connection.client().theExecutor()); |
|
481 accepted = acceptor.accepted(); |
|
482 } catch (Throwable t) { |
|
483 debug.log(Level.DEBUG, |
|
484 "PushPromiseHandler::applyPushPromise threw exception %s", |
|
485 (Object)t); |
|
486 } |
|
487 if (!accepted) { |
479 // cancel / reject |
488 // cancel / reject |
480 IOException ex = new IOException("Stream " + streamid + " cancelled by users handler"); |
489 IOException ex = new IOException("Stream " + streamid + " cancelled by users handler"); |
481 if (Log.trace()) { |
490 if (Log.trace()) { |
482 Log.logTrace("No body subscriber for {0}: {1}", pushRequest, |
491 Log.logTrace("No body subscriber for {0}: {1}", pushRequest, |
483 ex.getMessage()); |
492 ex.getMessage()); |
484 } |
493 } |
485 pushStream.cancelImpl(ex); |
494 pushStream.cancelImpl(ex); |
486 return; |
495 return; |
487 } |
496 } |
488 |
497 |
|
498 assert accepted && acceptor != null; |
489 CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf(); |
499 CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf(); |
490 HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler(); |
500 HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler(); |
491 assert pushHandler != null; |
501 assert pushHandler != null; |
492 |
502 |
493 pushStream.requestSent(); |
503 pushStream.requestSent(); |
494 pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ? |
504 pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ? |
495 // setup housekeeping for when the push is received |
505 // setup housekeeping for when the push is received |
496 // TODO: deal with ignoring of CF anti-pattern |
506 // TODO: deal with ignoring of CF anti-pattern |
497 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); |
507 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); |
498 cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { |
508 cf.whenCompleteAsync((HttpResponse<T> resp, Throwable t) -> { |
499 t = Utils.getCompletionCause(t); |
509 t = Utils.getCompletionCause(t); |
500 if (Log.trace()) { |
510 if (Log.trace()) { |
501 Log.logTrace("Push completed on stream {0} for {1}{2}", |
511 Log.logTrace("Push completed on stream {0} for {1}{2}", |
502 pushStream.streamid, resp, |
512 pushStream.streamid, resp, |
503 ((t==null) ? "": " with exception " + t)); |
513 ((t==null) ? "": " with exception " + t)); |
507 pushResponseCF.completeExceptionally(t); |
517 pushResponseCF.completeExceptionally(t); |
508 } else { |
518 } else { |
509 pushResponseCF.complete(resp); |
519 pushResponseCF.complete(resp); |
510 } |
520 } |
511 pushGroup.pushCompleted(); |
521 pushGroup.pushCompleted(); |
512 }); |
522 }, connection.client().theExecutor()); |
513 |
523 |
514 } |
524 } |
515 |
525 |
516 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { |
526 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { |
517 HttpHeadersImpl h = request.getSystemHeaders(); |
527 HttpHeadersImpl h = request.getSystemHeaders(); |