--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Jan 12 15:36:28 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Tue Jan 16 15:52:01 2018 +0000
@@ -224,7 +224,7 @@
BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
CompletableFuture<T> cf = receiveData(bodySubscriber);
- PushGroup<?,?> pg = exchange.getPushGroup();
+ PushGroup<?> pg = exchange.getPushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
cf = cf.whenComplete((t,e) -> pg.pushError(e));
@@ -420,42 +420,46 @@
}
}
- void incoming_pushPromise(HttpRequestImpl pushReq,
- PushedStream<?,T> pushStream)
+ void incoming_pushPromise(HttpRequestImpl pushRequest,
+ PushedStream<T> pushStream)
throws IOException
{
if (Log.requests()) {
- Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+ Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
}
- PushGroup<?,T> pushGroup = exchange.getPushGroup();
- if (pushGroup == null || pushGroup.noMorePushes()) {
+ PushGroup<T> pushGroup = exchange.getPushGroup();
+ if (pushGroup == null) {
+ // no push handler set by the user code, i.e. cancel / reject
+ IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
+ pushStream.cancelImpl(ex);
+ return;
+ }
+
+ if (pushGroup.noMorePushes()) {
cancelImpl(new IllegalStateException("unexpected push promise"
+ " on stream " + streamid));
return;
}
- HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
-
- CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
-
- Optional<HttpResponse.BodyHandler<T>> bpOpt =
- pushGroup.handlerForPushRequest(pushReq);
+ PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
+ CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
- if (!bpOpt.isPresent()) {
- IOException ex = new IOException("Stream "
- + streamid + " cancelled by user");
+ if (!acceptor.accepted()) {
+ // cancel / reject
+ IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
if (Log.trace()) {
- Log.logTrace("No body subscriber for {0}: {1}", pushReq,
- ex.getMessage());
+ Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
+ ex.getMessage());
}
pushStream.cancelImpl(ex);
- cf.completeExceptionally(ex);
return;
}
- pushGroup.addPush();
+ CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
+ HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
+
pushStream.requestSent();
- pushStream.setPushHandler(bpOpt.get());
+ pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
// setup housekeeping for when the push is received
// TODO: deal with ignoring of CF anti-pattern
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
@@ -467,9 +471,9 @@
}
if (t != null) {
pushGroup.pushError(t);
- proc.onError(pushReq, t);
+ pushResponseCF.completeExceptionally(t);
} else {
- proc.onResponse(resp);
+ pushResponseCF.complete(resp);
}
pushGroup.pushCompleted();
});
@@ -811,7 +815,7 @@
cf = cf.thenApplyAsync(r -> r, executor);
}
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
- PushGroup<?,?> pg = exchange.getPushGroup();
+ PushGroup<?> pg = exchange.getPushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
@@ -950,16 +954,16 @@
Log.logTrace("Stream {0} closed", streamid);
}
- static class PushedStream<U,T> extends Stream<T> {
- final PushGroup<U,T> pushGroup;
+ static class PushedStream<T> extends Stream<T> {
+ final PushGroup<T> pushGroup;
// push streams need the response CF allocated up front as it is
// given directly to user via the multi handler callback function.
final CompletableFuture<Response> pushCF;
- final CompletableFuture<HttpResponse<T>> responseCF;
+ CompletableFuture<HttpResponse<T>> responseCF;
final HttpRequestImpl pushReq;
HttpResponse.BodyHandler<T> pushHandler;
- PushedStream(PushGroup<U,T> pushGroup,
+ PushedStream(PushGroup<T> pushGroup,
Http2Connection connection,
Exchange<T> pushReq) {
// ## no request body possible, null window controller
@@ -968,6 +972,7 @@
this.pushReq = pushReq.request();
this.pushCF = new MinimalFuture<>();
this.responseCF = new MinimalFuture<>();
+
}
CompletableFuture<HttpResponse<T>> responseCF() {