8193174: SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published
authordl
Fri, 08 Dec 2017 15:22:58 -0800
changeset 48231 8a6970acf8ad
parent 48230 d0e8542ef650
child 48232 bf476235671a
8193174: SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published Reviewed-by: martin, psandoz, chegar
src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
--- a/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java	Sun Dec 03 13:06:51 2017 -0800
+++ b/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java	Fri Dec 08 15:22:58 2017 -0800
@@ -1252,18 +1252,20 @@
                         head = h += taken;
                         d = subtractDemand(taken);
                     }
-                    else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
-                        closeOnComplete(s);          // end of stream
-                        break;
-                    }
                     else if ((d = demand) == 0L && (c & REQS) != 0)
                         weakCasCtl(c, c & ~REQS);    // exhausted demand
                     else if (d != 0L && (c & REQS) == 0)
                         weakCasCtl(c, c | REQS);     // new demand
-                    else if (t == (t = tail) && (empty || d == 0L)) {
-                        int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
-                        if (weakCasCtl(c, c & ~bit) && bit == RUN)
-                            break;                   // un-keep-alive or exit
+                    else if (t == (t = tail)) {      // stability check
+                        if ((empty = (t == h)) && (c & COMPLETE) != 0) {
+                            closeOnComplete(s);      // end of stream
+                            break;
+                        }
+                        else if (empty || d == 0L) {
+                            int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
+                            if (weakCasCtl(c, c & ~bit) && bit == RUN)
+                                break;               // un-keep-alive or exit
+                        }
                     }
                 }
             }