http-client-branch: updated server push API http-client-branch
authormichaelm
Tue, 16 Jan 2018 15:52:01 +0000
branchhttp-client-branch
changeset 56010 782b2f2d1e76
parent 56009 cf8792f51dee
child 56019 2cb33775fc6f
http-client-branch: updated server push API
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/CookieFilter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HeaderFilter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClient.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientFacade.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiMapResult.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PushGroup.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RedirectFilter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
test/jdk/java/net/httpclient/http2/ServerPush.java
test/jdk/java/net/httpclient/http2/ServerPushWithDiffTypes.java
test/jdk/java/net/httpclient/http2/server/PushHandler.java
test/jdk/java/net/httpclient/http2/server/TestUtil.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java	Tue Jan 16 15:52:01 2018 +0000
@@ -43,7 +43,7 @@
  * Implementation of Http Basic authentication.
  */
 class AuthenticationFilter implements HeaderFilter {
-    volatile MultiExchange<?,?> exchange;
+    volatile MultiExchange<?> exchange;
     private static final Base64.Encoder encoder = Base64.getEncoder();
 
     static final int DEFAULT_RETRY_LIMIT = 3;
@@ -108,7 +108,7 @@
     }
 
     @Override
-    public void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException {
+    public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
         // use preemptive authentication if an entry exists.
         Cache cache = getCache(e);
         this.exchange = e;
@@ -263,7 +263,7 @@
     // be garbaged collected when no longer referenced.
     static final WeakHashMap<HttpClientImpl,Cache> caches = new WeakHashMap<>();
 
-    static synchronized Cache getCache(MultiExchange<?,?> exchange) {
+    static synchronized Cache getCache(MultiExchange<?> exchange) {
         HttpClientImpl client = exchange.client();
         Cache c = caches.get(client);
         if (c == null) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/CookieFilter.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/CookieFilter.java	Tue Jan 16 15:52:01 2018 +0000
@@ -39,7 +39,7 @@
     }
 
     @Override
-    public void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException {
+    public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
         HttpClientImpl client = e.client();
         Optional<CookieHandler> cookieHandlerOpt = client.cookieHandler();
         if (cookieHandlerOpt.isPresent()) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Tue Jan 16 15:52:01 2018 +0000
@@ -69,13 +69,13 @@
     // has been established.
     private volatile IOException failed;
     final AccessControlContext acc;
-    final MultiExchange<?,T> multi;
+    final MultiExchange<T> multi;
     final Executor parentExecutor;
     boolean upgrading; // to HTTP/2
-    final PushGroup<?,T> pushGroup;
+    final PushGroup<T> pushGroup;
     final String dbgTag;
 
-    Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) {
+    Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
         this.request = request;
         this.upgrading = false;
         this.client = multi.client();
@@ -88,7 +88,7 @@
 
     /* If different AccessControlContext to be used  */
     Exchange(HttpRequestImpl request,
-             MultiExchange<?,T> multi,
+             MultiExchange<T> multi,
              AccessControlContext acc)
     {
         this.request = request;
@@ -101,7 +101,7 @@
         this.dbgTag = "Exchange";
     }
 
-    PushGroup<?,T> getPushGroup() {
+    PushGroup<T> getPushGroup() {
         return pushGroup;
     }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HeaderFilter.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HeaderFilter.java	Tue Jan 16 15:52:01 2018 +0000
@@ -34,7 +34,7 @@
  */
 interface HeaderFilter {
 
-    void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException;
+    void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException;
 
     /**
      * Returns null if response ok to be given to user.  Non null is a request
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Tue Jan 16 15:52:01 2018 +0000
@@ -642,7 +642,7 @@
         HttpHeadersImpl headers = decoder.headers();
         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
-        Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
+        Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
         pushExch.exchImpl = pushStream;
         pushStream.registerStream(promisedStreamid);
         parent.incoming_pushPromise(pushReq, pushStream);
@@ -839,8 +839,8 @@
         return stream;
     }
 
-    <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
-        PushGroup<?,T> pg = parent.exchange.getPushGroup();
+    <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
+        PushGroup<T> pg = parent.exchange.getPushGroup();
         return new Stream.PushedStream<>(pg, this, pushEx);
     }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClient.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClient.java	Tue Jan 16 15:52:01 2018 +0000
@@ -39,6 +39,8 @@
 import java.util.concurrent.ThreadFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
 
 /**
  * A container for configuration information common to multiple {@link
@@ -447,31 +449,23 @@
      * @return a {@code CompletableFuture<HttpResponse<T>>}
      */
     public abstract <T> CompletableFuture<HttpResponse<T>>
-    sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler);
+    sendAsync(HttpRequest req,
+              BodyHandler<T> responseBodyHandler);
 
     /**
-     * Sends the given request asynchronously using this client and the given
-     * multi response handler.
+     * Sends the given request asynchronously using this client with the given
+     * response body handler and push promise handler.
      *
-     * <p> The returned completable future completes exceptionally with:
-     * <ul>
-     * <li>{@link IOException} - if an I/O error occurs when sending or receiving</li>
-     * <li>{@link IllegalArgumentException} - if the request method is not supported</li>
-     * <li>{@link SecurityException} - If a security manager has been installed
-     *          and it denies {@link java.net.URLPermission access} to the
-     *          URL in the given request, or proxy if one is configured.
-     *          See HttpRequest for further information about
-     *          <a href="HttpRequest.html#securitychecks">security checks</a>.</li>
-     * </ul>
-     *
-     * @param <U> a type representing the aggregated results
-     * @param <T> a type representing all of the response bodies
+     * @param <T> the response body type
      * @param req the request
-     * @param multiSubscriber the multiSubscriber for the request
-     * @return a {@code CompletableFuture<U>}
+     * @param responseBodyHandler the response body handler
+     * @param pushPromiseHandler push promise handler, may be null
+     * @return a {@code CompletableFuture<HttpResponse<T>>}
      */
-    public abstract <U, T> CompletableFuture<U>
-    sendAsync(HttpRequest req, HttpResponse.MultiSubscriber<U, T> multiSubscriber);
+    public abstract <T> CompletableFuture<HttpResponse<T>>
+    sendAsync(HttpRequest req,
+              BodyHandler<T> responseBodyHandler,
+              PushPromiseHandler<T> pushPromiseHandler);
 
     /**
      * Creates a new {@code WebSocket} builder (optional operation).
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientFacade.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientFacade.java	Tue Jan 16 15:52:01 2018 +0000
@@ -36,6 +36,8 @@
 import java.util.concurrent.Executor;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
 
 /**
  * An HttpClientFacade is a simple class that wraps an HttpClient implementation
@@ -115,10 +117,12 @@
     }
 
     @Override
-    public <U, T> CompletableFuture<U>
-    sendAsync(HttpRequest req, HttpResponse.MultiSubscriber<U, T> multiSubscriber) {
+    public <T> CompletableFuture<HttpResponse<T>>
+    sendAsync(HttpRequest req,
+              BodyHandler<T> responseBodyHandler,
+              PushPromiseHandler<T> pushPromiseHandler){
         try {
-            return impl.sendAsync(req, multiSubscriber);
+            return impl.sendAsync(req, responseBodyHandler, pushPromiseHandler);
         } finally {
             Reference.reachabilityFence(this);
         }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java	Tue Jan 16 15:52:01 2018 +0000
@@ -61,7 +61,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 import jdk.incubator.http.HttpResponse.BodyHandler;
-import jdk.incubator.http.HttpResponse.MultiSubscriber;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.Pair;
 import jdk.incubator.http.internal.common.Utils;
@@ -417,6 +417,16 @@
     public <T> CompletableFuture<HttpResponse<T>>
     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
     {
+        return sendAsync(userRequest, responseHandler, null);
+    }
+
+
+    @Override
+    public <T> CompletableFuture<HttpResponse<T>>
+    sendAsync(HttpRequest userRequest,
+              BodyHandler<T> responseHandler,
+              PushPromiseHandler<T> pushPromiseHandler)
+    {
         AccessControlContext acc = null;
         if (System.getSecurityManager() != null)
             acc = AccessController.getContext();
@@ -431,10 +441,11 @@
         try {
             debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest);
 
-            MultiExchange<Void,T> mex = new MultiExchange<>(userRequest,
+            MultiExchange<T> mex = new MultiExchange<>(userRequest,
                                                             requestImpl,
                                                             this,
                                                             responseHandler,
+                                                            pushPromiseHandler,
                                                             acc);
             CompletableFuture<HttpResponse<T>> res =
                     mex.responseAsync().whenComplete((b,t) -> unreference());
@@ -456,48 +467,6 @@
         }
     }
 
-    @Override
-    public <U, T> CompletableFuture<U>
-    sendAsync(HttpRequest userRequest, MultiSubscriber<U, T> responseHandler) {
-        AccessControlContext acc = null;
-        if (System.getSecurityManager() != null)
-            acc = AccessController.getContext();
-
-        // Clone the, possibly untrusted, HttpRequest
-        HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
-        if (requestImpl.method().equals("CONNECT"))
-            throw new IllegalArgumentException("Unsupported method CONNECT");
-
-        long start = DEBUGELAPSED ? System.nanoTime() : 0;
-        reference();
-        try {
-            debugelapsed.log(Level.DEBUG, "ClientImpl (async) send multi %s", userRequest);
-
-            MultiExchange<U,T> mex = new MultiExchange<>(userRequest,
-                                                         requestImpl,
-                                                         this,
-                                                         responseHandler,
-                                                         acc);
-            CompletableFuture<U> res = mex.multiResponseAsync()
-                      .whenComplete((b,t) -> unreference());
-            if (DEBUGELAPSED) {
-                res = res.whenComplete(
-                        (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
-            }
-            // makes sure that any dependent actions happen in the executor
-            if (acc != null) {
-                res.whenCompleteAsync((r, t) -> { /* do nothing */},
-                                      new PrivilegedExecutor(executor, acc));
-            }
-
-            return res;
-        } catch(Throwable t) {
-            unreference();
-            debugCompleted("ClientImpl (async)", start, userRequest);
-            throw t;
-        }
-    }
-
     // Main loop for this client's selector
     private final static class SelectorManager extends Thread {
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Tue Jan 16 15:52:01 2018 +0000
@@ -25,11 +25,11 @@
 
 package jdk.incubator.http;
 
+import jdk.incubator.http.internal.common.MinimalFuture;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
-import jdk.incubator.http.ResponseSubscribers.MultiSubscriberImpl;
 import static jdk.incubator.http.internal.common.Utils.unchecked;
 import static jdk.incubator.http.internal.common.Utils.charsetFrom;
 import java.nio.ByteBuffer;
@@ -46,8 +46,10 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -314,9 +316,7 @@
      *
      * @param <T> the response body type
      */
-    @FunctionalInterface
     public interface BodyHandler<T> {
-
         /**
          * Returns a {@link BodySubscriber BodySubscriber} considering the given
          * response status code and headers. This method is always called before
@@ -331,6 +331,7 @@
          */
         public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);
 
+
         /**
          * Returns a response body handler that returns a {@link BodySubscriber
          * BodySubscriber}{@code <Void>} obtained from {@linkplain
@@ -724,6 +725,103 @@
     }
 
     /**
+     * A handler of <i>push promises</i> ...
+     */
+    public interface PushPromiseHandler<T> {
+        /**
+         * Notifies of an incoming Push Promise. The enclosing request from the user and the push promise
+         * are supplied as parameters, and also a {@link Function} which must be called in the implementation
+         * of this method, if the server push is to be accepted. If this method returns without the function
+         * being called, then the push will be cancelled.
+         */
+        public void applyPushPromise(
+            HttpRequest initial, HttpRequest pushPromise,
+            Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor
+        );
+
+
+        /* package-private with push promise Map implementation */
+        static class PushPromisesHandlerWithMap<T> implements PushPromiseHandler<T> {
+
+            private final ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap;
+            private final Function<HttpRequest,BodyHandler<T>> pushPromiseHandler;
+
+            PushPromisesHandlerWithMap(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
+                                       ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
+                this.pushPromiseHandler = pushPromiseHandler;
+                this.pushPromisesMap = pushPromisesMap;
+            }
+
+            @Override
+            public void applyPushPromise(
+                HttpRequest initiatingRequest, HttpRequest pushRequest,
+                Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor)
+            {
+                URI initiatingURI = initiatingRequest.uri();
+                URI pushRequestURI = pushRequest.uri();
+                if (!initiatingURI.getHost().equalsIgnoreCase(pushRequestURI.getHost()))
+                    return;
+
+                int initiatingPort = initiatingURI.getPort();
+                if (initiatingPort == -1 ) {
+                    if ("https".equalsIgnoreCase(initiatingURI.getScheme()))
+                        initiatingPort = 443;
+                    else
+                        initiatingPort = 80;
+                }
+                int pushPort = pushRequestURI.getPort();
+                if (pushPort == -1 ) {
+                    if ("https".equalsIgnoreCase(pushRequestURI.getScheme()))
+                        pushPort = 443;
+                    else
+                        pushPort = 80;
+                }
+                if (initiatingPort != pushPort)
+                    return;
+
+                CompletableFuture<HttpResponse<T>> cf = acceptor.apply(pushPromiseHandler.apply(pushRequest));
+                pushPromisesMap.put(pushRequest, cf);
+            }
+        }
+
+        /**
+         * Returns a push promise handler that accumulates push promises, and
+         * their responses, into the given map.
+         *
+         * <p> Entries are added to the given map for each synthetic push
+         * request ( push promise ) accepted. The entry's key is the
+         * push request, and the entry's value is a CompletableFuture that
+         * completes with the response corresponding to the key's push
+         * request. A push request is rejected / cancelled if there is
+         * already an entry in the map whose key is {@linplain HttpRequest#equal
+         * equal} to it. A push request is rejected / cancelled if it
+         * does not have the same origin as its initiating request.
+         *
+         * <p> Entries are added to the given map as soon as practically
+         * possible when a push promise is received and accepted. That way code,
+         * using such a map like a cache, can determine if a push promise has
+         * been issued by the server and avoid making, possibly, unnecessary
+         * requests.
+         *
+         * <p> The delivery of pushed content is not synchronized with the
+         * delivery of the main response. However, when the main response
+         * has been fully received, the map is guaranteed to be fully populated
+         * with no more entries added. The individual {@code CompletableFutures}
+         * contained in the Map may or may not already be completed at this point.
+         *
+         * @param <T> the push promise body type
+         * @param pushPromiseHandler t he body handler to use for push promises
+         * @param pushPromisesMap a map to accumulate push promises into
+         * @return a push promise body handler
+         */
+        public static <T> PushPromiseHandler<T>
+        withPushPromises(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
+                         ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
+            return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap);
+        }
+    }
+
+    /**
      * A subscriber for response bodies.
      * {@Incubating}
      *
@@ -1081,220 +1179,4 @@
              return new BufferingSubscriber<T>(downstream, bufferSize);
          }
     }
-
-    /**
-     * A response subscriber for a HTTP/2 multi response.
-     * {@Incubating}
-     *
-     * <p> A multi response comprises a main response, and zero or more additional
-     * responses. Each additional response is sent by the server in response to
-     * requests (PUSH_PROMISEs) that the server also generates. Additional responses are
-     * typically resources that the server expects the client will need which
-     * are related to the initial request.
-     * <p>
-     * Note. Instead of implementing this interface, applications should consider
-     * first using the mechanism (built on this interface) provided by
-     * {@link MultiSubscriber#asMap(java.util.function.Function, boolean)
-     * MultiSubscriber.asMap()} which is a slightly simplified, but also
-     * general purpose interface.
-     * <p>
-     * The server generated requests are also known as <i>push promises</i>.
-     * The server is permitted to send any number of these requests up to the
-     * point where the main response is fully received. Therefore, after
-     * completion of the main response, the final number of additional
-     * responses is known. Additional responses may be canceled, but given that
-     * the server does not wait for any acknowledgment before sending the
-     * response, this must be done quickly to avoid unnecessary data transmission.
-     *
-     * <p> {@code MultiSubscriber}s are parameterized with a type {@code U} which
-     * represents some meaningful aggregate of the responses received. This
-     * would typically be a collection of response or response body objects.
-     *
-     * @param <U> a type representing the aggregated results
-     * @param <T> a type representing all of the response bodies
-     *
-     * @since 9
-     */
-    public interface MultiSubscriber<U,T> {
-        /**
-         * Called for the main request from the user. This {@link HttpRequest}
-         * parameter is the request that was supplied to {@link
-         * HttpClient#sendAsync(HttpRequest, MultiSubscriber)}. The
-         * implementation must return an {@link BodyHandler} for the response
-         * body.
-         *
-         * @param request the request
-         *
-         * @return an optional body handler
-         */
-        BodyHandler<T> onRequest(HttpRequest request);
-
-        /**
-         * Called for each push promise that is received. The {@link HttpRequest}
-         * parameter represents the PUSH_PROMISE. The implementation must return
-         * an {@code Optional} of {@link BodyHandler} for the response body.
-         * Different handlers (of the same type) can be returned for different
-         * pushes within the same multi send. If no handler (an empty {@code
-         * Optional}) is returned, then the push will be canceled. If required,
-         * the {@code CompletableFuture<Void>} supplied to the {@code
-         * onFinalPushPromise} parameter of {@link
-         * #completion(CompletableFuture, CompletableFuture)} can be used to
-         * determine when the final PUSH_PROMISE is received.
-         *
-         * @param pushPromise the push promise
-         *
-         * @return an optional body handler
-         */
-        Optional<BodyHandler<T>> onPushPromise(HttpRequest pushPromise);
-
-        /**
-         * Called for each response received. For each request either one of
-         * onResponse() or onError() is guaranteed to be called, but not both.
-         *
-         * <p> Note: The reason for switching to this callback interface rather
-         * than using CompletableFutures supplied to onRequest() is that there
-         * is a subtle interaction between those CFs and the CF returned from
-         * completion() (or when onComplete() was called formerly). The completion()
-         * CF will not complete until after all of the work done by the onResponse()
-         * calls is done. Whereas if you just create CF's dependent on a supplied
-         * CF (to onRequest()) then the implementation has no visibility of the
-         * dependent CFs and can't guarantee to call onComplete() (or complete
-         * the completion() CF) after the dependent CFs complete.
-         *
-         * @param response the response received
-         */
-        void onResponse(HttpResponse<T> response);
-
-        /**
-         * Called if an error occurs receiving a response. For each request
-         * either one of onResponse() or onError() is guaranteed to be called,
-         * but not both.
-         *
-         * @param request the main request or subsequent push promise
-         * @param t the Throwable that caused the error
-         */
-        void onError(HttpRequest request, Throwable t);
-
-        /**
-         * Returns a {@link java.util.concurrent.CompletableFuture}{@code <U>}
-         * which completes when the aggregate result object itself is available.
-         * It is expected that the returned {@code CompletableFuture} will depend
-         * on one of the given {@code CompletableFuture<Void}s which themselves
-         * complete after all individual responses associated with the multi
-         * response have completed, or after all push promises have been received.
-         * This method is called after {@link #onRequest(HttpRequest)} but
-         * before any other methods.
-         *
-         * @implNote Implementations might follow the pattern shown below
-         * <pre>
-         * {@code
-         *      CompletableFuture<U> completion(
-         *              CompletableFuture<Void> onComplete,
-         *              CompletableFuture<Void> onFinalPushPromise)
-         *      {
-         *          return onComplete.thenApply((v) -> {
-         *              U u = ... instantiate and populate a U instance
-         *              return u;
-         *          });
-         *      }
-         * }
-         * </pre>
-         *
-         * @param onComplete a CompletableFuture which completes after all
-         * responses have been received relating to this multi request.
-         *
-         * @param onFinalPushPromise CompletableFuture which completes after all
-         * push promises have been received.
-         *
-         * @return the aggregate CF response object
-         */
-        CompletableFuture<U> completion(CompletableFuture<Void> onComplete,
-                CompletableFuture<Void> onFinalPushPromise);
-
-        /**
-         * Returns a general purpose handler for multi responses. The aggregated
-         * result object produced by this handler is a
-         * {@code Map<HttpRequest,CompletableFuture<HttpResponse<V>>>}. Each
-         * request (both the original user generated request and each server
-         * generated push promise) is returned as a key of the map. The value
-         * corresponding to each key is a
-         * {@code CompletableFuture<HttpResponse<V>>}.
-         *
-         * <p> There are two ways to use these handlers, depending on the value
-         * of the <i>completion</I> parameter. If completion is true, then the
-         * aggregated result will be available after all responses have
-         * themselves completed. If <i>completion</i> is false, then the
-         * aggregated result will be available immediately after the last push
-         * promise was received. In the former case, this implies that all the
-         * CompletableFutures in the map values will have completed. In the
-         * latter case, they may or may not have completed yet.
-         *
-         * <p> The simplest way to use these handlers is to set completion to
-         * {@code true}, and then all (results) values in the Map will be
-         * accessible without blocking.
-         * <p>
-         * See {@link #asMap(java.util.function.Function, boolean)}
-         * for a code sample of using this interface.
-         *
-         * <p> See {@link #asMap(Function, boolean)} for a code sample of using
-         * this interface.
-         *
-         * @param <V> the body type used for all responses
-         * @param reqHandler a function invoked for the user's request and each
-         *                   push promise
-         * @param completion {@code true} if the aggregate CompletableFuture
-         *                   completes after all responses have been received,
-         *                   or {@code false} after all push promises received
-         *
-         * @return a MultiSubscriber
-         */
-        public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap(
-                Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler,
-                boolean completion) {
-            return new MultiSubscriberImpl<V>(reqHandler.andThen(optv -> optv.get()),
-                                              reqHandler,
-                                              completion);
-        }
-
-        /**
-         * Returns a general purpose handler for multi responses. This is a
-         * convenience method which invokes {@link #asMap(Function,boolean)
-         * asMap(Function, true)} meaning that the aggregate result
-         * object completes after all responses have been received.
-         *
-         * <p><b>Example usage:</b>
-         * <br>
-         * <pre>
-         * {@code
-         *          HttpRequest request = HttpRequest.newBuilder()
-         *                  .uri(URI.create("https://www.foo.com/"))
-         *                  .GET()
-         *                  .build();
-         *
-         *          HttpClient client = HttpClient.newHttpClient();
-         *
-         *          Map<HttpRequest,CompletableFuture<HttpResponse<String>>> results = client
-         *              .sendAsync(request, MultiSubscriber.asMap(
-         *                  (req) -> Optional.of(HttpResponse.BodyHandler.asString())))
-         *              .join();
-         * }</pre>
-         *
-         * <p> The lambda in this example is the simplest possible implementation,
-         * where neither the incoming requests are examined, nor the response
-         * headers, and every push that the server sends is accepted. When the
-         * join() call returns, all {@code HttpResponse}s and their associated
-         * body objects are available.
-         *
-         * @param <V> the body type used for all responses
-         * @param reqHandler a function invoked for each push promise and the
-         *                   main request
-         * @return a MultiSubscriber
-         */
-        public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap(
-                Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler) {
-
-            return asMap(reqHandler, true);
-        }
-
-    }
 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Tue Jan 16 15:52:01 2018 +0000
