src/java.net.http/share/classes/jdk/internal/net/http/PushGroup.java
branch http-client-branch
changeset 56282 10cebcd18d47
parent 56268 481d8c9acc7f
child 56451 9585061fdb04
equal deleted inserted replaced
56281:7fdd89dabab2 56282:10cebcd18d47
    49     volatile Throwable error; // any exception that occurred during pushes
    49     volatile Throwable error; // any exception that occurred during pushes
    50 
    50 
    51     // user's subscriber object
    51     // user's subscriber object
    52     final PushPromiseHandler<T> pushPromiseHandler;
    52     final PushPromiseHandler<T> pushPromiseHandler;
    53 
    53 
    54     private final AccessControlContext acc;
    54     private final Executor executor;
    55 
    55 
    56     int numberOfPushes;
    56     int numberOfPushes;
    57     int remainingPushes;
    57     int remainingPushes;
    58     boolean noMorePushes = false;
    58     boolean noMorePushes = false;
    59 
    59 
    60     PushGroup(PushPromiseHandler<T> pushPromiseHandler,
    60     PushGroup(PushPromiseHandler<T> pushPromiseHandler,
    61               HttpRequestImpl initiatingRequest,
    61               HttpRequestImpl initiatingRequest,
    62               AccessControlContext acc) {
    62               Executor executor) {
    63         this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), acc);
    63         this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), executor);
    64     }
    64     }
    65 
    65 
    66     // Check mainBodyHandler before calling nested constructor.
    66     // Check mainBodyHandler before calling nested constructor.
    67     private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler,
    67     private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler,
    68                       HttpRequestImpl initiatingRequest,
    68                       HttpRequestImpl initiatingRequest,
    69                       CompletableFuture<HttpResponse<T>> mainResponse,
    69                       CompletableFuture<HttpResponse<T>> mainResponse,
    70                       AccessControlContext acc) {
    70                       Executor executor) {
    71         this.noMorePushesCF = new MinimalFuture<>();
    71         this.noMorePushesCF = new MinimalFuture<>();
    72         this.pushPromiseHandler = pushPromiseHandler;
    72         this.pushPromiseHandler = pushPromiseHandler;
    73         this.initiatingRequest = initiatingRequest;
    73         this.initiatingRequest = initiatingRequest;
    74         this.acc = acc;
    74         this.executor = executor;
    75     }
    75     }
    76 
    76 
    77     interface Acceptor<T> {
    77     interface Acceptor<T> {
    78         BodyHandler<T> bodyHandler();
    78         BodyHandler<T> bodyHandler();
    79         CompletableFuture<HttpResponse<T>> cf();
    79         CompletableFuture<HttpResponse<T>> cf();
    80         boolean accepted();
    80         boolean accepted();
    81     }
    81     }
    82 
    82 
    83     private static class AcceptorImpl<T> implements Acceptor<T> {
    83     private static class AcceptorImpl<T> implements Acceptor<T> {
       
    84         private final Executor executor;
    84         private volatile HttpResponse.BodyHandler<T> bodyHandler;
    85         private volatile HttpResponse.BodyHandler<T> bodyHandler;
    85         private volatile CompletableFuture<HttpResponse<T>> cf;
    86         private volatile CompletableFuture<HttpResponse<T>> cf;
       
    87 
       
    88         AcceptorImpl(Executor executor) {
       
    89             this.executor = executor;
       
    90         }
    86 
    91 
    87         CompletableFuture<HttpResponse<T>> accept(BodyHandler<T> bodyHandler) {
    92         CompletableFuture<HttpResponse<T>> accept(BodyHandler<T> bodyHandler) {
    88             Objects.requireNonNull(bodyHandler);
    93             Objects.requireNonNull(bodyHandler);
    89             if (this.bodyHandler != null)
    94             if (this.bodyHandler != null)
    90                 throw new IllegalStateException("non-null bodyHandler");
    95                 throw new IllegalStateException("non-null bodyHandler");
    91             this.bodyHandler = bodyHandler;
    96             this.bodyHandler = bodyHandler;
    92             cf = new MinimalFuture<>();
    97             cf = new MinimalFuture<>();
    93             return cf;
    98             return cf.whenCompleteAsync((r,t) -> {}, executor);
    94         }
    99         }
    95 
   100 
    96         @Override public BodyHandler<T> bodyHandler() { return bodyHandler; }
   101         @Override public BodyHandler<T> bodyHandler() { return bodyHandler; }
    97 
   102 
    98         @Override public CompletableFuture<HttpResponse<T>> cf() { return cf; }
   103         @Override public CompletableFuture<HttpResponse<T>> cf() { return cf; }
    99 
   104 
   100         @Override public boolean accepted() { return cf != null; }
   105         @Override public boolean accepted() { return cf != null; }
   101     }
   106     }
   102 
   107 
   103     Acceptor<T> acceptPushRequest(HttpRequest pushRequest, Executor e) {
   108     Acceptor<T> acceptPushRequest(HttpRequest pushRequest) {
   104         AcceptorImpl<T> acceptor = new AcceptorImpl<>();
   109         AcceptorImpl<T> acceptor = new AcceptorImpl<>(executor);
   105         try {
   110         try {
   106             pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept);
   111             pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept);
   107         } catch (Throwable t) {
   112         } catch (Throwable t) {
   108             if (acceptor.accepted()) {
   113             if (acceptor.accepted()) {
   109                 CompletableFuture<?> cf = acceptor.cf();
   114                 CompletableFuture<?> cf = acceptor.cf();
   110                 e.execute(() -> cf.completeExceptionally(t));
   115                 cf.completeExceptionally(t);
   111             }
   116             }
   112             throw t;
   117             throw t;
   113         }
   118         }
   114 
   119 
   115         synchronized (this) {
   120         synchronized (this) {