http-client-branch: further review comments on error handling in BufferedSubscriber http-client-branch
authorchegar
Thu, 23 Nov 2017 12:50:34 +0000
branchhttp-client-branch
changeset 55861 0683f22cf2b9
parent 55860 dc4a39bc1517
child 55862 faa39b5ec8e1
http-client-branch: further review comments on error handling in BufferedSubscriber
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java
test/jdk/java/net/httpclient/BufferingSubscriberCancelTest.java
test/jdk/java/net/httpclient/BufferingSubscriberErrorCompleteTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java	Thu Nov 23 10:21:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java	Thu Nov 23 12:50:34 2017 +0000
@@ -62,6 +62,9 @@
     /** The actual accumulated remaining bytes in internalBuffers. */
     private int accumulatedBytes;
 
+    /** Holds the Throwable from upstream's onError. */
+    private volatile Throwable throwable;
+
     /** State of the buffering subscriber:
      *  1) [UNSUBSCRIBED] when initially created
      *  2) [ACTIVE] when subscribed and can receive data
@@ -152,16 +155,18 @@
     private class DownstreamSubscription implements Flow.Subscription {
         private final AtomicBoolean cancelled = new AtomicBoolean(); // false
         private final Demand demand = new Demand();
+        private volatile boolean illegalArg;
 
         @Override
         public void request(long n) {
-            if (cancelled.get()) {
+            if (cancelled.get() || illegalArg) {
                 return;
             }
             if (n <= 0L) {
-                cancel();
-                onError(new IllegalArgumentException(
-                        "non-positive subscription request"));
+                // pass the "bad" value upstream so the Publisher can deal with
+                // it appropriately, i.e. invoke onError
+                illegalArg = true;
+                subscription.request(n);
                 return;
             }
 
@@ -183,6 +188,13 @@
             @Override
             public void run() {
                 try {
+                    Throwable t = throwable;
+                    if (t != null) {
+                        downstreamSubscriber.onError(t);
+                        pushDemandedScheduler.stop(); // stop the demand scheduler
+                        return;
+                    }
+
                     while (true) {
                         List<ByteBuffer> item;
                         synchronized (buffersLock) {
@@ -208,10 +220,11 @@
                     }
                     if (complete) {
                         downstreamSubscriber.onComplete();
+                        pushDemandedScheduler.stop(); // stop the demand scheduler
                         return;
                     }
                 } catch (Throwable t) {
-                    cancel();  // cancel if there is any find of error
+                    cancel();  // cancel if there is any error
                     throw t;
                 }
 
@@ -273,12 +286,15 @@
     }
 
     @Override
-    public void onError(Throwable throwable) {
-        Objects.requireNonNull(throwable);
+    public void onError(Throwable incomingThrowable) {
+        Objects.requireNonNull(incomingThrowable);
         int s = state;
-        assert s == ACTIVE || s == CANCELLED: "Expected ACTIVE||CANCELLED, got:" + s;
+        assert s == ACTIVE : "Expected ACTIVE, got:" + s;
         state = ERROR;
-        downstreamSubscriber.onError(throwable);
+        Throwable t = this.throwable;
+        assert t == null : "Expected null, got:" + t;
+        this.throwable = incomingThrowable;
+        downstreamSubscription.pushDemanded();
     }
 
     @Override
--- a/test/jdk/java/net/httpclient/BufferingSubscriberCancelTest.java	Thu Nov 23 10:21:19 2017 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberCancelTest.java	Thu Nov 23 12:50:34 2017 +0000
@@ -82,6 +82,7 @@
         s.request(MAX_VALUE); s.request(MAX_VALUE); s.request(MAX_VALUE);
         s.request(-1); s.request(-100); s.request(MIN_VALUE);
         assertEqualsWithRetry(publisher::getNumberOfSubscribers, 0);
+        executor.shutdown();
     }
 
     @DataProvider(name = "sizeAndItems")
@@ -125,6 +126,7 @@
 
         assertEqualsWithRetry(publisher::getNumberOfSubscribers, 0);
         assertEquals(exposingSubscriber.onNextInvocations, ITERATION_TIMES);
+        executor.shutdown();
     }
 
     // same as above but with more racy conditions, do not wait on the gate
@@ -151,6 +153,7 @@
         int onNextInvocations = exposingSubscriber.onNextInvocations;
         assertTrue(onNextInvocations <= ITERATION_TIMES,
                    "Expected <= " + ITERATION_TIMES + ", got " + onNextInvocations);
+        executor.shutdown();
     }
 
     static class ExposingSubscriber implements BodySubscriber<Void> {
--- a/test/jdk/java/net/httpclient/BufferingSubscriberErrorCompleteTest.java	Thu Nov 23 10:21:19 2017 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberErrorCompleteTest.java	Thu Nov 23 12:50:34 2017 +0000
@@ -83,6 +83,7 @@
 
         furtherCancelsRequestsShouldBeNoOp(s);
         assertEquals(exposingSubscriber.onErrorInvocations, 1);
+        executor.shutdown();
     }
 
 
@@ -130,12 +131,13 @@
         furtherCancelsRequestsShouldBeNoOp(s);
         assertEquals(exposingSubscriber.onErrorInvocations, 1);
         assertEquals(exposingSubscriber.onCompleteInvocations, 0);
+        executor.shutdown();
     }
 
     @Test(dataProvider = "bufferAndItemSizes")
     public void onCompleteFromPublisher(int bufferSize,
                                         int numberOfItems)
-            throws Exception
+        throws Exception
     {
         ExecutorService executor = Executors.newFixedThreadPool(1);
         SubmissionPublisher<List<ByteBuffer>> publisher =
@@ -163,6 +165,7 @@
         assertEquals(exposingSubscriber.onErrorInvocations, 0);
         assertEquals(exposingSubscriber.onCompleteInvocations, 1);
         assertEquals(exposingSubscriber.throwable, null);
+        executor.shutdown();
     }
 
     static class ExposingSubscriber implements BodySubscriber<Void> {