src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
branchhttp-client-branch
changeset 56010 782b2f2d1e76
parent 55983 e4a1f0c9d4c6
child 56019 2cb33775fc6f
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Tue Jan 16 15:52:01 2018 +0000
@@ -224,7 +224,7 @@
         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
         CompletableFuture<T> cf = receiveData(bodySubscriber);
 
-        PushGroup<?,?> pg = exchange.getPushGroup();
+        PushGroup<?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
             cf = cf.whenComplete((t,e) -> pg.pushError(e));
@@ -420,42 +420,46 @@
         }
     }
 
-    void incoming_pushPromise(HttpRequestImpl pushReq,
-                              PushedStream<?,T> pushStream)
+    void incoming_pushPromise(HttpRequestImpl pushRequest,
+                              PushedStream<T> pushStream)
         throws IOException
     {
         if (Log.requests()) {
-            Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+            Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
         }
-        PushGroup<?,T> pushGroup = exchange.getPushGroup();
-        if (pushGroup == null || pushGroup.noMorePushes()) {
+        PushGroup<T> pushGroup = exchange.getPushGroup();
+        if (pushGroup == null) {
+            // no push handler set by the user code, i.e. cancel / reject
+            IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
+            pushStream.cancelImpl(ex);
+            return;
+        }
+
+        if (pushGroup.noMorePushes()) {
             cancelImpl(new IllegalStateException("unexpected push promise"
                 + " on stream " + streamid));
             return;
         }
 
-        HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
-
-        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
-
-        Optional<HttpResponse.BodyHandler<T>> bpOpt =
-                pushGroup.handlerForPushRequest(pushReq);
+        PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
+        CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
 
-        if (!bpOpt.isPresent()) {
-            IOException ex = new IOException("Stream "
-                 + streamid + " cancelled by user");
+        if (!acceptor.accepted()) {
+            // cancel / reject
+            IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
             if (Log.trace()) {
-                Log.logTrace("No body subscriber for {0}: {1}", pushReq,
-                            ex.getMessage());
+                Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
+                        ex.getMessage());
             }
             pushStream.cancelImpl(ex);
-            cf.completeExceptionally(ex);
             return;
         }
 
-        pushGroup.addPush();
+        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
+        HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
+
         pushStream.requestSent();
-        pushStream.setPushHandler(bpOpt.get());
+        pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
         // setup housekeeping for when the push is received
         // TODO: deal with ignoring of CF anti-pattern
         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
@@ -467,9 +471,9 @@
             }
             if (t != null) {
                 pushGroup.pushError(t);
-                proc.onError(pushReq, t);
+                pushResponseCF.completeExceptionally(t);
             } else {
-                proc.onResponse(resp);
+                pushResponseCF.complete(resp);
             }
             pushGroup.pushCompleted();
         });
@@ -811,7 +815,7 @@
             cf = cf.thenApplyAsync(r -> r, executor);
         }
         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
-        PushGroup<?,?> pg = exchange.getPushGroup();
+        PushGroup<?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
@@ -950,16 +954,16 @@
         Log.logTrace("Stream {0} closed", streamid);
     }
 
-    static class PushedStream<U,T> extends Stream<T> {
-        final PushGroup<U,T> pushGroup;
+    static class PushedStream<T> extends Stream<T> {
+        final PushGroup<T> pushGroup;
         // push streams need the response CF allocated up front as it is
         // given directly to user via the multi handler callback function.
         final CompletableFuture<Response> pushCF;
-        final CompletableFuture<HttpResponse<T>> responseCF;
+        CompletableFuture<HttpResponse<T>> responseCF;
         final HttpRequestImpl pushReq;
         HttpResponse.BodyHandler<T> pushHandler;
 
-        PushedStream(PushGroup<U,T> pushGroup,
+        PushedStream(PushGroup<T> pushGroup,
                      Http2Connection connection,
                      Exchange<T> pushReq) {
             // ## no request body possible, null window controller
@@ -968,6 +972,7 @@
             this.pushReq = pushReq.request();
             this.pushCF = new MinimalFuture<>();
             this.responseCF = new MinimalFuture<>();
+
         }
 
         CompletableFuture<HttpResponse<T>> responseCF() {