src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/PushGroup.java
branch: http-client-branch
changeset 56079 d23b02f37fce
parent 56062 48afabd2de78
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
       
     1 /*
       
     2  * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.security.AccessControlContext;
       
    29 import java.security.AccessController;
       
    30 import java.util.Objects;
       
    31 import java.util.concurrent.CompletableFuture;
       
    32 import jdk.incubator.http.HttpRequest;
       
    33 import jdk.incubator.http.HttpResponse;
       
    34 import jdk.incubator.http.HttpResponse.BodyHandler;
       
    35 import jdk.incubator.http.HttpResponse.PushPromiseHandler;
       
    36 import jdk.incubator.http.internal.common.MinimalFuture;
       
    37 import jdk.incubator.http.internal.common.Log;
       
    38 
       
    39 /**
       
    40  * One PushGroup object is associated with the parent Stream of the pushed
       
    41  * Streams. This keeps track of all common state associated with the pushes.
       
    42  */
       
    43 class PushGroup<T> {
       
    44     private final HttpRequest initiatingRequest;
       
    45 
       
    46     final CompletableFuture<Void> noMorePushesCF;
       
    47 
       
    48     volatile Throwable error; // any exception that occurred during pushes
       
    49 
       
    50     // user's subscriber object
       
    51     final PushPromiseHandler<T> pushPromiseHandler;
       
    52 
       
    53     private final AccessControlContext acc;
       
    54 
       
    55     int numberOfPushes;
       
    56     int remainingPushes;
       
    57     boolean noMorePushes = false;
       
    58 
       
    59     PushGroup(PushPromiseHandler<T> pushPromiseHandler,
       
    60               HttpRequestImpl initiatingRequest,
       
    61               AccessControlContext acc) {
       
    62         this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), acc);
       
    63     }
       
    64 
       
    65     // Check mainBodyHandler before calling nested constructor.
       
    66     private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler,
       
    67                       HttpRequestImpl initiatingRequest,
       
    68                       CompletableFuture<HttpResponse<T>> mainResponse,
       
    69                       AccessControlContext acc) {
       
    70         this.noMorePushesCF = new MinimalFuture<>();
       
    71         this.pushPromiseHandler = pushPromiseHandler;
       
    72         this.initiatingRequest = initiatingRequest;
       
    73         // Restricts the file publisher with the senders ACC, if any
       
    74         if (pushPromiseHandler instanceof UntrustedBodyHandler)
       
    75             ((UntrustedBodyHandler)this.pushPromiseHandler).setAccessControlContext(acc);
       
    76         this.acc = acc;
       
    77     }
       
    78 
       
    79     interface Acceptor<T> {
       
    80         BodyHandler<T> bodyHandler();
       
    81         CompletableFuture<HttpResponse<T>> cf();
       
    82         boolean accepted();
       
    83     }
       
    84 
       
    85     private static class AcceptorImpl<T> implements Acceptor<T> {
       
    86         private volatile HttpResponse.BodyHandler<T> bodyHandler;
       
    87         private volatile CompletableFuture<HttpResponse<T>> cf;
       
    88 
       
    89         CompletableFuture<HttpResponse<T>> accept(BodyHandler<T> bodyHandler) {
       
    90             Objects.requireNonNull(bodyHandler);
       
    91             if (this.bodyHandler != null)
       
    92                 throw new IllegalStateException("non-null bodyHandler");
       
    93             this.bodyHandler = bodyHandler;
       
    94             cf = new MinimalFuture<>();
       
    95             return cf;
       
    96         }
       
    97 
       
    98         @Override public BodyHandler<T> bodyHandler() { return bodyHandler; }
       
    99 
       
   100         @Override public CompletableFuture<HttpResponse<T>> cf() { return cf; }
       
   101 
       
   102         @Override public boolean accepted() { return cf != null; }
       
   103     }
       
   104 
       
   105     Acceptor<T> acceptPushRequest(HttpRequest pushRequest) {
       
   106         AcceptorImpl<T> acceptor = new AcceptorImpl<>();
       
   107 
       
   108         pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept);
       
   109 
       
   110         synchronized (this) {
       
   111             if (acceptor.accepted()) {
       
   112                 if (acceptor.bodyHandler instanceof UntrustedBodyHandler) {
       
   113                     ((UntrustedBodyHandler) acceptor.bodyHandler).setAccessControlContext(acc);
       
   114                 }
       
   115                 numberOfPushes++;
       
   116                 remainingPushes++;
       
   117             }
       
   118             return acceptor;
       
   119         }
       
   120     }
       
   121 
       
   122     // This is called when the main body response completes because it means
       
   123     // no more PUSH_PROMISEs are possible
       
   124 
       
   125     synchronized void noMorePushes(boolean noMore) {
       
   126         noMorePushes = noMore;
       
   127         checkIfCompleted();
       
   128         noMorePushesCF.complete(null);
       
   129     }
       
   130 
       
   131     synchronized CompletableFuture<Void> pushesCF() {
       
   132         return noMorePushesCF;
       
   133     }
       
   134 
       
   135     synchronized boolean noMorePushes() {
       
   136         return noMorePushes;
       
   137     }
       
   138 
       
   139     synchronized void pushCompleted() {
       
   140         remainingPushes--;
       
   141         checkIfCompleted();
       
   142     }
       
   143 
       
   144     synchronized void checkIfCompleted() {
       
   145         if (Log.trace()) {
       
   146             Log.logTrace("PushGroup remainingPushes={0} error={1} noMorePushes={2}",
       
   147                          remainingPushes,
       
   148                          (error==null)?error:error.getClass().getSimpleName(),
       
   149                          noMorePushes);
       
   150         }
       
   151         if (remainingPushes == 0 && error == null && noMorePushes) {
       
   152             if (Log.trace()) {
       
   153                 Log.logTrace("push completed");
       
   154             }
       
   155         }
       
   156     }
       
   157 
       
   158     synchronized void pushError(Throwable t) {
       
   159         if (t == null) {
       
   160             return;
       
   161         }
       
   162         this.error = t;
       
   163     }
       
   164 }