@@ -36,6 +36,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.MinimalFuture;
@@ -52,7 +53,7 @@
  *
  * Creates a new Exchange for each request/response interaction
  */
-class MultiExchange<U,T> {
+class MultiExchange<T> {
 
     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
     static final System.Logger DEBUG_LOGGER =
@@ -64,7 +65,6 @@
     final HttpClientImpl client;
     final HttpResponse.BodyHandler<T> responseHandler;
     final Executor executor;
-    final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
     final AtomicInteger attempts = new AtomicInteger();
     HttpRequestImpl currentreq; // used for async only
     Exchange<T> exchange; // the current exchange
@@ -84,7 +84,7 @@
     private final List<HeaderFilter> filters;
     TimedEvent timedEvent;
     volatile boolean cancelled;
-    final PushGroup<U,T> pushGroup;
+    final PushGroup<T> pushGroup;
 
     /**
      * Filter fields. These are attached as required by filters
@@ -103,6 +103,7 @@
                   HttpRequestImpl requestImpl,
                   HttpClientImpl client,
                   HttpResponse.BodyHandler<T> responseHandler,
+                  PushPromiseHandler<T> pushPromiseHandler,
                   AccessControlContext acc) {
         this.previous = null;
         this.userRequest = userRequest;
@@ -118,37 +119,16 @@
             if (responseHandler instanceof UntrustedBodyHandler)
                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
         }
-        this.exchange = new Exchange<>(request, this);
-        this.multiResponseSubscriber = null;
-        this.pushGroup = null;
-    }
 
-    /**
-     * MultiExchange with multiple responses (HTTP/2 server pushes).
-     */
-    MultiExchange(HttpRequest userRequest,
-                  HttpRequestImpl requestImpl,
-                  HttpClientImpl client,
-                  HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
-                  AccessControlContext acc) {
-        this.previous = null;
-        this.userRequest = userRequest;
-        this.request = requestImpl;
-        this.currentreq = request;
-        this.client = client;
-        this.filters = client.filterChain();
-        this.acc = acc;
-        this.executor = client.theExecutor();
-        this.multiResponseSubscriber = multiResponseSubscriber;
-        this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
+        if (pushPromiseHandler != null) {
+            this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
+        } else {
+            pushGroup = null;
+        }
+
         this.exchange = new Exchange<>(request, this);
-        this.responseHandler = pushGroup.mainResponseHandler();
     }
 
-//    CompletableFuture<Void> multiCompletionCF() {
-//        return pushGroup.groupResult();
-//    }
-
     private synchronized Exchange<T> getExchange() {
         return exchange;
     }
@@ -157,10 +137,6 @@
         return client;
     }
 
-//    HttpClient.Redirect followRedirects() {
-//        return client.followRedirects();
-//    }
-
     HttpClient.Version version() {
         return request.version().orElse(client.version());
     }
@@ -233,21 +209,6 @@
                     });
     }
 
