diff -r cf8792f51dee -r 782b2f2d1e76 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Jan 12 15:36:28 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Tue Jan 16 15:52:01 2018 +0000 @@ -25,11 +25,11 @@ package jdk.incubator.http; +import jdk.incubator.http.internal.common.MinimalFuture; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.net.URI; -import jdk.incubator.http.ResponseSubscribers.MultiSubscriberImpl; import static jdk.incubator.http.internal.common.Utils.unchecked; import static jdk.incubator.http.internal.common.Utils.charsetFrom; import java.nio.ByteBuffer; @@ -46,8 +46,10 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -314,9 +316,7 @@ * * @param the response body type */ - @FunctionalInterface public interface BodyHandler { - /** * Returns a {@link BodySubscriber BodySubscriber} considering the given * response status code and headers. This method is always called before @@ -331,6 +331,7 @@ */ public BodySubscriber apply(int statusCode, HttpHeaders responseHeaders); + /** * Returns a response body handler that returns a {@link BodySubscriber * BodySubscriber}{@code } obtained from {@linkplain @@ -724,6 +725,103 @@ } /** + * A handler of push promises ... + */ + public interface PushPromiseHandler { + /** + * Notifies of an incoming Push Promise. The enclosing request from the user and the push promise + * are supplied as parameters, and also a {@link Function} which must be called in the implementation + * of this method, if the server push is to be accepted. If this method returns without the function + * being called, then the push will be cancelled. + */ + public void applyPushPromise( + HttpRequest initial, HttpRequest pushPromise, + Function,CompletableFuture>> acceptor + ); + + + /* package-private with push promise Map implementation */ + static class PushPromisesHandlerWithMap implements PushPromiseHandler { + + private final ConcurrentMap>> pushPromisesMap; + private final Function> pushPromiseHandler; + + PushPromisesHandlerWithMap(Function> pushPromiseHandler, + ConcurrentMap>> pushPromisesMap) { + this.pushPromiseHandler = pushPromiseHandler; + this.pushPromisesMap = pushPromisesMap; + } + + @Override + public void applyPushPromise( + HttpRequest initiatingRequest, HttpRequest pushRequest, + Function,CompletableFuture>> acceptor) + { + URI initiatingURI = initiatingRequest.uri(); + URI pushRequestURI = pushRequest.uri(); + if (!initiatingURI.getHost().equalsIgnoreCase(pushRequestURI.getHost())) + return; + + int initiatingPort = initiatingURI.getPort(); + if (initiatingPort == -1 ) { + if ("https".equalsIgnoreCase(initiatingURI.getScheme())) + initiatingPort = 443; + else + initiatingPort = 80; + } + int pushPort = pushRequestURI.getPort(); + if (pushPort == -1 ) { + if ("https".equalsIgnoreCase(pushRequestURI.getScheme())) + pushPort = 443; + else + pushPort = 80; + } + if (initiatingPort != pushPort) + return; + + CompletableFuture> cf = acceptor.apply(pushPromiseHandler.apply(pushRequest)); + pushPromisesMap.put(pushRequest, cf); + } + } + + /** + * Returns a push promise handler that accumulates push promises, and + * their responses, into the given map. + * + *

Entries are added to the given map for each synthetic push + * request ( push promise ) accepted. The entry's key is the + * push request, and the entry's value is a CompletableFuture that + * completes with the response corresponding to the key's push + * request. A push request is rejected / cancelled if there is + * already an entry in the map whose key is {@linplain HttpRequest#equal + * equal} to it. A push request is rejected / cancelled if it + * does not have the same origin as its initiating request. + * + *

Entries are added to the given map as soon as practically + * possible when a push promise is received and accepted. That way code, + * using such a map like a cache, can determine if a push promise has + * been issued by the server and avoid making, possibly, unnecessary + * requests. + * + *

The delivery of pushed content is not synchronized with the + * delivery of the main response. However, when the main response + * has been fully received, the map is guaranteed to be fully populated + * with no more entries added. The individual {@code CompletableFutures} + * contained in the Map may or may not already be completed at this point. + * + * @param the push promise body type + * @param pushPromiseHandler t he body handler to use for push promises + * @param pushPromisesMap a map to accumulate push promises into + * @return a push promise body handler + */ + public static PushPromiseHandler + withPushPromises(Function> pushPromiseHandler, + ConcurrentMap>> pushPromisesMap) { + return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap); + } + } + + /** * A subscriber for response bodies. * {@Incubating} * @@ -1081,220 +1179,4 @@ return new BufferingSubscriber(downstream, bufferSize); } } - - /** - * A response subscriber for a HTTP/2 multi response. - * {@Incubating} - * - *

A multi response comprises a main response, and zero or more additional - * responses. Each additional response is sent by the server in response to - * requests (PUSH_PROMISEs) that the server also generates. Additional responses are - * typically resources that the server expects the client will need which - * are related to the initial request. - *

- * Note. Instead of implementing this interface, applications should consider - * first using the mechanism (built on this interface) provided by - * {@link MultiSubscriber#asMap(java.util.function.Function, boolean) - * MultiSubscriber.asMap()} which is a slightly simplified, but also - * general purpose interface. - *

- * The server generated requests are also known as push promises. - * The server is permitted to send any number of these requests up to the - * point where the main response is fully received. Therefore, after - * completion of the main response, the final number of additional - * responses is known. Additional responses may be canceled, but given that - * the server does not wait for any acknowledgment before sending the - * response, this must be done quickly to avoid unnecessary data transmission. - * - *

{@code MultiSubscriber}s are parameterized with a type {@code U} which - * represents some meaningful aggregate of the responses received. This - * would typically be a collection of response or response body objects. - * - * @param a type representing the aggregated results - * @param a type representing all of the response bodies - * - * @since 9 - */ - public interface MultiSubscriber { - /** - * Called for the main request from the user. This {@link HttpRequest} - * parameter is the request that was supplied to {@link - * HttpClient#sendAsync(HttpRequest, MultiSubscriber)}. The - * implementation must return an {@link BodyHandler} for the response - * body. - * - * @param request the request - * - * @return an optional body handler - */ - BodyHandler onRequest(HttpRequest request); - - /** - * Called for each push promise that is received. The {@link HttpRequest} - * parameter represents the PUSH_PROMISE. The implementation must return - * an {@code Optional} of {@link BodyHandler} for the response body. - * Different handlers (of the same type) can be returned for different - * pushes within the same multi send. If no handler (an empty {@code - * Optional}) is returned, then the push will be canceled. If required, - * the {@code CompletableFuture} supplied to the {@code - * onFinalPushPromise} parameter of {@link - * #completion(CompletableFuture, CompletableFuture)} can be used to - * determine when the final PUSH_PROMISE is received. - * - * @param pushPromise the push promise - * - * @return an optional body handler - */ - Optional> onPushPromise(HttpRequest pushPromise); - - /** - * Called for each response received. For each request either one of - * onResponse() or onError() is guaranteed to be called, but not both. - * - *

Note: The reason for switching to this callback interface rather - * than using CompletableFutures supplied to onRequest() is that there - * is a subtle interaction between those CFs and the CF returned from - * completion() (or when onComplete() was called formerly). The completion() - * CF will not complete until after all of the work done by the onResponse() - * calls is done. Whereas if you just create CF's dependent on a supplied - * CF (to onRequest()) then the implementation has no visibility of the - * dependent CFs and can't guarantee to call onComplete() (or complete - * the completion() CF) after the dependent CFs complete. - * - * @param response the response received - */ - void onResponse(HttpResponse response); - - /** - * Called if an error occurs receiving a response. For each request - * either one of onResponse() or onError() is guaranteed to be called, - * but not both. - * - * @param request the main request or subsequent push promise - * @param t the Throwable that caused the error - */ - void onError(HttpRequest request, Throwable t); - - /** - * Returns a {@link java.util.concurrent.CompletableFuture}{@code } - * which completes when the aggregate result object itself is available. - * It is expected that the returned {@code CompletableFuture} will depend - * on one of the given {@code CompletableFuture - * {@code - * CompletableFuture completion( - * CompletableFuture onComplete, - * CompletableFuture onFinalPushPromise) - * { - * return onComplete.thenApply((v) -> { - * U u = ... instantiate and populate a U instance - * return u; - * }); - * } - * } - * - * - * @param onComplete a CompletableFuture which completes after all - * responses have been received relating to this multi request. - * - * @param onFinalPushPromise CompletableFuture which completes after all - * push promises have been received. - * - * @return the aggregate CF response object - */ - CompletableFuture completion(CompletableFuture onComplete, - CompletableFuture onFinalPushPromise); - - /** - * Returns a general purpose handler for multi responses. The aggregated - * result object produced by this handler is a - * {@code Map>>}. Each - * request (both the original user generated request and each server - * generated push promise) is returned as a key of the map. The value - * corresponding to each key is a - * {@code CompletableFuture>}. - * - *

There are two ways to use these handlers, depending on the value - * of the completion parameter. If completion is true, then the - * aggregated result will be available after all responses have - * themselves completed. If completion is false, then the - * aggregated result will be available immediately after the last push - * promise was received. In the former case, this implies that all the - * CompletableFutures in the map values will have completed. In the - * latter case, they may or may not have completed yet. - * - *

The simplest way to use these handlers is to set completion to - * {@code true}, and then all (results) values in the Map will be - * accessible without blocking. - *

- * See {@link #asMap(java.util.function.Function, boolean)} - * for a code sample of using this interface. - * - *

See {@link #asMap(Function, boolean)} for a code sample of using - * this interface. - * - * @param the body type used for all responses - * @param reqHandler a function invoked for the user's request and each - * push promise - * @param completion {@code true} if the aggregate CompletableFuture - * completes after all responses have been received, - * or {@code false} after all push promises received - * - * @return a MultiSubscriber - */ - public static MultiSubscriber,V> asMap( - Function>> reqHandler, - boolean completion) { - return new MultiSubscriberImpl(reqHandler.andThen(optv -> optv.get()), - reqHandler, - completion); - } - - /** - * Returns a general purpose handler for multi responses. This is a - * convenience method which invokes {@link #asMap(Function,boolean) - * asMap(Function, true)} meaning that the aggregate result - * object completes after all responses have been received. - * - *

Example usage: - *
- *

-         * {@code
-         *          HttpRequest request = HttpRequest.newBuilder()
-         *                  .uri(URI.create("https://www.foo.com/"))
-         *                  .GET()
-         *                  .build();
-         *
-         *          HttpClient client = HttpClient.newHttpClient();
-         *
-         *          Map>> results = client
-         *              .sendAsync(request, MultiSubscriber.asMap(
-         *                  (req) -> Optional.of(HttpResponse.BodyHandler.asString())))
-         *              .join();
-         * }
- * - *

The lambda in this example is the simplest possible implementation, - * where neither the incoming requests are examined, nor the response - * headers, and every push that the server sends is accepted. When the - * join() call returns, all {@code HttpResponse}s and their associated - * body objects are available. - * - * @param the body type used for all responses - * @param reqHandler a function invoked for each push promise and the - * main request - * @return a MultiSubscriber - */ - public static MultiSubscriber,V> asMap( - Function>> reqHandler) { - - return asMap(reqHandler, true); - } - - } }