http-client-branch: review comment - invalid Subscription::request arguments should be relayed to Subscriber::onError (part II: HTTP/2)
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Fri Apr 06 22:11:44 2018 +0100
@@ -332,7 +332,7 @@
if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
Http1AsyncDelegate delegate = this.delegate;
if (delegate != null) unsubscribe(delegate);
- Consumer<Throwable> onIllegalArg = (x) -> {
+ Consumer<Throwable> onSubscriptionError = (x) -> {
setRetryOnError(false);
stopRequested = true;
onReadError(x);
@@ -356,7 +356,7 @@
// the header/body parser work with a flow of ByteBuffer, whereas
// we have a flow List<ByteBuffer> upstream.
Http1AsyncDelegateSubscription subscription =
- new Http1AsyncDelegateSubscription(scheduler, cancel, onIllegalArg);
+ new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
pending.onSubscribe(subscription);
this.delegate = delegate = pending;
final Object captured = delegate;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Fri Apr 06 22:11:44 2018 +0100
@@ -27,7 +27,6 @@
import java.io.EOFException;
import java.lang.System.Logger.Level;
-import java.net.http.HttpClient;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -36,7 +35,6 @@
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.net.http.HttpHeaders;
@@ -45,7 +43,6 @@
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
-import java.lang.ref.Reference;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpResponse.BodySubscribers.discarding;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri Apr 06 22:11:44 2018 +0100
@@ -101,7 +101,8 @@
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
final SequentialScheduler sched =
SequentialScheduler.synchronizedScheduler(this::schedule);
- final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
+ final SubscriptionBase userSubscription =
+ new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
/**
* This stream's identifier. Assigned lazily by the HTTP2Connection before
@@ -125,6 +126,8 @@
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
volatile CompletableFuture<T> responseBodyCF;
+ volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
+ volatile boolean stopRequested;
/** True if END_STREAM has been seen in a frame received on this stream. */
private volatile boolean remotelyClosed;
@@ -152,12 +155,19 @@
* of after user subscription window has re-opened, from SubscriptionBase.request()
*/
private void schedule() {
- if (responseSubscriber == null)
- // can't process anything yet
- return;
-
boolean onCompleteCalled = false;
+ HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
try {
+ if (subscriber == null) {
+ subscriber = responseSubscriber = pendingResponseSubscriber;
+ if (subscriber == null) {
+ // can't process anything yet
+ return;
+ } else {
+ debug.log(Level.DEBUG, "subscribing user subscriber");
+ subscriber.onSubscribe(userSubscription);
+ }
+ }
while (!inputQ.isEmpty()) {
Http2Frame frame = inputQ.peek();
if (frame instanceof ResetFrame) {
@@ -176,7 +186,7 @@
Log.logTrace("responseSubscriber.onComplete");
debug.log(Level.DEBUG, "incoming: onComplete");
sched.stop();
- responseSubscriber.onComplete();
+ subscriber.onComplete();
onCompleteCalled = true;
setEndStreamReceived();
return;
@@ -184,17 +194,18 @@
inputQ.remove();
Log.logTrace("responseSubscriber.onNext {0}", size);
debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
- responseSubscriber.onNext(dsts);
+ subscriber.onNext(dsts);
if (consumed(df)) {
Log.logTrace("responseSubscriber.onComplete");
debug.log(Level.DEBUG, "incoming: onComplete");
sched.stop();
- responseSubscriber.onComplete();
+ subscriber.onComplete();
onCompleteCalled = true;
setEndStreamReceived();
return;
}
} else {
+ if (stopRequested) break;
return;
}
}
@@ -207,7 +218,10 @@
sched.stop();
try {
if (!onCompleteCalled) {
- responseSubscriber.onError(t);
+ debug.log(Level.DEBUG, "calling subscriber.onError: %s", (Object)t);
+ subscriber.onError(t);
+ } else {
+ debug.log(Level.DEBUG, "already completed: dropping error %s", (Object)t);
}
} catch (Throwable x) {
Log.logError("Subscriber::onError threw exception: {0}", (Object)t);
@@ -301,10 +315,7 @@
Throwable t = getCancelCause();
responseBodyCF.completeExceptionally(t);
} else {
- bodySubscriber.onSubscribe(userSubscription);
- // Set the responseSubscriber field now that onSubscribe has been called.
- // This effectively allows the scheduler to start invoking the callbacks.
- responseSubscriber = bodySubscriber;
+ pendingResponseSubscriber = bodySubscriber;
sched.runOrSchedule(); // in case data waiting already to be processed
}
return responseBodyCF;
@@ -988,6 +999,19 @@
cancel(new IOException("Stream " + streamid + " cancelled"));
}
+ void onSubscriptionError(Throwable t) {
+ errorRef.compareAndSet(null, t);
+ debug.log(Level.DEBUG, "Got subscription error: %s", (Object)t);
+ // This is the special case where the subscriber
+ // has requested an illegal number of items.
+ // In this case, the error doesn't come from
+ // upstream, but from downstream, and we need to
+ // handle the error without waiting for the inputQ
+ // to be exhausted.
+ stopRequested = true;
+ sched.runOrSchedule();
+ }
+
@Override
void cancel(IOException cause) {
cancelImpl(cause);
@@ -1009,7 +1033,7 @@
}
}
if (closing) { // true if the stream has not been closed yet
- if (responseSubscriber != null)
+ if (responseSubscriber != null || pendingResponseSubscriber != null)
sched.runOrSchedule();
}
completeResponseExceptionally(e);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriptionBase.java Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriptionBase.java Fri Apr 06 22:11:44 2018 +0100
@@ -27,6 +27,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
/**
* Maintains subscription counter and provides primitives for:
@@ -41,17 +42,33 @@
final SequentialScheduler scheduler; // when window was zero and is opened, run this
final Runnable cancelAction; // when subscription cancelled, run this
final AtomicBoolean cancelled;
+ final Consumer<Throwable> onError;
public SubscriptionBase(SequentialScheduler scheduler, Runnable cancelAction) {
+ this(scheduler, cancelAction, null);
+ }
+
+ public SubscriptionBase(SequentialScheduler scheduler,
+ Runnable cancelAction,
+ Consumer<Throwable> onError) {
this.scheduler = scheduler;
this.cancelAction = cancelAction;
this.cancelled = new AtomicBoolean(false);
+ this.onError = onError;
}
@Override
public void request(long n) {
- if (demand.increase(n))
- scheduler.runOrSchedule();
+ try {
+ if (demand.increase(n))
+ scheduler.runOrSchedule();
+ } catch(Throwable t) {
+ if (onError != null) {
+ if (cancelled.getAndSet(true))
+ return;
+ onError.accept(t);
+ } else throw t;
+ }
}
@Override
--- a/test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java Fri Apr 06 18:15:35 2018 +0100
+++ b/test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java Fri Apr 06 22:11:44 2018 +0100
@@ -184,19 +184,19 @@
{ httpURI_chunk, false, OF_INPUTSTREAM },
{ httpsURI_fixed, false, OF_INPUTSTREAM },
{ httpsURI_chunk, false, OF_INPUTSTREAM },
-// { http2URI_fixed, false, OF_INPUTSTREAM },
-// { http2URI_chunk, false, OF_INPUTSTREAM },
-// { https2URI_fixed, false, OF_INPUTSTREAM },
-// { https2URI_chunk, false, OF_INPUTSTREAM },
+ { http2URI_fixed, false, OF_INPUTSTREAM },
+ { http2URI_chunk, false, OF_INPUTSTREAM },
+ { https2URI_fixed, false, OF_INPUTSTREAM },
+ { https2URI_chunk, false, OF_INPUTSTREAM },
{ httpURI_fixed, true, OF_INPUTSTREAM },
{ httpURI_chunk, true, OF_INPUTSTREAM },
{ httpsURI_fixed, true, OF_INPUTSTREAM },
{ httpsURI_chunk, true, OF_INPUTSTREAM },
-// { http2URI_fixed, true, OF_INPUTSTREAM },
-// { http2URI_chunk, true, OF_INPUTSTREAM },
-// { https2URI_fixed, true, OF_INPUTSTREAM },
-// { https2URI_chunk, true, OF_INPUTSTREAM },
+ { http2URI_fixed, true, OF_INPUTSTREAM },
+ { http2URI_chunk, true, OF_INPUTSTREAM },
+ { https2URI_fixed, true, OF_INPUTSTREAM },
+ { https2URI_chunk, true, OF_INPUTSTREAM },
};
}
@@ -226,7 +226,8 @@
try (InputStream is = response.body()) {
String body = new String(is.readAllBytes(), UTF_8);
assertEquals(body, "");
- if (uri.endsWith("/chunk")) {
+ if (uri.endsWith("/chunk")
+ && response.version() == HttpClient.Version.HTTP_1_1) {
// with /fixed and 0 length
// there's no need for any call to request()
throw new RuntimeException("Expected IAE not thrown");
@@ -261,8 +262,9 @@
BodyHandler<InputStream> handler = handlers.get();
BodyHandler<InputStream> badHandler = (c,h) ->
new BadBodySubscriber<>(handler.apply(c,h));
- CompletableFuture<String> result =
- client.sendAsync(req, badHandler).thenCompose(
+ CompletableFuture<HttpResponse<InputStream>> response =
+ client.sendAsync(req, badHandler);
+ CompletableFuture<String> result = response.thenCompose(
(responsePublisher) -> {
try (InputStream is = responsePublisher.body()) {
return CompletableFuture.completedFuture(
@@ -274,7 +276,8 @@
try {
// Get the final result and compare it with the expected body
assertEquals(result.get(), "");
- if (uri.endsWith("/chunk")) {
+ if (uri.endsWith("/chunk")
+ && response.get().version() == HttpClient.Version.HTTP_1_1) {
// with /fixed and 0 length
// there's no need for any call to request()
throw new RuntimeException("Expected IAE not thrown");
--- a/test/jdk/java/net/httpclient/ThrowingPushPromises.java Fri Apr 06 18:15:35 2018 +0100
+++ b/test/jdk/java/net/httpclient/ThrowingPushPromises.java Fri Apr 06 22:11:44 2018 +0100
@@ -372,7 +372,11 @@
}
final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
- throw new RuntimeException("Expected exception not thrown in " + w);
+ String msg = "Expected exception not thrown in " + w
+ + "\n\tReceived: " + resp
+ + "\n\tWith body: " + resp.body();
+ System.out.println(msg);
+ throw new RuntimeException(msg);
}
final List<String> checkAsString(Where w, URI reqURI,
@@ -442,7 +446,6 @@
case BEFORE_ACCEPTING:
case AFTER_ACCEPTING:
case BODY_HANDLER:
- case ON_SUBSCRIBE:
case GET_BODY:
case BODY_CF:
return shouldHaveThrown(w, presp, thrower);
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java Fri Apr 06 18:15:35 2018 +0100
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java Fri Apr 06 22:11:44 2018 +0100
@@ -453,7 +453,6 @@
final List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {
switch(w) {
case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
- case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
case GET_BODY: return shouldHaveThrown(w, resp, thrower);
case BODY_CF: return shouldHaveThrown(w, resp, thrower);
default: break;
@@ -469,7 +468,7 @@
}
throw causeNotFound(w, x);
}
- throw new RuntimeException("Expected exception not thrown in " + w);
+ return shouldHaveThrown(w, resp, thrower);
}
final List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,
@@ -478,7 +477,6 @@
{
switch(w) {
case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
- case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
case GET_BODY: return shouldHaveThrown(w, resp, thrower);
case BODY_CF: return shouldHaveThrown(w, resp, thrower);
default: break;