-    CompletableFuture<U> multiResponseAsync() {
-        CompletableFuture<Void> start = new MinimalFuture<>();
-        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
-        CompletableFuture<HttpResponse<T>> mainResponse =
-                cf.thenApply(b -> {
-                        multiResponseSubscriber.onResponse(b);
-                        pushGroup.noMorePushes(true);
-                        return b; });
-        pushGroup.setMainResponse(mainResponse);
-        CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
-                                                                      pushGroup.pushesCF());
-        start.completeAsync( () -> null, executor); // trigger execution
-        return res;
-    }
-
     private CompletableFuture<Response> responseAsyncImpl() {
         CompletableFuture<Response> cf;
         if (attempts.incrementAndGet() > max_attempts) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiMapResult.java	Fri Jan 12 15:36:28 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A {@link java.util.Map} containing the result of a HTTP/2 request and multi-response.
- * {@Incubating}
- * <p>
- * This is one possible implementation of the aggregate result type {@code <U>} returned
- * from {@link HttpClient#sendAsync(HttpRequest,HttpResponse.MultiSubscriber) }.
- * The map is indexed by {@link HttpRequest} and each value is a
- * {@link java.util.concurrent.CompletableFuture}&lt;
- * {@link HttpResponse}{@code <V>}&gt;
- * <p>
- * A {@code MultiMapResult} is obtained from an invocation such as the one shown below:
- * <p>
- * {@link CompletableFuture}&lt;{@code MultiMapResult<V>}&gt;
- * {@link HttpClient#sendAsync(HttpRequest,
- * HttpResponse.MultiSubscriber) HttpClient.sendAsync(}{@link
- * HttpResponse.MultiSubscriber#asMap(java.util.function.Function)
- * MultiSubscriber.asMap(Function)})
- *
- * @param <V> the response body type for all responses
- */
-public class MultiMapResult<V> implements Map<HttpRequest,CompletableFuture<HttpResponse<V>>> {
-    private final Map<HttpRequest,CompletableFuture<HttpResponse<V>>> map;
-
-    MultiMapResult(Map<HttpRequest,CompletableFuture<HttpResponse<V>>> map) {
-        this.map = map;
-    }
-
-    @Override
-    public int size() {
-        return map.size();
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    @Override
-    public boolean containsKey(Object key) {
-        return map.containsKey(key);
-    }
-
-    @Override
-    public boolean containsValue(Object value) {
-        return map.containsValue(value);
-    }
-
-    @Override
-    public CompletableFuture<HttpResponse<V>> get(Object key) {
-        return map.get(key);
-    }
-
-    @Override
-    public CompletableFuture<HttpResponse<V>> put(HttpRequest key, CompletableFuture<HttpResponse<V>> value) {
-        return map.put(key, value);
-    }
-
-    @Override
-    public CompletableFuture<HttpResponse<V>> remove(Object key) {
-        return map.remove(key);
-    }
-
-    @Override
-    public void putAll(Map<? extends HttpRequest, ? extends CompletableFuture<HttpResponse<V>>> m) {
-        map.putAll(m);
-    }
-
-    @Override
-    public void clear() {
-        map.clear();
-    }
-
-    @Override
-    public Set<HttpRequest> keySet() {
-        return map.keySet();
-    }
-
-    @Override
-    public Collection<CompletableFuture<HttpResponse<V>>> values() {
-        return map.values();
-    }
-
-    @Override
-    public Set<Entry<HttpRequest, CompletableFuture<HttpResponse<V>>>> entrySet() {
-        return map.entrySet();
-    }
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Tue Jan 16 15:52:01 2018 +0000
@@ -63,7 +63,7 @@
                 HttpClientImpl client = client();
                 assert client != null;
                 HttpRequestImpl req = new HttpRequestImpl("CONNECT", address);
-                MultiExchange<Void,Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null);
+                MultiExchange<Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null, null);
                 Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
 
                 return connectExchange
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PushGroup.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PushGroup.java	Tue Jan 16 15:52:01 2018 +0000
@@ -26,9 +26,10 @@
 package jdk.incubator.http;
 
 import java.security.AccessControlContext;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Log;
@@ -37,20 +38,15 @@
  * One PushGroup object is associated with the parent Stream of the pushed
  * Streams. This keeps track of all common state associated with the pushes.
  */
-class PushGroup<U,T> {
-    // the overall completion object, completed when all pushes are done.
-    final CompletableFuture<Void> resultCF;
+class PushGroup<T> {
+    private final HttpRequest initiatingRequest;
+
     final CompletableFuture<Void> noMorePushesCF;
 
     volatile Throwable error; // any exception that occurred during pushes
 
-    // CF for main response
-    final CompletableFuture<HttpResponse<T>> mainResponse;
-
     // user's subscriber object
-    final HttpResponse.MultiSubscriber<U, T> multiSubscriber;
-
-    final HttpResponse.BodyHandler<T> mainBodyHandler;
+    final PushPromiseHandler<T> pushPromiseHandler;
 
     private final AccessControlContext acc;
 
@@ -58,84 +54,69 @@
     int remainingPushes;
     boolean noMorePushes = false;
 
-    PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber,
-              HttpRequestImpl req,
+    PushGroup(PushPromiseHandler<T> pushPromiseHandler,
+              HttpRequestImpl initiatingRequest,
               AccessControlContext acc) {
-        this(multiSubscriber, req, new MinimalFuture<>(), acc);
+        this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), acc);
     }
 
     // Check mainBodyHandler before calling nested constructor.
-    private PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber,
-                      HttpRequestImpl req,
+    private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler,
+                      HttpRequestImpl initiatingRequest,
                       CompletableFuture<HttpResponse<T>> mainResponse,
                       AccessControlContext acc) {
-        this(multiSubscriber,
-             mainResponse,
-             multiSubscriber.onRequest(req),
-             acc);
-    }
-
-    // This private constructor is called after all parameters have been checked.
-    private PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber,
-                      CompletableFuture<HttpResponse<T>> mainResponse,
-                      HttpResponse.BodyHandler<T> mainBodyHandler,
-                      AccessControlContext acc) {
-
-        assert mainResponse != null; // A new instance is created above
-        assert mainBodyHandler != null; // should have been checked above
-
-        this.resultCF = new MinimalFuture<>();
         this.noMorePushesCF = new MinimalFuture<>();
-        this.multiSubscriber = multiSubscriber;
-        this.mainResponse = mainResponse.thenApply(r -> {
-            multiSubscriber.onResponse(r);
-            return r;
-        });
-        this.mainBodyHandler = mainBodyHandler;
-        if (acc != null) {
-            // Restricts the file publisher with the senders ACC, if any
-            if (mainBodyHandler instanceof UntrustedBodyHandler)
-                ((UntrustedBodyHandler)this.mainBodyHandler).setAccessControlContext(acc);
-        }
+        this.pushPromiseHandler = pushPromiseHandler;
+        this.initiatingRequest = initiatingRequest;
+        // Restricts the file publisher with the senders ACC, if any
+        if (pushPromiseHandler instanceof UntrustedBodyHandler)
+            ((UntrustedBodyHandler)this.pushPromiseHandler).setAccessControlContext(acc);
         this.acc = acc;
     }
 
