--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java Tue Jan 16 15:52:01 2018 +0000
@@ -43,7 +43,7 @@
* Implementation of Http Basic authentication.
*/
class AuthenticationFilter implements HeaderFilter {
- volatile MultiExchange<?,?> exchange;
+ volatile MultiExchange<?> exchange;
private static final Base64.Encoder encoder = Base64.getEncoder();
static final int DEFAULT_RETRY_LIMIT = 3;
@@ -108,7 +108,7 @@
}
@Override
- public void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException {
+ public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
// use preemptive authentication if an entry exists.
Cache cache = getCache(e);
this.exchange = e;
@@ -263,7 +263,7 @@
// be garbaged collected when no longer referenced.
static final WeakHashMap<HttpClientImpl,Cache> caches = new WeakHashMap<>();
- static synchronized Cache getCache(MultiExchange<?,?> exchange) {
+ static synchronized Cache getCache(MultiExchange<?> exchange) {
HttpClientImpl client = exchange.client();
Cache c = caches.get(client);
if (c == null) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/CookieFilter.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/CookieFilter.java Tue Jan 16 15:52:01 2018 +0000
@@ -39,7 +39,7 @@
}
@Override
- public void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException {
+ public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
HttpClientImpl client = e.client();
Optional<CookieHandler> cookieHandlerOpt = client.cookieHandler();
if (cookieHandlerOpt.isPresent()) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Tue Jan 16 15:52:01 2018 +0000
@@ -69,13 +69,13 @@
// has been established.
private volatile IOException failed;
final AccessControlContext acc;
- final MultiExchange<?,T> multi;
+ final MultiExchange<T> multi;
final Executor parentExecutor;
boolean upgrading; // to HTTP/2
- final PushGroup<?,T> pushGroup;
+ final PushGroup<T> pushGroup;
final String dbgTag;
- Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) {
+ Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
this.request = request;
this.upgrading = false;
this.client = multi.client();
@@ -88,7 +88,7 @@
/* If different AccessControlContext to be used */
Exchange(HttpRequestImpl request,
- MultiExchange<?,T> multi,
+ MultiExchange<T> multi,
AccessControlContext acc)
{
this.request = request;
@@ -101,7 +101,7 @@
this.dbgTag = "Exchange";
}
- PushGroup<?,T> getPushGroup() {
+ PushGroup<T> getPushGroup() {
return pushGroup;
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HeaderFilter.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HeaderFilter.java Tue Jan 16 15:52:01 2018 +0000
@@ -34,7 +34,7 @@
*/
interface HeaderFilter {
- void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException;
+ void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException;
/**
* Returns null if response ok to be given to user. Non null is a request
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Tue Jan 16 15:52:01 2018 +0000
@@ -642,7 +642,7 @@
HttpHeadersImpl headers = decoder.headers();
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
- Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
+ Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
pushStream.registerStream(promisedStreamid);
parent.incoming_pushPromise(pushReq, pushStream);
@@ -839,8 +839,8 @@
return stream;
}
- <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
- PushGroup<?,T> pg = parent.exchange.getPushGroup();
+ <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
+ PushGroup<T> pg = parent.exchange.getPushGroup();
return new Stream.PushedStream<>(pg, this, pushEx);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClient.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClient.java Tue Jan 16 15:52:01 2018 +0000
@@ -39,6 +39,8 @@
import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
/**
* A container for configuration information common to multiple {@link
@@ -447,31 +449,23 @@
* @return a {@code CompletableFuture<HttpResponse<T>>}
*/
public abstract <T> CompletableFuture<HttpResponse<T>>
- sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler);
+ sendAsync(HttpRequest req,
+ BodyHandler<T> responseBodyHandler);
/**
- * Sends the given request asynchronously using this client and the given
- * multi response handler.
+ * Sends the given request asynchronously using this client with the given
+ * response body handler and push promise handler.
*
- * <p> The returned completable future completes exceptionally with:
- * <ul>
- * <li>{@link IOException} - if an I/O error occurs when sending or receiving</li>
- * <li>{@link IllegalArgumentException} - if the request method is not supported</li>
- * <li>{@link SecurityException} - If a security manager has been installed
- * and it denies {@link java.net.URLPermission access} to the
- * URL in the given request, or proxy if one is configured.
- * See HttpRequest for further information about
- * <a href="HttpRequest.html#securitychecks">security checks</a>.</li>
- * </ul>
- *
- * @param <U> a type representing the aggregated results
- * @param <T> a type representing all of the response bodies
+ * @param <T> the response body type
* @param req the request
- * @param multiSubscriber the multiSubscriber for the request
- * @return a {@code CompletableFuture<U>}
+ * @param responseBodyHandler the response body handler
+ * @param pushPromiseHandler push promise handler, may be null
+ * @return a {@code CompletableFuture<HttpResponse<T>>}
*/
- public abstract <U, T> CompletableFuture<U>
- sendAsync(HttpRequest req, HttpResponse.MultiSubscriber<U, T> multiSubscriber);
+ public abstract <T> CompletableFuture<HttpResponse<T>>
+ sendAsync(HttpRequest req,
+ BodyHandler<T> responseBodyHandler,
+ PushPromiseHandler<T> pushPromiseHandler);
/**
* Creates a new {@code WebSocket} builder (optional operation).
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientFacade.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientFacade.java Tue Jan 16 15:52:01 2018 +0000
@@ -36,6 +36,8 @@
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
/**
* An HttpClientFacade is a simple class that wraps an HttpClient implementation
@@ -115,10 +117,12 @@
}
@Override
- public <U, T> CompletableFuture<U>
- sendAsync(HttpRequest req, HttpResponse.MultiSubscriber<U, T> multiSubscriber) {
+ public <T> CompletableFuture<HttpResponse<T>>
+ sendAsync(HttpRequest req,
+ BodyHandler<T> responseBodyHandler,
+ PushPromiseHandler<T> pushPromiseHandler){
try {
- return impl.sendAsync(req, multiSubscriber);
+ return impl.sendAsync(req, responseBodyHandler, pushPromiseHandler);
} finally {
Reference.reachabilityFence(this);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java Tue Jan 16 15:52:01 2018 +0000
@@ -61,7 +61,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import jdk.incubator.http.HttpResponse.BodyHandler;
-import jdk.incubator.http.HttpResponse.MultiSubscriber;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.common.Utils;
@@ -417,6 +417,16 @@
public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
{
+ return sendAsync(userRequest, responseHandler, null);
+ }
+
+
+ @Override
+ public <T> CompletableFuture<HttpResponse<T>>
+ sendAsync(HttpRequest userRequest,
+ BodyHandler<T> responseHandler,
+ PushPromiseHandler<T> pushPromiseHandler)
+ {
AccessControlContext acc = null;
if (System.getSecurityManager() != null)
acc = AccessController.getContext();
@@ -431,10 +441,11 @@
try {
debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest);
- MultiExchange<Void,T> mex = new MultiExchange<>(userRequest,
+ MultiExchange<T> mex = new MultiExchange<>(userRequest,
requestImpl,
this,
responseHandler,
+ pushPromiseHandler,
acc);
CompletableFuture<HttpResponse<T>> res =
mex.responseAsync().whenComplete((b,t) -> unreference());
@@ -456,48 +467,6 @@
}
}
- @Override
- public <U, T> CompletableFuture<U>
- sendAsync(HttpRequest userRequest, MultiSubscriber<U, T> responseHandler) {
- AccessControlContext acc = null;
- if (System.getSecurityManager() != null)
- acc = AccessController.getContext();
-
- // Clone the, possibly untrusted, HttpRequest
- HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
- if (requestImpl.method().equals("CONNECT"))
- throw new IllegalArgumentException("Unsupported method CONNECT");
-
- long start = DEBUGELAPSED ? System.nanoTime() : 0;
- reference();
- try {
- debugelapsed.log(Level.DEBUG, "ClientImpl (async) send multi %s", userRequest);
-
- MultiExchange<U,T> mex = new MultiExchange<>(userRequest,
- requestImpl,
- this,
- responseHandler,
- acc);
- CompletableFuture<U> res = mex.multiResponseAsync()
- .whenComplete((b,t) -> unreference());
- if (DEBUGELAPSED) {
- res = res.whenComplete(
- (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
- }
- // makes sure that any dependent actions happen in the executor
- if (acc != null) {
- res.whenCompleteAsync((r, t) -> { /* do nothing */},
- new PrivilegedExecutor(executor, acc));
- }
-
- return res;
- } catch(Throwable t) {
- unreference();
- debugCompleted("ClientImpl (async)", start, userRequest);
- throw t;
- }
- }
-
// Main loop for this client's selector
private final static class SelectorManager extends Thread {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Tue Jan 16 15:52:01 2018 +0000
@@ -25,11 +25,11 @@
package jdk.incubator.http;
+import jdk.incubator.http.internal.common.MinimalFuture;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
-import jdk.incubator.http.ResponseSubscribers.MultiSubscriberImpl;
import static jdk.incubator.http.internal.common.Utils.unchecked;
import static jdk.incubator.http.internal.common.Utils.charsetFrom;
import java.nio.ByteBuffer;
@@ -46,8 +46,10 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -314,9 +316,7 @@
*
* @param <T> the response body type
*/
- @FunctionalInterface
public interface BodyHandler<T> {
-
/**
* Returns a {@link BodySubscriber BodySubscriber} considering the given
* response status code and headers. This method is always called before
@@ -331,6 +331,7 @@
*/
public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);
+
/**
* Returns a response body handler that returns a {@link BodySubscriber
* BodySubscriber}{@code <Void>} obtained from {@linkplain
@@ -724,6 +725,103 @@
}
/**
+ * A handler of <i>push promises</i> ...
+ */
+ public interface PushPromiseHandler<T> {
+ /**
+ * Notifies of an incoming Push Promise. The enclosing request from the user and the push promise
+ * are supplied as parameters, and also a {@link Function} which must be called in the implementation
+ * of this method, if the server push is to be accepted. If this method returns without the function
+ * being called, then the push will be cancelled.
+ */
+ public void applyPushPromise(
+ HttpRequest initial, HttpRequest pushPromise,
+ Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor
+ );
+
+
+ /* package-private with push promise Map implementation */
+ static class PushPromisesHandlerWithMap<T> implements PushPromiseHandler<T> {
+
+ private final ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap;
+ private final Function<HttpRequest,BodyHandler<T>> pushPromiseHandler;
+
+ PushPromisesHandlerWithMap(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
+ ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
+ this.pushPromiseHandler = pushPromiseHandler;
+ this.pushPromisesMap = pushPromisesMap;
+ }
+
+ @Override
+ public void applyPushPromise(
+ HttpRequest initiatingRequest, HttpRequest pushRequest,
+ Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor)
+ {
+ URI initiatingURI = initiatingRequest.uri();
+ URI pushRequestURI = pushRequest.uri();
+ if (!initiatingURI.getHost().equalsIgnoreCase(pushRequestURI.getHost()))
+ return;
+
+ int initiatingPort = initiatingURI.getPort();
+ if (initiatingPort == -1 ) {
+ if ("https".equalsIgnoreCase(initiatingURI.getScheme()))
+ initiatingPort = 443;
+ else
+ initiatingPort = 80;
+ }
+ int pushPort = pushRequestURI.getPort();
+ if (pushPort == -1 ) {
+ if ("https".equalsIgnoreCase(pushRequestURI.getScheme()))
+ pushPort = 443;
+ else
+ pushPort = 80;
+ }
+ if (initiatingPort != pushPort)
+ return;
+
+ CompletableFuture<HttpResponse<T>> cf = acceptor.apply(pushPromiseHandler.apply(pushRequest));
+ pushPromisesMap.put(pushRequest, cf);
+ }
+ }
+
+ /**
+ * Returns a push promise handler that accumulates push promises, and
+ * their responses, into the given map.
+ *
+ * <p> Entries are added to the given map for each synthetic push
+ * request ( push promise ) accepted. The entry's key is the
+ * push request, and the entry's value is a CompletableFuture that
+ * completes with the response corresponding to the key's push
+ * request. A push request is rejected / cancelled if there is
+ * already an entry in the map whose key is {@linplain HttpRequest#equal
+ * equal} to it. A push request is rejected / cancelled if it
+ * does not have the same origin as its initiating request.
+ *
+ * <p> Entries are added to the given map as soon as practically
+ * possible when a push promise is received and accepted. That way code,
+ * using such a map like a cache, can determine if a push promise has
+ * been issued by the server and avoid making, possibly, unnecessary
+ * requests.
+ *
+ * <p> The delivery of pushed content is not synchronized with the
+ * delivery of the main response. However, when the main response
+ * has been fully received, the map is guaranteed to be fully populated
+ * with no more entries added. The individual {@code CompletableFutures}
+ * contained in the Map may or may not already be completed at this point.
+ *
+ * @param <T> the push promise body type
+ * @param pushPromiseHandler t he body handler to use for push promises
+ * @param pushPromisesMap a map to accumulate push promises into
+ * @return a push promise body handler
+ */
+ public static <T> PushPromiseHandler<T>
+ withPushPromises(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
+ ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
+ return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap);
+ }
+ }
+
+ /**
* A subscriber for response bodies.
* {@Incubating}
*
@@ -1081,220 +1179,4 @@
return new BufferingSubscriber<T>(downstream, bufferSize);
}
}
-
- /**
- * A response subscriber for a HTTP/2 multi response.
- * {@Incubating}
- *
- * <p> A multi response comprises a main response, and zero or more additional
- * responses. Each additional response is sent by the server in response to
- * requests (PUSH_PROMISEs) that the server also generates. Additional responses are
- * typically resources that the server expects the client will need which
- * are related to the initial request.
- * <p>
- * Note. Instead of implementing this interface, applications should consider
- * first using the mechanism (built on this interface) provided by
- * {@link MultiSubscriber#asMap(java.util.function.Function, boolean)
- * MultiSubscriber.asMap()} which is a slightly simplified, but also
- * general purpose interface.
- * <p>
- * The server generated requests are also known as <i>push promises</i>.
- * The server is permitted to send any number of these requests up to the
- * point where the main response is fully received. Therefore, after
- * completion of the main response, the final number of additional
- * responses is known. Additional responses may be canceled, but given that
- * the server does not wait for any acknowledgment before sending the
- * response, this must be done quickly to avoid unnecessary data transmission.
- *
- * <p> {@code MultiSubscriber}s are parameterized with a type {@code U} which
- * represents some meaningful aggregate of the responses received. This
- * would typically be a collection of response or response body objects.
- *
- * @param <U> a type representing the aggregated results
- * @param <T> a type representing all of the response bodies
- *
- * @since 9
- */
- public interface MultiSubscriber<U,T> {
- /**
- * Called for the main request from the user. This {@link HttpRequest}
- * parameter is the request that was supplied to {@link
- * HttpClient#sendAsync(HttpRequest, MultiSubscriber)}. The
- * implementation must return an {@link BodyHandler} for the response
- * body.
- *
- * @param request the request
- *
- * @return an optional body handler
- */
- BodyHandler<T> onRequest(HttpRequest request);
-
- /**
- * Called for each push promise that is received. The {@link HttpRequest}
- * parameter represents the PUSH_PROMISE. The implementation must return
- * an {@code Optional} of {@link BodyHandler} for the response body.
- * Different handlers (of the same type) can be returned for different
- * pushes within the same multi send. If no handler (an empty {@code
- * Optional}) is returned, then the push will be canceled. If required,
- * the {@code CompletableFuture<Void>} supplied to the {@code
- * onFinalPushPromise} parameter of {@link
- * #completion(CompletableFuture, CompletableFuture)} can be used to
- * determine when the final PUSH_PROMISE is received.
- *
- * @param pushPromise the push promise
- *
- * @return an optional body handler
- */
- Optional<BodyHandler<T>> onPushPromise(HttpRequest pushPromise);
-
- /**
- * Called for each response received. For each request either one of
- * onResponse() or onError() is guaranteed to be called, but not both.
- *
- * <p> Note: The reason for switching to this callback interface rather
- * than using CompletableFutures supplied to onRequest() is that there
- * is a subtle interaction between those CFs and the CF returned from
- * completion() (or when onComplete() was called formerly). The completion()
- * CF will not complete until after all of the work done by the onResponse()
- * calls is done. Whereas if you just create CF's dependent on a supplied
- * CF (to onRequest()) then the implementation has no visibility of the
- * dependent CFs and can't guarantee to call onComplete() (or complete
- * the completion() CF) after the dependent CFs complete.
- *
- * @param response the response received
- */
- void onResponse(HttpResponse<T> response);
-
- /**
- * Called if an error occurs receiving a response. For each request
- * either one of onResponse() or onError() is guaranteed to be called,
- * but not both.
- *
- * @param request the main request or subsequent push promise
- * @param t the Throwable that caused the error
- */
- void onError(HttpRequest request, Throwable t);
-
- /**
- * Returns a {@link java.util.concurrent.CompletableFuture}{@code <U>}
- * which completes when the aggregate result object itself is available.
- * It is expected that the returned {@code CompletableFuture} will depend
- * on one of the given {@code CompletableFuture<Void}s which themselves
- * complete after all individual responses associated with the multi
- * response have completed, or after all push promises have been received.
- * This method is called after {@link #onRequest(HttpRequest)} but
- * before any other methods.
- *
- * @implNote Implementations might follow the pattern shown below
- * <pre>
- * {@code
- * CompletableFuture<U> completion(
- * CompletableFuture<Void> onComplete,
- * CompletableFuture<Void> onFinalPushPromise)
- * {
- * return onComplete.thenApply((v) -> {
- * U u = ... instantiate and populate a U instance
- * return u;
- * });
- * }
- * }
- * </pre>
- *
- * @param onComplete a CompletableFuture which completes after all
- * responses have been received relating to this multi request.
- *
- * @param onFinalPushPromise CompletableFuture which completes after all
- * push promises have been received.
- *
- * @return the aggregate CF response object
- */
- CompletableFuture<U> completion(CompletableFuture<Void> onComplete,
- CompletableFuture<Void> onFinalPushPromise);
-
- /**
- * Returns a general purpose handler for multi responses. The aggregated
- * result object produced by this handler is a
- * {@code Map<HttpRequest,CompletableFuture<HttpResponse<V>>>}. Each
- * request (both the original user generated request and each server
- * generated push promise) is returned as a key of the map. The value
- * corresponding to each key is a
- * {@code CompletableFuture<HttpResponse<V>>}.
- *
- * <p> There are two ways to use these handlers, depending on the value
- * of the <i>completion</I> parameter. If completion is true, then the
- * aggregated result will be available after all responses have
- * themselves completed. If <i>completion</i> is false, then the
- * aggregated result will be available immediately after the last push
- * promise was received. In the former case, this implies that all the
- * CompletableFutures in the map values will have completed. In the
- * latter case, they may or may not have completed yet.
- *
- * <p> The simplest way to use these handlers is to set completion to
- * {@code true}, and then all (results) values in the Map will be
- * accessible without blocking.
- * <p>
- * See {@link #asMap(java.util.function.Function, boolean)}
- * for a code sample of using this interface.
- *
- * <p> See {@link #asMap(Function, boolean)} for a code sample of using
- * this interface.
- *
- * @param <V> the body type used for all responses
- * @param reqHandler a function invoked for the user's request and each
- * push promise
- * @param completion {@code true} if the aggregate CompletableFuture
- * completes after all responses have been received,
- * or {@code false} after all push promises received
- *
- * @return a MultiSubscriber
- */
- public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap(
- Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler,
- boolean completion) {
- return new MultiSubscriberImpl<V>(reqHandler.andThen(optv -> optv.get()),
- reqHandler,
- completion);
- }
-
- /**
- * Returns a general purpose handler for multi responses. This is a
- * convenience method which invokes {@link #asMap(Function,boolean)
- * asMap(Function, true)} meaning that the aggregate result
- * object completes after all responses have been received.
- *
- * <p><b>Example usage:</b>
- * <br>
- * <pre>
- * {@code
- * HttpRequest request = HttpRequest.newBuilder()
- * .uri(URI.create("https://www.foo.com/"))
- * .GET()
- * .build();
- *
- * HttpClient client = HttpClient.newHttpClient();
- *
- * Map<HttpRequest,CompletableFuture<HttpResponse<String>>> results = client
- * .sendAsync(request, MultiSubscriber.asMap(
- * (req) -> Optional.of(HttpResponse.BodyHandler.asString())))
- * .join();
- * }</pre>
- *
- * <p> The lambda in this example is the simplest possible implementation,
- * where neither the incoming requests are examined, nor the response
- * headers, and every push that the server sends is accepted. When the
- * join() call returns, all {@code HttpResponse}s and their associated
- * body objects are available.
- *
- * @param <V> the body type used for all responses
- * @param reqHandler a function invoked for each push promise and the
- * main request
- * @return a MultiSubscriber
- */
- public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap(
- Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler) {
-
- return asMap(reqHandler, true);
- }
-
- }
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Tue Jan 16 15:52:01 2018 +0000
@@ -36,6 +36,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
@@ -52,7 +53,7 @@
*
* Creates a new Exchange for each request/response interaction
*/
-class MultiExchange<U,T> {
+class MultiExchange<T> {
static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
static final System.Logger DEBUG_LOGGER =
@@ -64,7 +65,6 @@
final HttpClientImpl client;
final HttpResponse.BodyHandler<T> responseHandler;
final Executor executor;
- final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
final AtomicInteger attempts = new AtomicInteger();
HttpRequestImpl currentreq; // used for async only
Exchange<T> exchange; // the current exchange
@@ -84,7 +84,7 @@
private final List<HeaderFilter> filters;
TimedEvent timedEvent;
volatile boolean cancelled;
- final PushGroup<U,T> pushGroup;
+ final PushGroup<T> pushGroup;
/**
* Filter fields. These are attached as required by filters
@@ -103,6 +103,7 @@
HttpRequestImpl requestImpl,
HttpClientImpl client,
HttpResponse.BodyHandler<T> responseHandler,
+ PushPromiseHandler<T> pushPromiseHandler,
AccessControlContext acc) {
this.previous = null;
this.userRequest = userRequest;
@@ -118,37 +119,16 @@
if (responseHandler instanceof UntrustedBodyHandler)
((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
}
- this.exchange = new Exchange<>(request, this);
- this.multiResponseSubscriber = null;
- this.pushGroup = null;
- }
- /**
- * MultiExchange with multiple responses (HTTP/2 server pushes).
- */
- MultiExchange(HttpRequest userRequest,
- HttpRequestImpl requestImpl,
- HttpClientImpl client,
- HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
- AccessControlContext acc) {
- this.previous = null;
- this.userRequest = userRequest;
- this.request = requestImpl;
- this.currentreq = request;
- this.client = client;
- this.filters = client.filterChain();
- this.acc = acc;
- this.executor = client.theExecutor();
- this.multiResponseSubscriber = multiResponseSubscriber;
- this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
+ if (pushPromiseHandler != null) {
+ this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc);
+ } else {
+ pushGroup = null;
+ }
+
this.exchange = new Exchange<>(request, this);
- this.responseHandler = pushGroup.mainResponseHandler();
}
-// CompletableFuture<Void> multiCompletionCF() {
-// return pushGroup.groupResult();
-// }
-
private synchronized Exchange<T> getExchange() {
return exchange;
}
@@ -157,10 +137,6 @@
return client;
}
-// HttpClient.Redirect followRedirects() {
-// return client.followRedirects();
-// }
-
HttpClient.Version version() {
return request.version().orElse(client.version());
}
@@ -233,21 +209,6 @@
});
}
- CompletableFuture<U> multiResponseAsync() {
- CompletableFuture<Void> start = new MinimalFuture<>();
- CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
- CompletableFuture<HttpResponse<T>> mainResponse =
- cf.thenApply(b -> {
- multiResponseSubscriber.onResponse(b);
- pushGroup.noMorePushes(true);
- return b; });
- pushGroup.setMainResponse(mainResponse);
- CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
- pushGroup.pushesCF());
- start.completeAsync( () -> null, executor); // trigger execution
- return res;
- }
-
private CompletableFuture<Response> responseAsyncImpl() {
CompletableFuture<Response> cf;
if (attempts.incrementAndGet() > max_attempts) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiMapResult.java Fri Jan 12 15:36:28 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A {@link java.util.Map} containing the result of a HTTP/2 request and multi-response.
- * {@Incubating}
- * <p>
- * This is one possible implementation of the aggregate result type {@code <U>} returned
- * from {@link HttpClient#sendAsync(HttpRequest,HttpResponse.MultiSubscriber) }.
- * The map is indexed by {@link HttpRequest} and each value is a
- * {@link java.util.concurrent.CompletableFuture}<
- * {@link HttpResponse}{@code <V>}>
- * <p>
- * A {@code MultiMapResult} is obtained from an invocation such as the one shown below:
- * <p>
- * {@link CompletableFuture}<{@code MultiMapResult<V>}>
- * {@link HttpClient#sendAsync(HttpRequest,
- * HttpResponse.MultiSubscriber) HttpClient.sendAsync(}{@link
- * HttpResponse.MultiSubscriber#asMap(java.util.function.Function)
- * MultiSubscriber.asMap(Function)})
- *
- * @param <V> the response body type for all responses
- */
-public class MultiMapResult<V> implements Map<HttpRequest,CompletableFuture<HttpResponse<V>>> {
- private final Map<HttpRequest,CompletableFuture<HttpResponse<V>>> map;
-
- MultiMapResult(Map<HttpRequest,CompletableFuture<HttpResponse<V>>> map) {
- this.map = map;
- }
-
- @Override
- public int size() {
- return map.size();
- }
-
- @Override
- public boolean isEmpty() {
- return map.isEmpty();
- }
-
- @Override
- public boolean containsKey(Object key) {
- return map.containsKey(key);
- }
-
- @Override
- public boolean containsValue(Object value) {
- return map.containsValue(value);
- }
-
- @Override
- public CompletableFuture<HttpResponse<V>> get(Object key) {
- return map.get(key);
- }
-
- @Override
- public CompletableFuture<HttpResponse<V>> put(HttpRequest key, CompletableFuture<HttpResponse<V>> value) {
- return map.put(key, value);
- }
-
- @Override
- public CompletableFuture<HttpResponse<V>> remove(Object key) {
- return map.remove(key);
- }
-
- @Override
- public void putAll(Map<? extends HttpRequest, ? extends CompletableFuture<HttpResponse<V>>> m) {
- map.putAll(m);
- }
-
- @Override
- public void clear() {
- map.clear();
- }
-
- @Override
- public Set<HttpRequest> keySet() {
- return map.keySet();
- }
-
- @Override
- public Collection<CompletableFuture<HttpResponse<V>>> values() {
- return map.values();
- }
-
- @Override
- public Set<Entry<HttpRequest, CompletableFuture<HttpResponse<V>>>> entrySet() {
- return map.entrySet();
- }
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Tue Jan 16 15:52:01 2018 +0000
@@ -63,7 +63,7 @@
HttpClientImpl client = client();
assert client != null;
HttpRequestImpl req = new HttpRequestImpl("CONNECT", address);
- MultiExchange<Void,Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null);
+ MultiExchange<Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null, null);
Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
return connectExchange
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PushGroup.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PushGroup.java Tue Jan 16 15:52:01 2018 +0000
@@ -26,9 +26,10 @@
package jdk.incubator.http;
import java.security.AccessControlContext;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Log;
@@ -37,20 +38,15 @@
* One PushGroup object is associated with the parent Stream of the pushed
* Streams. This keeps track of all common state associated with the pushes.
*/
-class PushGroup<U,T> {
- // the overall completion object, completed when all pushes are done.
- final CompletableFuture<Void> resultCF;
+class PushGroup<T> {
+ private final HttpRequest initiatingRequest;
+
final CompletableFuture<Void> noMorePushesCF;
volatile Throwable error; // any exception that occurred during pushes
- // CF for main response
- final CompletableFuture<HttpResponse<T>> mainResponse;
-
// user's subscriber object
- final HttpResponse.MultiSubscriber<U, T> multiSubscriber;
-
- final HttpResponse.BodyHandler<T> mainBodyHandler;
+ final PushPromiseHandler<T> pushPromiseHandler;
private final AccessControlContext acc;
@@ -58,84 +54,69 @@
int remainingPushes;
boolean noMorePushes = false;
- PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber,
- HttpRequestImpl req,
+ PushGroup(PushPromiseHandler<T> pushPromiseHandler,
+ HttpRequestImpl initiatingRequest,
AccessControlContext acc) {
- this(multiSubscriber, req, new MinimalFuture<>(), acc);
+ this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), acc);
}
// Check mainBodyHandler before calling nested constructor.
- private PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber,
- HttpRequestImpl req,
+ private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler,
+ HttpRequestImpl initiatingRequest,
CompletableFuture<HttpResponse<T>> mainResponse,
AccessControlContext acc) {
- this(multiSubscriber,
- mainResponse,
- multiSubscriber.onRequest(req),
- acc);
- }
-
- // This private constructor is called after all parameters have been checked.
- private PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber,
- CompletableFuture<HttpResponse<T>> mainResponse,
- HttpResponse.BodyHandler<T> mainBodyHandler,
- AccessControlContext acc) {
-
- assert mainResponse != null; // A new instance is created above
- assert mainBodyHandler != null; // should have been checked above
-
- this.resultCF = new MinimalFuture<>();
this.noMorePushesCF = new MinimalFuture<>();
- this.multiSubscriber = multiSubscriber;
- this.mainResponse = mainResponse.thenApply(r -> {
- multiSubscriber.onResponse(r);
- return r;
- });
- this.mainBodyHandler = mainBodyHandler;
- if (acc != null) {
- // Restricts the file publisher with the senders ACC, if any
- if (mainBodyHandler instanceof UntrustedBodyHandler)
- ((UntrustedBodyHandler)this.mainBodyHandler).setAccessControlContext(acc);
- }
+ this.pushPromiseHandler = pushPromiseHandler;
+ this.initiatingRequest = initiatingRequest;
+ // Restricts the file publisher with the senders ACC, if any
+ if (pushPromiseHandler instanceof UntrustedBodyHandler)
+ ((UntrustedBodyHandler)this.pushPromiseHandler).setAccessControlContext(acc);
this.acc = acc;
}
- CompletableFuture<Void> groupResult() {
- return resultCF;
- }
+ static class Acceptor<T> {
+ final HttpRequest initiator, push;
+ volatile HttpResponse.BodyHandler<T> bodyHandler = null;
+ volatile CompletableFuture<HttpResponse<T>> cf;
+
+ Acceptor(HttpRequest initiator, HttpRequest push) {
+ this.initiator = initiator;
+ this.push = push;
+ }
- HttpResponse.MultiSubscriber<U, T> subscriber() {
- return multiSubscriber;
+ CompletableFuture<HttpResponse<T>> accept(HttpResponse.BodyHandler<T> bodyHandler) {
+ Objects.requireNonNull(bodyHandler);
+ cf = new MinimalFuture<>();
+ if (this.bodyHandler != null)
+ throw new IllegalStateException();
+ this.bodyHandler = bodyHandler;
+ return cf;
+ }
+
+ HttpResponse.BodyHandler<T> bodyHandler() {
+ return bodyHandler;
+ }
+
+ CompletableFuture<HttpResponse<T>> cf() {
+ return cf;
+ }
+
+ boolean accepted() {
+ return cf != null;
+ }
}
- Optional<BodyHandler<T>> handlerForPushRequest(HttpRequest ppRequest) {
- Optional<BodyHandler<T>> bh = multiSubscriber.onPushPromise(ppRequest);
- if (acc != null && bh.isPresent()) {
- // Restricts the file publisher with the senders ACC, if any
- BodyHandler<T> x = bh.get();
- if (x instanceof UntrustedBodyHandler)
- ((UntrustedBodyHandler)x).setAccessControlContext(acc);
- bh = Optional.of(x);
- }
- return bh;
- }
+ Acceptor<T> acceptPushRequest(HttpRequest pushRequest) {
+ Acceptor<T> acceptor = new Acceptor<>(initiatingRequest, pushRequest);
+
+ pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept);
- HttpResponse.BodyHandler<T> mainResponseHandler() {
- return mainBodyHandler;
- }
+ if (acceptor.accepted()) {
+ numberOfPushes++;
+ remainingPushes++;
+ }
+ return acceptor;
- synchronized void setMainResponse(CompletableFuture<HttpResponse<T>> r) {
- r.whenComplete((HttpResponse<T> response, Throwable t) -> {
- if (t != null)
- mainResponse.completeExceptionally(t);
- else
- mainResponse.complete(response);
- });
- }
-
- synchronized void addPush() {
- numberOfPushes++;
- remainingPushes++;
}
// This is called when the main body response completes because it means
@@ -171,7 +152,6 @@
if (Log.trace()) {
Log.logTrace("push completed");
}
- resultCF.complete(null);
}
}
@@ -180,6 +160,5 @@
return;
}
this.error = t;
- resultCF.completeExceptionally(t);
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RedirectFilter.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RedirectFilter.java Tue Jan 16 15:52:01 2018 +0000
@@ -36,7 +36,7 @@
HttpClientImpl client;
HttpClient.Redirect policy;
String method;
- MultiExchange<?,?> exchange;
+ MultiExchange<?> exchange;
static final int DEFAULT_MAX_REDIRECTS = 5;
URI uri;
@@ -48,7 +48,7 @@
public RedirectFilter() {}
@Override
- public synchronized void request(HttpRequestImpl r, MultiExchange<?,?> e) throws IOException {
+ public synchronized void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
this.request = r;
this.client = e.client();
this.policy = client.followRedirects();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Jan 16 15:52:01 2018 +0000
@@ -518,59 +518,6 @@
}
}
- static class MultiSubscriberImpl<V>
- implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
- {
- private final MultiMapResult<V> results;
- private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
- private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler;
- private final boolean completion; // aggregate completes on last PP received or overall completion
-
- MultiSubscriberImpl(
- Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler,
- Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) {
- this.results = new MultiMapResult<>(new ConcurrentHashMap<>());
- this.requestHandler = requestHandler;
- this.pushHandler = pushHandler;
- this.completion = completion;
- }
-
- @Override
- public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) {
- CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
- results.put(request, cf);
- return requestHandler.apply(request);
- }
-
- @Override
- public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) {
- CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
- results.put(push, cf);
- return pushHandler.apply(push);
- }
-
- @Override
- public void onResponse(HttpResponse<V> response) {
- CompletableFuture<HttpResponse<V>> cf = results.get(response.request());
- cf.complete(response);
- }
-
- @Override
- public void onError(HttpRequest request, Throwable t) {
- CompletableFuture<HttpResponse<V>> cf = results.get(request);
- cf.completeExceptionally(t);
- }
-
- @Override
- public CompletableFuture<MultiMapResult<V>> completion(
- CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) {
- if (completion)
- return onComplete.thenApply((ignored)-> results);
- else
- return onFinalPushPromise.thenApply((ignored) -> results);
- }
- }
-
/**
* Currently this consumes all of the data and ignores it
*/
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Tue Jan 16 15:52:01 2018 +0000
@@ -224,7 +224,7 @@
BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
CompletableFuture<T> cf = receiveData(bodySubscriber);
- PushGroup<?,?> pg = exchange.getPushGroup();
+ PushGroup<?> pg = exchange.getPushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
cf = cf.whenComplete((t,e) -> pg.pushError(e));
@@ -420,42 +420,46 @@
}
}
- void incoming_pushPromise(HttpRequestImpl pushReq,
- PushedStream<?,T> pushStream)
+ void incoming_pushPromise(HttpRequestImpl pushRequest,
+ PushedStream<T> pushStream)
throws IOException
{
if (Log.requests()) {
- Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+ Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
}
- PushGroup<?,T> pushGroup = exchange.getPushGroup();
- if (pushGroup == null || pushGroup.noMorePushes()) {
+ PushGroup<T> pushGroup = exchange.getPushGroup();
+ if (pushGroup == null) {
+ // no push handler set by the user code, i.e. cancel / reject
+ IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
+ pushStream.cancelImpl(ex);
+ return;
+ }
+
+ if (pushGroup.noMorePushes()) {
cancelImpl(new IllegalStateException("unexpected push promise"
+ " on stream " + streamid));
return;
}
- HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
-
- CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
-
- Optional<HttpResponse.BodyHandler<T>> bpOpt =
- pushGroup.handlerForPushRequest(pushReq);
+ PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
+ CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
- if (!bpOpt.isPresent()) {
- IOException ex = new IOException("Stream "
- + streamid + " cancelled by user");
+ if (!acceptor.accepted()) {
+ // cancel / reject
+ IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
if (Log.trace()) {
- Log.logTrace("No body subscriber for {0}: {1}", pushReq,
- ex.getMessage());
+ Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
+ ex.getMessage());
}
pushStream.cancelImpl(ex);
- cf.completeExceptionally(ex);
return;
}
- pushGroup.addPush();
+ CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
+ HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
+
pushStream.requestSent();
- pushStream.setPushHandler(bpOpt.get());
+ pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
// setup housekeeping for when the push is received
// TODO: deal with ignoring of CF anti-pattern
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
@@ -467,9 +471,9 @@
}
if (t != null) {
pushGroup.pushError(t);
- proc.onError(pushReq, t);
+ pushResponseCF.completeExceptionally(t);
} else {
- proc.onResponse(resp);
+ pushResponseCF.complete(resp);
}
pushGroup.pushCompleted();
});
@@ -811,7 +815,7 @@
cf = cf.thenApplyAsync(r -> r, executor);
}
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
- PushGroup<?,?> pg = exchange.getPushGroup();
+ PushGroup<?> pg = exchange.getPushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
@@ -950,16 +954,16 @@
Log.logTrace("Stream {0} closed", streamid);
}
- static class PushedStream<U,T> extends Stream<T> {
- final PushGroup<U,T> pushGroup;
+ static class PushedStream<T> extends Stream<T> {
+ final PushGroup<T> pushGroup;
// push streams need the response CF allocated up front as it is
// given directly to user via the multi handler callback function.
final CompletableFuture<Response> pushCF;
- final CompletableFuture<HttpResponse<T>> responseCF;
+ CompletableFuture<HttpResponse<T>> responseCF;
final HttpRequestImpl pushReq;
HttpResponse.BodyHandler<T> pushHandler;
- PushedStream(PushGroup<U,T> pushGroup,
+ PushedStream(PushGroup<T> pushGroup,
Http2Connection connection,
Exchange<T> pushReq) {
// ## no request body possible, null window controller
@@ -968,6 +972,7 @@
this.pushReq = pushReq.request();
this.pushCF = new MinimalFuture<>();
this.responseCF = new MinimalFuture<>();
+
}
CompletableFuture<HttpResponse<T>> responseCF() {
--- a/test/jdk/java/net/httpclient/http2/ServerPush.java Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/ServerPush.java Tue Jan 16 15:52:01 2018 +0000
@@ -30,89 +30,222 @@
* jdk.incubator.httpclient/jdk.incubator.http.internal.common
* jdk.incubator.httpclient/jdk.incubator.http.internal.frame
* jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
- * @run testng/othervm -Djdk.internal.httpclient.hpack.debug=true -Djdk.internal.httpclient.debug=true -Djdk.httpclient.HttpClient.log=errors,requests,responses ServerPush
+ * @run testng/othervm -Djdk.httpclient.HttpClient.log=errors,requests,responses ServerPush
*/
import java.io.*;
import java.net.*;
+import java.nio.ByteBuffer;
import java.nio.file.*;
-import java.nio.file.attribute.*;
import jdk.incubator.http.*;
-import jdk.incubator.http.HttpResponse.MultiSubscriber;
import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
import java.util.*;
import java.util.concurrent.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.*;
+
public class ServerPush {
- static ExecutorService e = Executors.newCachedThreadPool();
-
static final int LOOPS = 13;
static final int FILE_SIZE = 512 * 1024 + 343;
static Path tempFile;
- @Test
- public static void test() throws Exception {
- Http2TestServer server = null;
- final Path dir = Files.createTempDirectory("serverPush");
- try {
- server = new Http2TestServer(false, 0);
- server.addHandler(new PushHandler(FILE_SIZE, LOOPS), "/");
- tempFile = TestUtil.getAFile(FILE_SIZE);
+ Http2TestServer server;
+ URI uri;
- System.err.println("Server listening on port " + server.getAddress().getPort());
- server.start();
- int port = server.getAddress().getPort();
+ @BeforeTest
+ public void setup() throws Exception {
+ server = new Http2TestServer(false, 0);
+ server.addHandler(new PushHandler(FILE_SIZE, LOOPS), "/");
+ tempFile = TestUtil.getAFile(FILE_SIZE);
+ System.out.println("Using temp file:" + tempFile);
- // use multi-level path
- URI uri = new URI("http://127.0.0.1:" + port + "/foo/a/b/c");
- HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+ System.err.println("Server listening on port " + server.getAddress().getPort());
+ server.start();
+ int port = server.getAddress().getPort();
+ uri = new URI("http://127.0.0.1:" + port + "/foo/a/b/c");
+ }
- CompletableFuture<MultiMapResult<Path>> cf =
- HttpClient.newBuilder().version(HttpClient.Version.HTTP_2)
- .executor(e).build().sendAsync(
- request, MultiSubscriber.asMap((req) -> {
- URI u = req.uri();
- Path path = Paths.get(dir.toString(), u.getPath());
- try {
- Files.createDirectories(path.getParent());
- } catch (IOException ee) {
- throw new UncheckedIOException(ee);
- }
- return Optional.of(BodyHandler.asFile(path));
- }
- ));
- MultiMapResult<Path> results = cf.get();
-
- //HttpResponse resp = request.response();
- System.err.println(results.size());
- Set<HttpRequest> requests = results.keySet();
+ @AfterTest
+ public void teardown() {
+ server.stop();
+ }
- for (HttpRequest r : requests) {
- URI u = r.uri();
- Path result = results.get(r).get().body();
- System.err.printf("%s -> %s\n", u.toString(), result.toString());
- TestUtil.compareFiles(result, tempFile);
- }
- if (requests.size() != LOOPS + 1)
- throw new RuntimeException("some results missing");
- System.out.println("TEST OK: sleeping for 5 sec");
- Thread.sleep (5 * 1000);
- } finally {
- e.shutdownNow();
- server.stop();
- Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
- public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
- dir.toFile().delete();
- return FileVisitResult.CONTINUE;
- }
- public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
- path.toFile().delete();
- return FileVisitResult.CONTINUE;
- }
- });
+ static final UnaryOperator<HttpResponse<?>>
+ assert200ResponseCode = (response) -> {
+ assertEquals(response.statusCode(), 200);
+ return response;
+ };
+
+ interface Peeker<T> extends UnaryOperator<T> {
+ void peek(T t);
+
+ default T apply(T t)
+ {
+ peek(t);
+ return t;
}
}
+
+ @Test
+ public void testTypeString() throws Exception {
+ // use multi-level path
+ HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+ String tempFileAsString = new String(Files.readAllBytes(tempFile), UTF_8);
+
+ // Attempt 2
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+
+ ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<String>>> results = new ConcurrentHashMap<>();
+
+
+ // Example 2 - of(...) building your own Map, everything as a String
+ PushPromiseHandler<String> pph = (initial, pushRequest, acceptor) -> {
+ BodyHandler<String> s = BodyHandler.asString(UTF_8);
+ CompletableFuture<HttpResponse<String>> cf = acceptor.apply(s);
+ results.put(pushRequest, cf);
+ };
+
+ CompletableFuture<HttpResponse<String>> cf =
+ client.sendAsync(request, BodyHandler.asString(), pph);
+ cf.join();
+ results.put(request, cf);
+
+ System.err.println(results.size());
+ Set<HttpRequest> requests = results.keySet();
+
+ System.err.println("results.size: " + results.size());
+ for (HttpRequest r : requests) {
+ String result = results.get(r).get().body();
+ if (!result.equals(tempFileAsString)) {
+ System.err.println("Got [" + result + ", expected [" + tempFileAsString + "]");
+ }
+ }
+ if (requests.size() != LOOPS + 1)
+ throw new RuntimeException("Some results missing, expected:" + LOOPS + 1 + ", got:" + results.size());
+ }
+
+ // --- Path ---
+
+ static final Path dir = Paths.get(".", "serverPush");
+ static BodyHandler<Path> requestToPath(HttpRequest req) {
+ URI u = req.uri();
+ Path path = Paths.get(dir.toString(), u.getPath());
+ try {
+ Files.createDirectories(path.getParent());
+ } catch (IOException ee) {
+ throw new UncheckedIOException(ee);
+ }
+ return BodyHandler.asFile(path);
+ };
+
+ @Test
+ public void testTypePath() throws Exception {
+ HttpClient client = HttpClient.newHttpClient();
+ HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+ ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<Path>>> map
+ = new ConcurrentHashMap<>();
+
+ // Example 4 - of(...) building your own Map, everything as a Path
+ PushPromiseHandler<Path> pushPromiseHandler = (initial, pushRequest, acceptor) -> {
+ BodyHandler<Path> pp = requestToPath(pushRequest);
+ CompletableFuture<HttpResponse<Path>> cf = acceptor.apply(pp);
+ map.put(pushRequest, cf);
+ };
+
+ CompletableFuture<HttpResponse<Path>> cf =
+ client.sendAsync(request, requestToPath(request), pushPromiseHandler);
+ cf.join();
+ map.put(request, cf);
+
+ System.err.println("map.size: " + map.size());
+ for (HttpRequest r : map.keySet()) {
+ Path path = map.get(r).get().body();
+ String fileAsString = new String(Files.readAllBytes(path), UTF_8);
+ String tempFileAsString = new String(Files.readAllBytes(tempFile), UTF_8);
+ assertEquals(fileAsString, tempFileAsString);
+ }
+ assertEquals(map.size(), LOOPS + 1);
+ }
+
+ // --- Consumer<byte[]> ---
+
+ static class ByteArrayConsumer implements Consumer<Optional<byte[]>> {
+ volatile List<byte[]> listByteArrays = new ArrayList<>();
+ volatile byte[] accumulatedBytes;
+
+ public byte[] getAccumulatedBytes() { return accumulatedBytes; }
+
+ @Override
+ public void accept(Optional<byte[]> optionalBytes) {
+ assert accumulatedBytes == null;
+ if (!optionalBytes.isPresent()) {
+ int size = listByteArrays.stream().mapToInt(ba -> ba.length).sum();
+ ByteBuffer bb = ByteBuffer.allocate(size);
+ listByteArrays.stream().forEach(ba -> bb.put(ba));
+ accumulatedBytes = bb.array();
+ } else {
+ listByteArrays.add(optionalBytes.get());
+ }
+ }
+ }
+
+ @Test
+ public void testTypeByteArrayConsumer() throws Exception {
+ HttpClient client = HttpClient.newHttpClient();
+ HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+ ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<Void>>> resultMap
+ = new ConcurrentHashMap<>();
+ Map<HttpRequest,ByteArrayConsumer> byteArrayConsumerMap
+ = new ConcurrentHashMap<>();
+
+ ByteArrayConsumer bac = new ByteArrayConsumer();
+ byteArrayConsumerMap.put(request, bac);
+
+ // Example 5 - withXXX and everything as a consumer of optional byte[]
+ PushPromiseHandler<Void> pushPromiseHandler =
+ PushPromiseHandler.withPushPromises(pushRequest -> {
+ ByteArrayConsumer bc = new ByteArrayConsumer();
+ byteArrayConsumerMap.put(pushRequest, bc);
+ return BodyHandler.asByteArrayConsumer(bc);
+ },
+ resultMap);
+
+ CompletableFuture<HttpResponse<Void>> cf =
+ client.sendAsync(request, BodyHandler.asByteArrayConsumer(bac), pushPromiseHandler);
+ cf.join();
+ resultMap.put(request, cf);
+
+ System.err.println("map.size: " + resultMap.size());
+ for (HttpRequest r : resultMap.keySet()) {
+ resultMap.get(r).join();
+ byte[] ba = byteArrayConsumerMap.get(r).getAccumulatedBytes();
+ String result = new String(ba, UTF_8);
+ System.out.println("HEGO result=" + result);
+ System.out.println("HEGO result.length=" + result.length());
+ System.err.printf("%s -> %s\n", r.uri().toString(), result);
+ String tempFileAsString = new String(Files.readAllBytes(tempFile), UTF_8);
+ System.out.println("HEGO tempFileAsString=" + tempFileAsString);
+ System.out.println("HEGO tempFileAsString.length=" + tempFileAsString.length());
+ assertEquals(result, tempFileAsString);
+ }
+
+ assertEquals(resultMap.size(), LOOPS + 1);
+ }
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/http2/ServerPushWithDiffTypes.java Tue Jan 16 15:52:01 2018 +0000
@@ -0,0 +1,252 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @library /lib/testlibrary server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.base/sun.net.www.http
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true -Djdk.httpclient.HttpClient.log=errors,requests,responses ServerPushWithDiffTypes
+ */
+
+import java.io.*;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.*;
+import jdk.incubator.http.*;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.PushPromiseHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import java.util.*;
+import java.util.concurrent.*;
+import jdk.incubator.http.internal.common.HttpHeadersImpl;
+import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class ServerPushWithDiffTypes {
+
+ static Map<String,String> PUSH_PROMISES = Map.of(
+ "/x/y/z/1", "the first push promise body",
+ "/x/y/z/2", "the second push promise body",
+ "/x/y/z/3", "the third push promise body",
+ "/x/y/z/4", "the fourth push promise body",
+ "/x/y/z/5", "the fifth push promise body",
+ "/x/y/z/6", "the sixth push promise body",
+ "/x/y/z/7", "the seventh push promise body",
+ "/x/y/z/8", "the eight push promise body",
+ "/x/y/z/9", "the ninth push promise body"
+ );
+
+ @Test
+ public static void test() throws Exception {
+ Http2TestServer server = null;
+ try {
+ server = new Http2TestServer(false, 0);
+ Http2Handler handler = new ServerPushHandler("the main response body",
+ PUSH_PROMISES);
+ server.addHandler(handler, "/");
+ server.start();
+ int port = server.getAddress().getPort();
+ System.err.println("Server listening on port " + port);
+
+ HttpClient client = HttpClient.newHttpClient();
+ // use multi-level path
+ URI uri = new URI("http://127.0.0.1:" + port + "/foo/a/b/c");
+ HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
+
+ ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<BodyAndType<?>>>> results = new ConcurrentHashMap<>();
+ PushPromiseHandler<BodyAndType<?>> bh = PushPromiseHandler.withPushPromises(
+ (pushRequest) -> new BodyAndTypeHandler(pushRequest), results);
+
+ CompletableFuture<HttpResponse<BodyAndType<?>>> cf = client.sendAsync(request, new BodyAndTypeHandler(request), bh);
+ results.put(request, cf);
+ cf.join();
+ System.err.println("CHEGAR: results.size: " + results.size());
+
+ if (results.size() != PUSH_PROMISES.size() + 1)
+ throw new RuntimeException("Some results missing, expected:"
+ + (PUSH_PROMISES.size() + 1) + ", got:" + results.size());
+
+ for (HttpRequest r : results.keySet()) {
+ URI u = r.uri();
+ BodyAndType<?> body = results.get(r).get().body();
+ String result;
+ // convert all body types to String for easier comparison
+ if (body.type() == String.class) {
+ result = (String)body.getBody();
+ } else if (body.type() == byte[].class) {
+ byte[] bytes = (byte[])body.getBody();
+ result = new String(bytes, UTF_8);
+ } else if (Path.class.isAssignableFrom(body.type())) {
+ Path path = (Path)body.getBody();
+ result = new String(Files.readAllBytes(path), UTF_8);
+ } else {
+ throw new AssertionError("Unknown:" + body.type());
+ }
+
+ System.err.printf("%s -> %s\n", u.toString(), result.toString());
+ String expected = PUSH_PROMISES.get(r.uri().getPath());
+ if (expected == null)
+ expected = "the main response body";
+ System.err.println("For " + r + ", got [" + result + "], expected [" + expected +"]");
+ if (!result.equals(expected)) {
+ throw new RuntimeException("For " + r + ", got [" + result + "], expected [" + expected +"]");
+ }
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+ static interface BodyAndType<T> {
+ Class<T> type();
+ T getBody();
+ }
+
+ static final Path WORK_DIR = Paths.get(".");
+
+ static class BodyAndTypeHandler implements BodyHandler<BodyAndType<?>> {
+ int count;
+ final HttpRequest request;
+
+ BodyAndTypeHandler(HttpRequest request) {
+ this.request = request;
+ }
+
+ @Override
+ public HttpResponse.BodySubscriber<BodyAndType<?>> apply(int statusCode,
+ HttpHeaders responseHeaders) {
+ int whichType = count++ % 3; // real world may base this on the request metadata
+ switch (whichType) {
+ case 0: // String
+ return new BodyAndTypeSubscriber(BodySubscriber.asString(StandardCharsets.UTF_8));
+ case 1: // byte[]
+ return new BodyAndTypeSubscriber(BodySubscriber.asByteArray());
+ case 2: // Path
+ URI u = request.uri();
+ Path path = Paths.get(WORK_DIR.toString(), u.getPath());
+ try {
+ Files.createDirectories(path.getParent());
+ } catch (IOException ee) {
+ throw new UncheckedIOException(ee);
+ }
+ return new BodyAndTypeSubscriber(BodySubscriber.asFile(path));
+ default:
+ throw new AssertionError("Unexpected " + whichType);
+ }
+ }
+ }
+
+ static class BodyAndTypeSubscriber<T> implements HttpResponse.BodySubscriber<BodyAndType<T>> {
+
+ private static class BodyAndTypeImpl<T> implements BodyAndType<T> {
+ private final Class<T> type;
+ private final T body;
+ public BodyAndTypeImpl(Class<T> type, T body) { this.type = type; this.body = body; }
+ @Override public Class<T> type() { return type; }
+ @Override public T getBody() { return body; }
+ }
+
+ private final BodySubscriber<?> bodySubscriber;
+ private final CompletableFuture<BodyAndType<T>> cf;
+
+ BodyAndTypeSubscriber(BodySubscriber bodySubscriber) {
+ this.bodySubscriber = bodySubscriber;
+ cf = new CompletableFuture<>();
+ bodySubscriber.getBody().whenComplete((r,t) -> cf.complete(new BodyAndTypeImpl(r.getClass(), r)));
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ bodySubscriber.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ bodySubscriber.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ bodySubscriber.onError(throwable);
+ cf.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ bodySubscriber.onComplete();
+ }
+
+ @Override
+ public CompletionStage<BodyAndType<T>> getBody() {
+ return cf;
+ }
+ }
+
+ // --- server push handler ---
+ static class ServerPushHandler implements Http2Handler {
+
+ private final String mainResponseBody;
+ private final Map<String,String> promises;
+
+ public ServerPushHandler(String mainResponseBody, Map<String,String> promises) throws Exception {
+ Objects.requireNonNull(promises);
+ this.mainResponseBody = mainResponseBody;
+ this.promises = promises;
+ }
+
+ public void handle(Http2TestExchange exchange) throws IOException {
+ System.err.println("Server: handle " + exchange);
+ try (InputStream is = exchange.getRequestBody()) {
+ is.readAllBytes();
+ }
+
+ if (exchange.serverPushAllowed()) {
+ pushPromises(exchange);
+ }
+
+ // response data for the main response
+ try (OutputStream os = exchange.getResponseBody()) {
+ byte[] bytes = mainResponseBody.getBytes(UTF_8);
+ exchange.sendResponseHeaders(200, bytes.length);
+ os.write(bytes);
+ }
+ }
+
+ private void pushPromises(Http2TestExchange exchange) throws IOException {
+ URI requestURI = exchange.getRequestURI();
+ for (Map.Entry<String,String> promise : promises.entrySet()) {
+ URI uri = requestURI.resolve(promise.getKey());
+ InputStream is = new ByteArrayInputStream(promise.getValue().getBytes(UTF_8));
+ HttpHeadersImpl headers = new HttpHeadersImpl();
+ headers.addHeader("X-Promise-"+promise.getKey(), promise.getKey()); // todo: add some check on headers, maybe
+ exchange.serverPush(uri, headers, is);
+ }
+ System.err.println("Server: All pushes sent");
+ }
+ }
+}
--- a/test/jdk/java/net/httpclient/http2/server/PushHandler.java Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/PushHandler.java Tue Jan 16 15:52:01 2018 +0000
@@ -46,9 +46,10 @@
invocation++;
if (ee.serverPushAllowed()) {
+ URI requestURI = ee.getRequestURI();
for (int i=0; i<loops; i++) {
InputStream is = new FileInputStream(tempFile.toFile());
- URI u = new URI ("http://www.foo.com/x/y/z/" + Integer.toString(i));
+ URI u = requestURI.resolve("/x/y/z/" + Integer.toString(i));
HttpHeadersImpl h = new HttpHeadersImpl();
h.addHeader("X-foo", "bar");
ee.serverPush(u, h, is);
--- a/test/jdk/java/net/httpclient/http2/server/TestUtil.java Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/TestUtil.java Tue Jan 16 15:52:01 2018 +0000
@@ -45,7 +45,6 @@
public static Path tempFile() {
try {
Path p = Files.createTempFile("foo", "test");
- p.toFile().deleteOnExit();
return p;
} catch (IOException e) {
throw new UncheckedIOException(e);
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java Fri Jan 12 15:36:28 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java Tue Jan 16 15:52:01 2018 +0000
@@ -239,8 +239,9 @@
return error();
}
@Override
- public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
- HttpResponse.MultiSubscriber<U, T> multiSubscriber) {
+ public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
+ HttpResponse.BodyHandler<T> bodyHandler,
+ HttpResponse.PushPromiseHandler<T> multiHandler) {
return error();
}
}