http-client-branch: further review comments on error handling in BufferedSubscriber
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java Thu Nov 23 10:21:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java Thu Nov 23 12:50:34 2017 +0000
@@ -62,6 +62,9 @@
/** The actual accumulated remaining bytes in internalBuffers. */
private int accumulatedBytes;
+ /** Holds the Throwable from upstream's onError. */
+ private volatile Throwable throwable;
+
/** State of the buffering subscriber:
* 1) [UNSUBSCRIBED] when initially created
* 2) [ACTIVE] when subscribed and can receive data
@@ -152,16 +155,18 @@
private class DownstreamSubscription implements Flow.Subscription {
private final AtomicBoolean cancelled = new AtomicBoolean(); // false
private final Demand demand = new Demand();
+ private volatile boolean illegalArg;
@Override
public void request(long n) {
- if (cancelled.get()) {
+ if (cancelled.get() || illegalArg) {
return;
}
if (n <= 0L) {
- cancel();
- onError(new IllegalArgumentException(
- "non-positive subscription request"));
+ // pass the "bad" value upstream so the Publisher can deal with
+ // it appropriately, i.e. invoke onError
+ illegalArg = true;
+ subscription.request(n);
return;
}
@@ -183,6 +188,13 @@
@Override
public void run() {
try {
+ Throwable t = throwable;
+ if (t != null) {
+ downstreamSubscriber.onError(t);
+ pushDemandedScheduler.stop(); // stop the demand scheduler
+ return;
+ }
+
while (true) {
List<ByteBuffer> item;
synchronized (buffersLock) {
@@ -208,10 +220,11 @@
}
if (complete) {
downstreamSubscriber.onComplete();
+ pushDemandedScheduler.stop(); // stop the demand scheduler
return;
}
} catch (Throwable t) {
- cancel(); // cancel if there is any find of error
+ cancel(); // cancel if there is any error
throw t;
}
@@ -273,12 +286,15 @@
}
@Override
- public void onError(Throwable throwable) {
- Objects.requireNonNull(throwable);
+ public void onError(Throwable incomingThrowable) {
+ Objects.requireNonNull(incomingThrowable);
int s = state;
- assert s == ACTIVE || s == CANCELLED: "Expected ACTIVE||CANCELLED, got:" + s;
+ assert s == ACTIVE : "Expected ACTIVE, got:" + s;
state = ERROR;
- downstreamSubscriber.onError(throwable);
+ Throwable t = this.throwable;
+ assert t == null : "Expected null, got:" + t;
+ this.throwable = incomingThrowable;
+ downstreamSubscription.pushDemanded();
}
@Override
--- a/test/jdk/java/net/httpclient/BufferingSubscriberCancelTest.java Thu Nov 23 10:21:19 2017 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberCancelTest.java Thu Nov 23 12:50:34 2017 +0000
@@ -82,6 +82,7 @@
s.request(MAX_VALUE); s.request(MAX_VALUE); s.request(MAX_VALUE);
s.request(-1); s.request(-100); s.request(MIN_VALUE);
assertEqualsWithRetry(publisher::getNumberOfSubscribers, 0);
+ executor.shutdown();
}
@DataProvider(name = "sizeAndItems")
@@ -125,6 +126,7 @@
assertEqualsWithRetry(publisher::getNumberOfSubscribers, 0);
assertEquals(exposingSubscriber.onNextInvocations, ITERATION_TIMES);
+ executor.shutdown();
}
// same as above but with more racy conditions, do not wait on the gate
@@ -151,6 +153,7 @@
int onNextInvocations = exposingSubscriber.onNextInvocations;
assertTrue(onNextInvocations <= ITERATION_TIMES,
"Expected <= " + ITERATION_TIMES + ", got " + onNextInvocations);
+ executor.shutdown();
}
static class ExposingSubscriber implements BodySubscriber<Void> {
--- a/test/jdk/java/net/httpclient/BufferingSubscriberErrorCompleteTest.java Thu Nov 23 10:21:19 2017 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberErrorCompleteTest.java Thu Nov 23 12:50:34 2017 +0000
@@ -83,6 +83,7 @@
furtherCancelsRequestsShouldBeNoOp(s);
assertEquals(exposingSubscriber.onErrorInvocations, 1);
+ executor.shutdown();
}
@@ -130,12 +131,13 @@
furtherCancelsRequestsShouldBeNoOp(s);
assertEquals(exposingSubscriber.onErrorInvocations, 1);
assertEquals(exposingSubscriber.onCompleteInvocations, 0);
+ executor.shutdown();
}
@Test(dataProvider = "bufferAndItemSizes")
public void onCompleteFromPublisher(int bufferSize,
int numberOfItems)
- throws Exception
+ throws Exception
{
ExecutorService executor = Executors.newFixedThreadPool(1);
SubmissionPublisher<List<ByteBuffer>> publisher =
@@ -163,6 +165,7 @@
assertEquals(exposingSubscriber.onErrorInvocations, 0);
assertEquals(exposingSubscriber.onCompleteInvocations, 1);
assertEquals(exposingSubscriber.throwable, null);
+ executor.shutdown();
}
static class ExposingSubscriber implements BodySubscriber<Void> {