-    CompletableFuture<Void> groupResult() {
-        return resultCF;
-    }
+    static class Acceptor<T> {
+        final HttpRequest initiator, push;
+        volatile HttpResponse.BodyHandler<T> bodyHandler = null;
+        volatile CompletableFuture<HttpResponse<T>> cf;
+
+        Acceptor(HttpRequest initiator, HttpRequest push) {
+            this.initiator = initiator;
+            this.push = push;
+        }
 
-    HttpResponse.MultiSubscriber<U, T> subscriber() {
-        return multiSubscriber;
+        CompletableFuture<HttpResponse<T>> accept(HttpResponse.BodyHandler<T> bodyHandler) {
+            Objects.requireNonNull(bodyHandler);
+            cf = new MinimalFuture<>();
+            if (this.bodyHandler != null)
+                throw new IllegalStateException();
+            this.bodyHandler = bodyHandler;
+            return cf;
+        }
+
+        HttpResponse.BodyHandler<T> bodyHandler() {
+            return bodyHandler;
+        }
+
+        CompletableFuture<HttpResponse<T>> cf() {
+            return cf;
+        }
+
+        boolean accepted() {
+            return cf != null;
+        }
     }
 
-    Optional<BodyHandler<T>> handlerForPushRequest(HttpRequest ppRequest) {
-        Optional<BodyHandler<T>> bh = multiSubscriber.onPushPromise(ppRequest);
-        if (acc != null && bh.isPresent()) {
-            // Restricts the file publisher with the senders ACC, if any
-            BodyHandler<T> x = bh.get();
-            if (x instanceof UntrustedBodyHandler)
-                ((UntrustedBodyHandler)x).setAccessControlContext(acc);
-            bh = Optional.of(x);
-        }
-        return bh;
-    }
+    Acceptor<T> acceptPushRequest(HttpRequest pushRequest) {
+        Acceptor<T> acceptor = new Acceptor<>(initiatingRequest, pushRequest);
+
+        pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept);
 
-    HttpResponse.BodyHandler<T> mainResponseHandler() {
-        return mainBodyHandler;
-    }
+        if (acceptor.accepted()) {
+            numberOfPushes++;
+            remainingPushes++;
+        }
+        return acceptor;
 
-    synchronized void setMainResponse(CompletableFuture<HttpResponse<T>> r) {
-        r.whenComplete((HttpResponse<T> response, Throwable t) -> {
-            if (t != null)
-                mainResponse.completeExceptionally(t);
-            else
-                mainResponse.complete(response);
-        });
-    }
-
-    synchronized void addPush() {
-        numberOfPushes++;
-        remainingPushes++;
     }
 
     // This is called when the main body response completes because it means
@@ -171,7 +152,6 @@
             if (Log.trace()) {
                 Log.logTrace("push completed");
             }
-            resultCF.complete(null);
         }
     }
 
