src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java
branchhttp-client-branch
changeset 55861 0683f22cf2b9
parent 55859 4ca3e578b9c4
child 55959 7fd1a84f4fae
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java	Thu Nov 23 10:21:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java	Thu Nov 23 12:50:34 2017 +0000
@@ -62,6 +62,9 @@
     /** The actual accumulated remaining bytes in internalBuffers. */
     private int accumulatedBytes;
 
+    /** Holds the Throwable from upstream's onError. */
+    private volatile Throwable throwable;
+
     /** State of the buffering subscriber:
      *  1) [UNSUBSCRIBED] when initially created
      *  2) [ACTIVE] when subscribed and can receive data
@@ -152,16 +155,18 @@
     private class DownstreamSubscription implements Flow.Subscription {
         private final AtomicBoolean cancelled = new AtomicBoolean(); // false
         private final Demand demand = new Demand();
+        private volatile boolean illegalArg;
 
         @Override
         public void request(long n) {
-            if (cancelled.get()) {
+            if (cancelled.get() || illegalArg) {
                 return;
             }
             if (n <= 0L) {
-                cancel();
-                onError(new IllegalArgumentException(
-                        "non-positive subscription request"));
+                // pass the "bad" value upstream so the Publisher can deal with
+                // it appropriately, i.e. invoke onError
+                illegalArg = true;
+                subscription.request(n);
                 return;
             }
 
@@ -183,6 +188,13 @@
             @Override
             public void run() {
                 try {
+                    Throwable t = throwable;
+                    if (t != null) {
+                        downstreamSubscriber.onError(t);
+                        pushDemandedScheduler.stop(); // stop the demand scheduler
+                        return;
+                    }
+
                     while (true) {
                         List<ByteBuffer> item;
                         synchronized (buffersLock) {
@@ -208,10 +220,11 @@
                     }
                     if (complete) {
                         downstreamSubscriber.onComplete();
+                        pushDemandedScheduler.stop(); // stop the demand scheduler
                         return;
                     }
                 } catch (Throwable t) {
-                    cancel();  // cancel if there is any find of error
+                    cancel();  // cancel if there is any error
                     throw t;
                 }
 
@@ -273,12 +286,15 @@
     }
 
     @Override
-    public void onError(Throwable throwable) {
-        Objects.requireNonNull(throwable);
+    public void onError(Throwable incomingThrowable) {
+        Objects.requireNonNull(incomingThrowable);
         int s = state;
-        assert s == ACTIVE || s == CANCELLED: "Expected ACTIVE||CANCELLED, got:" + s;
+        assert s == ACTIVE : "Expected ACTIVE, got:" + s;
         state = ERROR;
-        downstreamSubscriber.onError(throwable);
+        Throwable t = this.throwable;
+        assert t == null : "Expected null, got:" + t;
+        this.throwable = incomingThrowable;
+        downstreamSubscription.pushDemanded();
     }
 
     @Override