@@ -180,6 +160,5 @@
             return;
         }
         this.error = t;
-        resultCF.completeExceptionally(t);
     }
 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RedirectFilter.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RedirectFilter.java	Tue Jan 16 15:52:01 2018 +0000
@@ -36,7 +36,7 @@
     HttpClientImpl client;
     HttpClient.Redirect policy;
     String method;
-    MultiExchange<?,?> exchange;
+    MultiExchange<?> exchange;
     static final int DEFAULT_MAX_REDIRECTS = 5;
     URI uri;
 
@@ -48,7 +48,7 @@
     public RedirectFilter() {}
 
     @Override
-    public synchronized void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException {
+    public synchronized void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
         this.request = r;
         this.client = e.client();
         this.policy = client.followRedirects();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Tue Jan 16 15:52:01 2018 +0000
@@ -518,59 +518,6 @@
         }
     }
 
-    static class MultiSubscriberImpl<V>
-        implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
-    {
-        private final MultiMapResult<V> results;
-        private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
-        private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler;
-        private final boolean completion; // aggregate completes on last PP received or overall completion
-
-        MultiSubscriberImpl(
-                Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler,
-                Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) {
-            this.results = new MultiMapResult<>(new ConcurrentHashMap<>());
-            this.requestHandler = requestHandler;
-            this.pushHandler = pushHandler;
-            this.completion = completion;
-        }
-
-        @Override
-        public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) {
-            CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
-            results.put(request, cf);
-            return requestHandler.apply(request);
-        }
-
-        @Override
-        public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) {
-            CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
-            results.put(push, cf);
-            return pushHandler.apply(push);
-        }
-
-        @Override
-        public void onResponse(HttpResponse<V> response) {
-            CompletableFuture<HttpResponse<V>> cf = results.get(response.request());
-            cf.complete(response);
-        }
-
-        @Override
-        public void onError(HttpRequest request, Throwable t) {
-            CompletableFuture<HttpResponse<V>> cf = results.get(request);
-            cf.completeExceptionally(t);
-        }
-
-        @Override
-        public CompletableFuture<MultiMapResult<V>> completion(
-                CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) {
-            if (completion)
-                return onComplete.thenApply((ignored)-> results);
-            else
-                return onFinalPushPromise.thenApply((ignored) -> results);
-        }
-    }
-
     /**
      * Currently this consumes all of the data and ignores it
      */
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Tue Jan 16 15:52:01 2018 +0000
@@ -224,7 +224,7 @@
         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
         CompletableFuture<T> cf = receiveData(bodySubscriber);
 
-        PushGroup<?,?> pg = exchange.getPushGroup();
+        PushGroup<?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
             cf = cf.whenComplete((t,e) -> pg.pushError(e));
@@ -420,42 +420,46 @@
         }
     }
 
-    void incoming_pushPromise(HttpRequestImpl pushReq,
-                              PushedStream<?,T> pushStream)
+    void incoming_pushPromise(HttpRequestImpl pushRequest,
+                              PushedStream<T> pushStream)
         throws IOException
     {
         if (Log.requests()) {
-            Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+            Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
         }
-        PushGroup<?,T> pushGroup = exchange.getPushGroup();
-        if (pushGroup == null || pushGroup.noMorePushes()) {
+        PushGroup<T> pushGroup = exchange.getPushGroup();
+        if (pushGroup == null) {
+            // no push handler set by the user code, i.e. cancel / reject
+            IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
+            pushStream.cancelImpl(ex);
+            return;
+        }
+
+        if (pushGroup.noMorePushes()) {
             cancelImpl(new IllegalStateException("unexpected push promise"
                 + " on stream " + streamid));
             return;
         }
 
-        HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
-
-        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
-
-        Optional<HttpResponse.BodyHandler<T>> bpOpt =
-                pushGroup.handlerForPushRequest(pushReq);
+        PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
+        CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
 
-        if (!bpOpt.isPresent()) {
-            IOException ex = new IOException("Stream "
-                 + streamid + " cancelled by user");
+        if (!acceptor.accepted()) {
+            // cancel / reject
+            IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
             if (Log.trace()) {
-                Log.logTrace("No body subscriber for {0}: {1}", pushReq,
-                            ex.getMessage());
+                Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
+                        ex.getMessage());
             }
             pushStream.cancelImpl(ex);
-            cf.completeExceptionally(ex);
             return;
         }
 
-        pushGroup.addPush();
+        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
+        HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
+
         pushStream.requestSent();
-        pushStream.setPushHandler(bpOpt.get());
+        pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
         // setup housekeeping for when the push is received
         // TODO: deal with ignoring of CF anti-pattern
         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
@@ -467,9 +471,9 @@
             }
             if (t != null) {
                 pushGroup.pushError(t);
-                proc.onError(pushReq, t);
+                pushResponseCF.completeExceptionally(t);
             } else {
-                proc.onResponse(resp);
+                pushResponseCF.complete(resp);
             }
             pushGroup.pushCompleted();
         });
@@ -811,7 +815,7 @@
             cf = cf.thenApplyAsync(r -> r, executor);
         }
         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
-        PushGroup<?,?> pg = exchange.getPushGroup();
+        PushGroup<?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
@@ -950,16 +954,16 @@
         Log.logTrace("Stream {0} closed", streamid);
     }
 
-    static class PushedStream<U,T> extends Stream<T> {
-        final PushGroup<U,T> pushGroup;
+    static class PushedStream<T> extends Stream<T> {
+        final PushGroup<T> pushGroup;
         // push streams need the response CF allocated up front as it is
         // given directly to user via the multi handler callback function.
         final CompletableFuture<Response> pushCF;
-        final CompletableFuture<HttpResponse<T>> responseCF;
+        CompletableFuture<HttpResponse<T>> responseCF;
         final HttpRequestImpl pushReq;
         HttpResponse.BodyHandler<T> pushHandler;
 
-        PushedStream(PushGroup<U,T> pushGroup,
+        PushedStream(PushGroup<T> pushGroup,
                      Http2Connection connection,
                      Exchange<T> pushReq) {
             // ## no request body possible, null window controller
@@ -968,6 +972,7 @@
             this.pushReq = pushReq.request();
             this.pushCF = new MinimalFuture<>();
             this.responseCF = new MinimalFuture<>();
+
         }
 
         CompletableFuture<HttpResponse<T>> responseCF() {
--- a/test/jdk/java/net/httpclient/http2/ServerPush.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/ServerPush.java	Tue Jan 16 15:52:01 2018 +0000
@@ -30,89 +30,222 @@
  *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
  *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
  *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
- * @run testng/othervm -Djdk.internal.httpclient.hpack.debug=true -Djdk.internal.httpclient.debug=true -Djdk.httpclient.HttpClient.log=errors,requests,responses ServerPush
+ * @run testng/othervm -Djdk.httpclient.HttpClient.log=errors,requests,responses ServerPush
  */
 
 import java.io.*;
 import java.net.*;
+import java.nio.ByteBuffer;
 import java.nio.file.*;
-import java.nio.file.attribute.*;
 import jdk.incubator.http.*;
-import jdk.incubator.http.HttpResponse.MultiSubscriber;
 import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.*;
+
 
 public class ServerPush {
 
-    static ExecutorService e = Executors.newCachedThreadPool();
-
     static final int LOOPS = 13;
     static final int FILE_SIZE = 512 * 1024 + 343;
 
     static Path tempFile;
 
-    @Test
-    public static void test() throws Exception {
-        Http2TestServer server = null;
-        final Path dir = Files.createTempDirectory("serverPush");
-        try {
-            server = new Http2TestServer(false, 0);
-            server.addHandler(new PushHandler(FILE_SIZE, LOOPS), "/");
-            tempFile = TestUtil.getAFile(FILE_SIZE);
+    Http2TestServer server;
+    URI uri;
 
-            System.err.println("Server listening on port " + server.getAddress().getPort());
-            server.start();
-            int port = server.getAddress().getPort();
+    @BeforeTest
+    public void setup() throws Exception {
+        server = new Http2TestServer(false, 0);
+        server.addHandler(new PushHandler(FILE_SIZE, LOOPS), "/");
+        tempFile = TestUtil.getAFile(FILE_SIZE);
+        System.out.println("Using temp file:" + tempFile);
 
-            // use multi-level path
-            URI uri = new URI("http://127.0.0.1:" + port + "/foo/a/b/c");
-            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+        System.err.println("Server listening on port " + server.getAddress().getPort());
+        server.start();
+        int port = server.getAddress().getPort();
+        uri = new URI("http://127.0.0.1:" + port + "/foo/a/b/c");
+    }
 
-            CompletableFuture<MultiMapResult<Path>> cf =
-                HttpClient.newBuilder().version(HttpClient.Version.HTTP_2)
-                    .executor(e).build().sendAsync(
-                        request, MultiSubscriber.asMap((req) -> {
-                            URI u = req.uri();
-                            Path path = Paths.get(dir.toString(), u.getPath());
-                            try {
-                                Files.createDirectories(path.getParent());
-                            } catch (IOException ee) {
-                                throw new UncheckedIOException(ee);
-                            }
-                            return Optional.of(BodyHandler.asFile(path));
-                        }
-                    ));
-            MultiMapResult<Path> results = cf.get();
-
-            //HttpResponse resp = request.response();
-            System.err.println(results.size());
-            Set<HttpRequest> requests = results.keySet();
+    @AfterTest
+    public void teardown() {
+        server.stop();
+    }
 
-            for (HttpRequest r : requests) {
-                URI u = r.uri();
-                Path result = results.get(r).get().body();
-                System.err.printf("%s -> %s\n", u.toString(), result.toString());
-                TestUtil.compareFiles(result, tempFile);
-            }
-            if (requests.size() != LOOPS + 1)
-                throw new RuntimeException("some results missing");
-            System.out.println("TEST OK: sleeping for 5 sec");
-            Thread.sleep (5 * 1000);
-        } finally {
-            e.shutdownNow();
-            server.stop();
-            Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
-                public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
-                    dir.toFile().delete();
-                    return FileVisitResult.CONTINUE;
-                }
-                public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
-                    path.toFile().delete();
-                    return FileVisitResult.CONTINUE;
-                }
-            });
+    static final UnaryOperator<HttpResponse<?>>
+            assert200ResponseCode = (response) -> {
+                assertEquals(response.statusCode(), 200);
+                return response;
+    };
+
+    interface Peeker<T> extends UnaryOperator<T> {
+        void peek(T t);
+
+        default T apply(T t)
+        {
+            peek(t);
+            return t;
         }
     }
+
+    @Test
+    public void testTypeString() throws Exception {
+        // use multi-level path
+        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+        String tempFileAsString = new String(Files.readAllBytes(tempFile), UTF_8);
+
+        // Attempt 2
+        HttpClient client = HttpClient.newBuilder()
+                .version(HttpClient.Version.HTTP_2)
+                .build();
+
+        ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<String>>> results = new ConcurrentHashMap<>();
+
+
+        // Example 2 - of(...) building your own Map, everything as a String
+        PushPromiseHandler<String> pph = (initial, pushRequest, acceptor) -> {
+            BodyHandler<String> s = BodyHandler.asString(UTF_8);
+            CompletableFuture<HttpResponse<String>> cf = acceptor.apply(s);
+            results.put(pushRequest, cf);
+        };
+
+        CompletableFuture<HttpResponse<String>> cf =
+                client.sendAsync(request, BodyHandler.asString(), pph);
+        cf.join();
+        results.put(request, cf);
+
+        System.err.println(results.size());
+        Set<HttpRequest> requests = results.keySet();
+
+        System.err.println("results.size: " + results.size());
+        for (HttpRequest r : requests) {
+            String result = results.get(r).get().body();
+            if (!result.equals(tempFileAsString)) {
+                System.err.println("Got [" + result + ", expected [" + tempFileAsString + "]");
+            }
+        }
+        if (requests.size() != LOOPS + 1)
+            throw new RuntimeException("Some results missing, expected:" + LOOPS + 1 + ", got:" + results.size());
+    }
+
+    // --- Path ---
+
+    static final Path dir = Paths.get(".", "serverPush");
+    static BodyHandler<Path> requestToPath(HttpRequest req) {
+        URI u = req.uri();
+        Path path = Paths.get(dir.toString(), u.getPath());
+        try {
+            Files.createDirectories(path.getParent());
+        } catch (IOException ee) {
+            throw new UncheckedIOException(ee);
+        }
+        return BodyHandler.asFile(path);
+    };
+
+    @Test
+    public void testTypePath() throws Exception {
+        HttpClient client = HttpClient.newHttpClient();
+        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+        ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<Path>>> map
+                = new ConcurrentHashMap<>();
+
+        // Example 4 - of(...) building your own Map, everything as a Path
+        PushPromiseHandler<Path> pushPromiseHandler = (initial, pushRequest, acceptor) -> {
+            BodyHandler<Path> pp = requestToPath(pushRequest);
+            CompletableFuture<HttpResponse<Path>> cf = acceptor.apply(pp);
+            map.put(pushRequest, cf);
+        };
+
+        CompletableFuture<HttpResponse<Path>> cf =
+                client.sendAsync(request, requestToPath(request), pushPromiseHandler);
+        cf.join();
+        map.put(request, cf);
+
+        System.err.println("map.size: " + map.size());
+        for (HttpRequest r : map.keySet()) {
+            Path path = map.get(r).get().body();
+            String fileAsString = new String(Files.readAllBytes(path), UTF_8);
+            String tempFileAsString = new String(Files.readAllBytes(tempFile), UTF_8);
+            assertEquals(fileAsString, tempFileAsString);
+        }
+        assertEquals(map.size(),  LOOPS + 1);
+    }
+
+    // ---  Consumer<byte[]> ---
+
+    static class ByteArrayConsumer implements Consumer<Optional<byte[]>> {
+        volatile List<byte[]> listByteArrays = new ArrayList<>();
+        volatile byte[] accumulatedBytes;
+
+        public byte[] getAccumulatedBytes() { return accumulatedBytes; }
+
+        @Override
+        public void accept(Optional<byte[]> optionalBytes) {
+            assert accumulatedBytes == null;
+            if (!optionalBytes.isPresent()) {
+                int size = listByteArrays.stream().mapToInt(ba -> ba.length).sum();
+                ByteBuffer bb = ByteBuffer.allocate(size);
+                listByteArrays.stream().forEach(ba -> bb.put(ba));
+                accumulatedBytes = bb.array();
+            } else {
+                listByteArrays.add(optionalBytes.get());
+            }
+        }
+    }
+
+    @Test
+    public void testTypeByteArrayConsumer() throws Exception {
+        HttpClient client = HttpClient.newHttpClient();
+        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+        ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<Void>>> resultMap
+                = new ConcurrentHashMap<>();
+        Map<HttpRequest,ByteArrayConsumer> byteArrayConsumerMap
+                = new ConcurrentHashMap<>();
+
+        ByteArrayConsumer bac = new ByteArrayConsumer();
+        byteArrayConsumerMap.put(request, bac);
+
+        // Example 5 - withXXX and everything as a consumer of optional byte[]
+        PushPromiseHandler<Void> pushPromiseHandler =
+                PushPromiseHandler.withPushPromises(pushRequest -> {
+                                                       ByteArrayConsumer bc = new ByteArrayConsumer();
+                                                       byteArrayConsumerMap.put(pushRequest, bc);
+                                                       return BodyHandler.asByteArrayConsumer(bc);
+                                                    },
+                                                    resultMap);
+
+        CompletableFuture<HttpResponse<Void>> cf =
+                client.sendAsync(request, BodyHandler.asByteArrayConsumer(bac), pushPromiseHandler);
+        cf.join();
+        resultMap.put(request, cf);
+
+        System.err.println("map.size: " + resultMap.size());
+        for (HttpRequest r : resultMap.keySet()) {
+            resultMap.get(r).join();
+            byte[] ba = byteArrayConsumerMap.get(r).getAccumulatedBytes();
+            String result = new String(ba, UTF_8);
+            System.out.println("HEGO result=" + result);
+            System.out.println("HEGO result.length=" + result.length());
+            System.err.printf("%s -> %s\n", r.uri().toString(), result);
+            String tempFileAsString = new String(Files.readAllBytes(tempFile), UTF_8);
+            System.out.println("HEGO tempFileAsString=" + tempFileAsString);
+            System.out.println("HEGO tempFileAsString.length=" + tempFileAsString.length());
+            assertEquals(result, tempFileAsString);
+        }
+
+        assertEquals(resultMap.size(), LOOPS + 1);
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/http2/ServerPushWithDiffTypes.java	Tue Jan 16 15:52:01 2018 +0000
@@ -0,0 +1,252 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @library /lib/testlibrary server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.base/sun.net.www.http
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true -Djdk.httpclient.HttpClient.log=errors,requests,responses ServerPushWithDiffTypes
+ */
+
+import java.io.*;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.*;
+import jdk.incubator.http.*;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import java.util.*;
+import java.util.concurrent.*;
+import jdk.incubator.http.internal.common.HttpHeadersImpl;
+import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class ServerPushWithDiffTypes {
+
+    static Map<String,String> PUSH_PROMISES = Map.of(
+        "/x/y/z/1", "the first push promise body",
+        "/x/y/z/2", "the second push promise body",
+        "/x/y/z/3", "the third push promise body",
+        "/x/y/z/4", "the fourth push promise body",
+        "/x/y/z/5", "the fifth push promise body",
+        "/x/y/z/6", "the sixth push promise body",
+        "/x/y/z/7", "the seventh push promise body",
+        "/x/y/z/8", "the eight push promise body",
+        "/x/y/z/9", "the ninth push promise body"
+    );
+
+    @Test
+    public static void test() throws Exception {
+        Http2TestServer server = null;
+        try {
+            server = new Http2TestServer(false, 0);
+            Http2Handler handler = new ServerPushHandler("the main response body",
+                                                         PUSH_PROMISES);
+            server.addHandler(handler, "/");
+            server.start();
+            int port = server.getAddress().getPort();
+            System.err.println("Server listening on port " + port);
+
+            HttpClient client = HttpClient.newHttpClient();
+            // use multi-level path
+            URI uri = new URI("http://127.0.0.1:" + port + "/foo/a/b/c");
+            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+            ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<BodyAndType<?>>>> results = new ConcurrentHashMap<>();
+            PushPromiseHandler<BodyAndType<?>> bh = PushPromiseHandler.withPushPromises(
+                (pushRequest) -> new BodyAndTypeHandler(pushRequest), results);
+
+            CompletableFuture<HttpResponse<BodyAndType<?>>> cf = client.sendAsync(request, new BodyAndTypeHandler(request), bh);
+            results.put(request, cf);
+            cf.join();
+            System.err.println("CHEGAR: results.size: " + results.size());
+
+            if (results.size() != PUSH_PROMISES.size() + 1)
+                throw new RuntimeException("Some results missing, expected:"
+                        + (PUSH_PROMISES.size() + 1) + ", got:" + results.size());
+
+            for (HttpRequest r : results.keySet()) {
+                URI u = r.uri();
+                BodyAndType<?> body = results.get(r).get().body();
+                String result;
+                // convert all body types to String for easier comparison
+                if (body.type() == String.class) {
+                    result = (String)body.getBody();
+                } else if (body.type() == byte[].class) {
+                    byte[] bytes = (byte[])body.getBody();
+                    result = new String(bytes, UTF_8);
+                } else if (Path.class.isAssignableFrom(body.type())) {
+                    Path path = (Path)body.getBody();
+                    result = new String(Files.readAllBytes(path), UTF_8);
+                } else {
+                    throw new AssertionError("Unknown:" + body.type());
+                }
+
+                System.err.printf("%s -> %s\n", u.toString(), result.toString());
+                String expected = PUSH_PROMISES.get(r.uri().getPath());
+                if (expected == null)
+                    expected = "the main response body";
+                System.err.println("For " + r + ", got [" + result + "], expected [" + expected +"]");
+                if (!result.equals(expected)) {
+                    throw new RuntimeException("For " + r + ", got [" + result + "], expected [" + expected +"]");
+                }
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+    static interface BodyAndType<T> {
+        Class<T> type();
+        T getBody();
+    }
+
+    static final Path WORK_DIR = Paths.get(".");
+
+    static class BodyAndTypeHandler implements BodyHandler<BodyAndType<?>> {
+        int count;
+        final HttpRequest request;
+
+        BodyAndTypeHandler(HttpRequest request) {
+            this.request = request;
+        }
+
+        @Override
+        public HttpResponse.BodySubscriber<BodyAndType<?>> apply(int statusCode,
+                                                                 HttpHeaders responseHeaders) {
+            int whichType = count++ % 3;  // real world may base this on the request metadata
+            switch (whichType) {
+                case 0: // String
+                    return new BodyAndTypeSubscriber(BodySubscriber.asString(StandardCharsets.UTF_8));
+                case 1: // byte[]
+                    return new BodyAndTypeSubscriber(BodySubscriber.asByteArray());
+                case 2: // Path
+                    URI u = request.uri();
+                    Path path = Paths.get(WORK_DIR.toString(), u.getPath());
+                    try {
+                        Files.createDirectories(path.getParent());
+                    } catch (IOException ee) {
+                        throw new UncheckedIOException(ee);
+                    }
+                    return new BodyAndTypeSubscriber(BodySubscriber.asFile(path));
+                default:
+                    throw new AssertionError("Unexpected " + whichType);
+            }
+        }
+    }
+
+    static class BodyAndTypeSubscriber<T> implements HttpResponse.BodySubscriber<BodyAndType<T>> {
+
+        private static class BodyAndTypeImpl<T> implements BodyAndType<T> {
+            private final Class<T> type;
+            private final T body;
+            public BodyAndTypeImpl(Class<T> type, T body) { this.type = type; this.body = body; }
+            @Override public Class<T> type() { return type; }
+            @Override public T getBody() { return body; }
+        }
+
+        private final BodySubscriber<?> bodySubscriber;
+        private final CompletableFuture<BodyAndType<T>> cf;
+
+        BodyAndTypeSubscriber(BodySubscriber bodySubscriber) {
+            this.bodySubscriber = bodySubscriber;
+            cf = new CompletableFuture<>();
+            bodySubscriber.getBody().whenComplete((r,t) -> cf.complete(new BodyAndTypeImpl(r.getClass(), r)));
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            bodySubscriber.onSubscribe(subscription);
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            bodySubscriber.onNext(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            bodySubscriber.onError(throwable);
+            cf.completeExceptionally(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            bodySubscriber.onComplete();
+        }
+
+        @Override
+        public CompletionStage<BodyAndType<T>> getBody() {
+            return cf;
+        }
+    }
+
+    // --- server push handler ---
+    static class ServerPushHandler implements Http2Handler {
+
+        private final String mainResponseBody;
+        private final Map<String,String> promises;
+
+        public ServerPushHandler(String mainResponseBody, Map<String,String> promises) throws Exception {
+            Objects.requireNonNull(promises);
+            this.mainResponseBody = mainResponseBody;
+            this.promises = promises;
+        }
+
+        public void handle(Http2TestExchange exchange) throws IOException {
+            System.err.println("Server: handle " + exchange);
+            try (InputStream is = exchange.getRequestBody()) {
+                is.readAllBytes();
+            }
+
+            if (exchange.serverPushAllowed()) {
+                pushPromises(exchange);
+            }
+
+            // response data for the main response
+            try (OutputStream os = exchange.getResponseBody()) {
+                byte[] bytes = mainResponseBody.getBytes(UTF_8);
+                exchange.sendResponseHeaders(200, bytes.length);
+                os.write(bytes);
+            }
+        }
+
+        private void pushPromises(Http2TestExchange exchange) throws IOException {
+            URI requestURI = exchange.getRequestURI();
+            for (Map.Entry<String,String> promise : promises.entrySet()) {
+                URI uri = requestURI.resolve(promise.getKey());
+                InputStream is = new ByteArrayInputStream(promise.getValue().getBytes(UTF_8));
+                HttpHeadersImpl headers = new HttpHeadersImpl();
+                headers.addHeader("X-Promise-"+promise.getKey(), promise.getKey()); // todo: add some check on headers, maybe
+                exchange.serverPush(uri, headers, is);
+            }
+            System.err.println("Server: All pushes sent");
+        }
+    }
+}
--- a/test/jdk/java/net/httpclient/http2/server/PushHandler.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/PushHandler.java	Tue Jan 16 15:52:01 2018 +0000
@@ -46,9 +46,10 @@
             invocation++;
 
             if (ee.serverPushAllowed()) {
+                URI requestURI = ee.getRequestURI();
                 for (int i=0; i<loops; i++) {
                     InputStream is = new FileInputStream(tempFile.toFile());
-                    URI u = new URI ("http://www.foo.com/x/y/z/" + Integer.toString(i));
+                    URI u = requestURI.resolve("/x/y/z/" + Integer.toString(i));
                     HttpHeadersImpl h = new HttpHeadersImpl();
                     h.addHeader("X-foo", "bar");
                     ee.serverPush(u, h, is);
--- a/test/jdk/java/net/httpclient/http2/server/TestUtil.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/TestUtil.java	Tue Jan 16 15:52:01 2018 +0000
@@ -45,7 +45,6 @@
     public static Path tempFile() {
         try {
             Path p = Files.createTempFile("foo", "test");
-            p.toFile().deleteOnExit();
             return p;
         } catch (IOException e) {
             throw new UncheckedIOException(e);
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java	Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java	Tue Jan 16 15:52:01 2018 +0000
@@ -239,8 +239,9 @@
             return error();
         }
         @Override
-        public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
-                HttpResponse.MultiSubscriber<U, T> multiSubscriber) {
+        public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
+                HttpResponse.BodyHandler<T> bodyHandler,
+                HttpResponse.PushPromiseHandler<T> multiHandler) {
             return error();
         }
     }