8187947: A race condition in SubmissionPublisher
authordl
Sat, 02 Dec 2017 10:08:55 -0800
changeset 48047 ff597804e8c1
parent 48046 98801bd22f5b
child 48048 f55cdd83e303
8187947: A race condition in SubmissionPublisher Reviewed-by: martin, psandoz
src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java
--- a/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java	Sat Dec 02 10:03:41 2017 -0800
+++ b/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java	Sat Dec 02 10:08:55 2017 -0800
@@ -38,11 +38,15 @@
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
+import static java.util.concurrent.Flow.Publisher;
+import static java.util.concurrent.Flow.Subscriber;
+import static java.util.concurrent.Flow.Subscription;
 
 /**
  * A {@link Flow.Publisher} that asynchronously issues submitted
@@ -74,6 +78,14 @@
  * Flow#defaultBufferSize()} may provide a useful starting point for
  * choosing a capacity based on expected rates, resources, and usages.
  *
+ * <p>A single SubmissionPublisher may be shared among multiple
+ * sources. Actions in a source thread prior to publishing an item or
+ * issuing a signal <a href="package-summary.html#MemoryVisibility">
+ * <i>happen-before</i></a> actions subsequent to the corresponding
+ * access by each subscriber. But reported estimates of lag and demand
+ * are designed for use in monitoring, not for synchronization
+ * control, and may reflect stale or inaccurate views of progress.
+ *
  * <p>Publication methods support different policies about what to do
  * when buffers are saturated. Method {@link #submit(Object) submit}
  * blocks until resources are available. This is simplest, but least
@@ -158,19 +170,27 @@
  * @author Doug Lea
  * @since 9
  */
-public class SubmissionPublisher<T> implements Flow.Publisher<T>,
+public class SubmissionPublisher<T> implements Publisher<T>,
                                                AutoCloseable {
     /*
      * Most mechanics are handled by BufferedSubscription. This class
      * mainly tracks subscribers and ensures sequentiality, by using
-     * built-in synchronization locks across public methods. (Using
+     * built-in synchronization locks across public methods. Using
      * built-in locks works well in the most typical case in which
-     * only one thread submits items).
+     * only one thread submits items. We extend this idea in
+     * submission methods by detecting single-ownership to reduce
+     * producer-consumer synchronization strength.
      */
 
     /** The largest possible power of two array size. */
     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
 
+    /**
+     * Initial buffer capacity used when maxBufferCapacity is
+     * greater. Must be a power of two.
+     */
+    static final int INITIAL_CAPACITY = 32;
+
     /** Round capacity to power of 2, at most limit. */
     static final int roundCapacity(int cap) {
         int n = cap - 1;
@@ -206,7 +226,7 @@
      * but we expect that subscribing is much less common than
      * publishing. Unsubscribing occurs only during traversal loops,
      * when BufferedSubscription methods return negative values
-     * signifying that they have been disabled.  To reduce
+     * signifying that they have been closed.  To reduce
      * head-of-line blocking, submit and offer methods first call
      * BufferedSubscription.offer on each subscriber, and place
      * saturated ones in retries list (using nextRetry field), and
@@ -216,12 +236,16 @@
 
     /** Run status, updated only within locks */
     volatile boolean closed;
+    /** Set true on first call to subscribe, to initialize possible owner */
+    boolean subscribed;
+    /** The first caller thread to subscribe, or null if thread ever changed */
+    Thread owner;
     /** If non-null, the exception in closeExceptionally */
     volatile Throwable closedException;
 
     // Parameters for constructing BufferedSubscriptions
     final Executor executor;
-    final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
+    final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
     final int maxBufferCapacity;
 
     /**
@@ -245,7 +269,7 @@
      * positive
      */
     public SubmissionPublisher(Executor executor, int maxBufferCapacity,
-                               BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
+                               BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
         if (executor == null)
             throw new NullPointerException();
         if (maxBufferCapacity <= 0)
@@ -311,12 +335,19 @@
      * @param subscriber the subscriber
      * @throws NullPointerException if subscriber is null
      */
-    public void subscribe(Flow.Subscriber<? super T> subscriber) {
+    public void subscribe(Subscriber<? super T> subscriber) {
         if (subscriber == null) throw new NullPointerException();
+        int max = maxBufferCapacity; // allocate initial array
+        Object[] array = new Object[max < INITIAL_CAPACITY ?
+                                    max : INITIAL_CAPACITY];
         BufferedSubscription<T> subscription =
-            new BufferedSubscription<T>(subscriber, executor,
-                                        onNextHandler, maxBufferCapacity);
+            new BufferedSubscription<T>(subscriber, executor, onNextHandler,
+                                        array, max);
         synchronized (this) {
+            if (!subscribed) {
+                subscribed = true;
+                owner = Thread.currentThread();
+            }
             for (BufferedSubscription<T> b = clients, pred = null;;) {
                 if (b == null) {
                     Throwable ex;
@@ -332,7 +363,7 @@
                     break;
                 }
                 BufferedSubscription<T> next = b.next;
-                if (b.isDisabled()) { // remove
+                if (b.isClosed()) {   // remove
                     b.next = null;    // detach
                     if (pred == null)
                         clients = next;
@@ -351,6 +382,107 @@
     }
 
     /**
+     * Common implementation for all three forms of submit and offer.
+     * Acts as submit if nanos == Long.MAX_VALUE, else offer.
+     */
+    private int doOffer(T item, long nanos,
+                        BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
+        if (item == null) throw new NullPointerException();
+        int lag = 0;
+        boolean complete, unowned;
+        synchronized (this) {
+            Thread t = Thread.currentThread(), o;
+            BufferedSubscription<T> b = clients;
+            if ((unowned = ((o = owner) != t)) && o != null)
+                owner = null;                     // disable bias
+            if (b == null)
+                complete = closed;
+            else {
+                complete = false;
+                boolean cleanMe = false;
+                BufferedSubscription<T> retries = null, rtail = null, next;
+                do {
+                    next = b.next;
+                    int stat = b.offer(item, unowned);
+                    if (stat == 0) {              // saturated; add to retry list
+                        b.nextRetry = null;       // avoid garbage on exceptions
+                        if (rtail == null)
+                            retries = b;
+                        else
+                            rtail.nextRetry = b;
+                        rtail = b;
+                    }
+                    else if (stat < 0)            // closed
+                        cleanMe = true;           // remove later
+                    else if (stat > lag)
+                        lag = stat;
+                } while ((b = next) != null);
+
+                if (retries != null || cleanMe)
+                    lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
+            }
+        }
+        if (complete)
+            throw new IllegalStateException("Closed");
+        else
+            return lag;
+    }
+
+    /**
+     * Helps, (timed) waits for, and/or drops buffers on list; returns
+     * lag or negative drops (for use in offer).
+     */
+    private int retryOffer(T item, long nanos,
+                           BiPredicate<Subscriber<? super T>, ? super T> onDrop,
+                           BufferedSubscription<T> retries, int lag,
+                           boolean cleanMe) {
+        for (BufferedSubscription<T> r = retries; r != null;) {
+            BufferedSubscription<T> nextRetry = r.nextRetry;
+            r.nextRetry = null;
+            if (nanos > 0L)
+                r.awaitSpace(nanos);
+            int stat = r.retryOffer(item);
+            if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
+                stat = r.retryOffer(item);
+            if (stat == 0)
+                lag = (lag >= 0) ? -1 : lag - 1;
+            else if (stat < 0)
+                cleanMe = true;
+            else if (lag >= 0 && stat > lag)
+                lag = stat;
+            r = nextRetry;
+        }
+        if (cleanMe)
+            cleanAndCount();
+        return lag;
+    }
+
+    /**
+     * Returns current list count after removing closed subscribers.
+     * Call only while holding lock.  Used mainly by retryOffer for
+     * cleanup.
+     */
+    private int cleanAndCount() {
+        int count = 0;
+        BufferedSubscription<T> pred = null, next;
+        for (BufferedSubscription<T> b = clients; b != null; b = next) {
+            next = b.next;
+            if (b.isClosed()) {
+                b.next = null;
+                if (pred == null)
+                    clients = next;
+                else
+                    pred.next = next;
+            }
+            else {
+                pred = b;
+                ++count;
+            }
+        }
+        return count;
+    }
+
+    /**
      * Publishes the given item to each current subscriber by
      * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
      * onNext} method, blocking uninterruptibly while resources for any
@@ -373,55 +505,7 @@
      * @throws RejectedExecutionException if thrown by Executor
      */
     public int submit(T item) {
-        if (item == null) throw new NullPointerException();
-        int lag = 0;
-        boolean complete;
-        synchronized (this) {
-            complete = closed;
-            BufferedSubscription<T> b = clients;
-            if (!complete) {
-                BufferedSubscription<T> pred = null, r = null, rtail = null;
-                while (b != null) {
-                    BufferedSubscription<T> next = b.next;
-                    int stat = b.offer(item);
-                    if (stat < 0) {           // disabled
-                        b.next = null;
-                        if (pred == null)
-                            clients = next;
-                        else
-                            pred.next = next;
-                    }
-                    else {
-                        if (stat > lag)
-                            lag = stat;
-                        else if (stat == 0) { // place on retry list
-                            b.nextRetry = null;
-                            if (rtail == null)
-                                r = b;
-                            else
-                                rtail.nextRetry = b;
-                            rtail = b;
-                        }
-                        pred = b;
-                    }
-                    b = next;
-                }
-                while (r != null) {
-                    BufferedSubscription<T> nextRetry = r.nextRetry;
-                    r.nextRetry = null;
-                    int stat = r.submit(item);
-                    if (stat > lag)
-                        lag = stat;
-                    else if (stat < 0 && clients == r)
-                        clients = r.next; // postpone internal unsubscribes
-                    r = nextRetry;
-                }
-            }
-        }
-        if (complete)
-            throw new IllegalStateException("Closed");
-        else
-            return lag;
+        return doOffer(item, Long.MAX_VALUE, null);
     }
 
     /**
@@ -462,8 +546,8 @@
      * @throws RejectedExecutionException if thrown by Executor
      */
     public int offer(T item,
-                     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
-        return doOffer(0L, item, onDrop);
+                     BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
+        return doOffer(item, 0L, onDrop);
     }
 
     /**
@@ -510,71 +594,11 @@
      * @throws RejectedExecutionException if thrown by Executor
      */
     public int offer(T item, long timeout, TimeUnit unit,
-                     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
-        return doOffer(unit.toNanos(timeout), item, onDrop);
-    }
-
-    /** Common implementation for both forms of offer */
-    final int doOffer(long nanos, T item,
-                      BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
-        if (item == null) throw new NullPointerException();
-        int lag = 0, drops = 0;
-        boolean complete;
-        synchronized (this) {
-            complete = closed;
-            BufferedSubscription<T> b = clients;
-            if (!complete) {
-                BufferedSubscription<T> pred = null, r = null, rtail = null;
-                while (b != null) {
-                    BufferedSubscription<T> next = b.next;
-                    int stat = b.offer(item);
-                    if (stat < 0) {
-                        b.next = null;
-                        if (pred == null)
-                            clients = next;
-                        else
-                            pred.next = next;
-                    }
-                    else {
-                        if (stat > lag)
-                            lag = stat;
-                        else if (stat == 0) {
-                            b.nextRetry = null;
-                            if (rtail == null)
-                                r = b;
-                            else
-                                rtail.nextRetry = b;
-                            rtail = b;
-                        }
-                        else if (stat > lag)
-                            lag = stat;
-                        pred = b;
-                    }
-                    b = next;
-                }
-                while (r != null) {
-                    BufferedSubscription<T> nextRetry = r.nextRetry;
-                    r.nextRetry = null;
-                    int stat = (nanos > 0L)
-                        ? r.timedOffer(item, nanos)
-                        : r.offer(item);
-                    if (stat == 0 && onDrop != null &&
-                        onDrop.test(r.subscriber, item))
-                        stat = r.offer(item);
-                    if (stat == 0)
-                        ++drops;
-                    else if (stat > lag)
-                        lag = stat;
-                    else if (stat < 0 && clients == r)
-                        clients = r.next;
-                    r = nextRetry;
-                }
-            }
-        }
-        if (complete)
-            throw new IllegalStateException("Closed");
-        else
-            return (drops > 0) ? -drops : lag;
+                     BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
+        long nanos = unit.toNanos(timeout);
+        // distinguishes from untimed (only wrt interrupt policy)
+        if (nanos == Long.MAX_VALUE) --nanos;
+        return doOffer(item, nanos, onDrop);
     }
 
     /**
@@ -591,6 +615,7 @@
                 // no need to re-check closed here
                 b = clients;
                 clients = null;
+                owner = null;
                 closed = true;
             }
             while (b != null) {
@@ -621,8 +646,9 @@
             synchronized (this) {
                 b = clients;
                 if (!closed) {  // don't clobber racing close
+                    closedException = error;
                     clients = null;
-                    closedException = error;
+                    owner = null;
                     closed = true;
                 }
             }
@@ -662,18 +688,16 @@
      */
     public boolean hasSubscribers() {
         boolean nonEmpty = false;
-        if (!closed) {
-            synchronized (this) {
-                for (BufferedSubscription<T> b = clients; b != null;) {
-                    BufferedSubscription<T> next = b.next;
-                    if (b.isDisabled()) {
-                        b.next = null;
-                        b = clients = next;
-                    }
-                    else {
-                        nonEmpty = true;
-                        break;
-                    }
+        synchronized (this) {
+            for (BufferedSubscription<T> b = clients; b != null;) {
+                BufferedSubscription<T> next = b.next;
+                if (b.isClosed()) {
+                    b.next = null;
+                    b = clients = next;
+                }
+                else {
+                    nonEmpty = true;
+                    break;
                 }
             }
         }
@@ -686,27 +710,9 @@
      * @return the number of current subscribers
      */
     public int getNumberOfSubscribers() {
-        int count = 0;
-        if (!closed) {
-            synchronized (this) {
-                BufferedSubscription<T> pred = null, next;
-                for (BufferedSubscription<T> b = clients; b != null; b = next) {
-                    next = b.next;
-                    if (b.isDisabled()) {
-                        b.next = null;
-                        if (pred == null)
-                            clients = next;
-                        else
-                            pred.next = next;
-                    }
-                    else {
-                        pred = b;
-                        ++count;
-                    }
-                }
-            }
+        synchronized (this) {
+            return cleanAndCount();
         }
-        return count;
     }
 
     /**
@@ -734,13 +740,13 @@
      *
      * @return list of current subscribers
      */
-    public List<Flow.Subscriber<? super T>> getSubscribers() {
-        ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
+    public List<Subscriber<? super T>> getSubscribers() {
+        ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
         synchronized (this) {
             BufferedSubscription<T> pred = null, next;
             for (BufferedSubscription<T> b = clients; b != null; b = next) {
                 next = b.next;
-                if (b.isDisabled()) {
+                if (b.isClosed()) {
                     b.next = null;
                     if (pred == null)
                         clients = next;
@@ -761,14 +767,14 @@
      * @return true if currently subscribed
      * @throws NullPointerException if subscriber is null
      */
-    public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
+    public boolean isSubscribed(Subscriber<? super T> subscriber) {
         if (subscriber == null) throw new NullPointerException();
         if (!closed) {
             synchronized (this) {
                 BufferedSubscription<T> pred = null, next;
                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
                     next = b.next;
-                    if (b.isDisabled()) {
+                    if (b.isClosed()) {
                         b.next = null;
                         if (pred == null)
                             clients = next;
@@ -872,16 +878,15 @@
     }
 
     /** Subscriber for method consume */
-    private static final class ConsumerSubscriber<T>
-        implements Flow.Subscriber<T> {
+    static final class ConsumerSubscriber<T> implements Subscriber<T> {
         final CompletableFuture<Void> status;
         final Consumer<? super T> consumer;
-        Flow.Subscription subscription;
+        Subscription subscription;
         ConsumerSubscriber(CompletableFuture<Void> status,
                            Consumer<? super T> consumer) {
             this.status = status; this.consumer = consumer;
         }
-        public final void onSubscribe(Flow.Subscription subscription) {
+        public final void onSubscribe(Subscription subscription) {
             this.subscription = subscription;
             status.whenComplete((v, e) -> subscription.cancel());
             if (!status.isDone())
@@ -925,634 +930,534 @@
     }
 
     /**
-     * A bounded (ring) buffer with integrated control to start a
-     * consumer task whenever items are available.  The buffer
-     * algorithm is similar to one used inside ForkJoinPool (see its
-     * internal documentation for details) specialized for the case of
-     * at most one concurrent producer and consumer, and power of two
-     * buffer sizes. This allows methods to operate without locks even
-     * while supporting resizing, blocking, task-triggering, and
-     * garbage-free buffers (nulling out elements when consumed),
-     * although supporting these does impose a bit of overhead
-     * compared to plain fixed-size ring buffers.
-     *
-     * The publisher guarantees a single producer via its lock.  We
-     * ensure in this class that there is at most one consumer.  The
-     * request and cancel methods must be fully thread-safe but are
-     * coded to exploit the most common case in which they are only
-     * called by consumers (usually within onNext).
+     * A resizable array-based ring buffer with integrated control to
+     * start a consumer task whenever items are available.  The buffer
+     * algorithm is specialized for the case of at most one concurrent
+     * producer and consumer, and power of two buffer sizes. It relies
+     * primarily on atomic operations (CAS or getAndSet) at the next
+     * array slot to put or take an element, at the "tail" and "head"
+     * indices written only by the producer and consumer respectively.
      *
-     * Execution control is managed using the ACTIVE ctl bit. We
-     * ensure that a task is active when consumable items (and
-     * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
-     * there is demand (unfilled requests).  This is complicated on
-     * the creation side by the possibility of exceptions when trying
-     * to execute tasks. These eventually force DISABLED state, but
-     * sometimes not directly. On the task side, termination (clearing
-     * ACTIVE) that would otherwise race with producers or request()
-     * calls uses the CONSUME keep-alive bit to force a recheck.
+     * We ensure internally that there is at most one active consumer
+     * task at any given time. The publisher guarantees a single
+     * producer via its lock. Sync among producers and consumers
+     * relies on volatile fields "ctl", "demand", and "waiting" (along
+     * with element access). Other variables are accessed in plain
+     * mode, relying on outer ordering and exclusion, and/or enclosing
+     * them within other volatile accesses. Some atomic operations are
+     * avoided by tracking single threaded ownership by producers (in
+     * the style of biased locking).
      *
-     * The ctl field also manages run state. When DISABLED, no further
-     * updates are possible. Disabling may be preceded by setting
-     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
-     * case the associated Subscriber methods are invoked, possibly
-     * synchronously if there is no active consumer task (including
-     * cases where execute() failed). The cancel() method is supported
-     * by treating as ERROR but suppressing onError signal.
+     * Execution control and protocol state are managed using field
+     * "ctl".  Methods to subscribe, close, request, and cancel set
+     * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
+     * and ensure that a task is running. (The corresponding consumer
+     * side actions are in method consume.)  To avoid starting a new
+     * task on each action, ctl also includes a keep-alive bit
+     * (ACTIVE) that is refreshed if needed on producer actions.
+     * (Maintaining agreement about keep-alives requires most atomic
+     * updates to be full SC/Volatile strength, which is still much
+     * cheaper than using one task per item.)  Error signals
+     * additionally null out items and/or fields to reduce termination
+     * latency.  The cancel() method is supported by treating as ERROR
+     * but suppressing onError signal.
      *
      * Support for blocking also exploits the fact that there is only
      * one possible waiter. ManagedBlocker-compatible control fields
      * are placed in this class itself rather than in wait-nodes.
-     * Blocking control relies on the "waiter" field. Producers set
-     * the field before trying to block, but must then recheck (via
-     * offer) before parking. Signalling then just unparks and clears
-     * waiter field. If the producer and/or consumer are using a
-     * ForkJoinPool, the producer attempts to help run consumer tasks
-     * via ForkJoinPool.helpAsyncBlocker before blocking.
+     * Blocking control relies on the "waiting" and "waiter"
+     * fields. Producers set them before trying to block. Signalling
+     * unparks and clears fields. If the producer and/or consumer are
+     * using a ForkJoinPool, the producer attempts to help run
+     * consumer tasks via ForkJoinPool.helpAsyncBlocker before
+     * blocking.
      *
-     * This class uses @Contended and heuristic field declaration
-     * ordering to reduce false-sharing-based memory contention among
-     * instances of BufferedSubscription, but it does not currently
-     * attempt to avoid memory contention among buffers. This field
-     * and element packing can hurt performance especially when each
-     * publisher has only one client operating at a high rate.
-     * Addressing this may require allocating substantially more space
-     * than users expect.
+     * Usages of this class may encounter any of several forms of
+     * memory contention. We try to ameliorate across them without
+     * unduly impacting footprints in low-contention usages where it
+     * isn't needed. Buffer arrays start out small and grow only as
+     * needed.  The class uses @Contended and heuristic field
+     * declaration ordering to reduce false-sharing memory contention
+     * across instances of BufferedSubscription (as in, multiple
+     * subscribers per publisher).  We additionally segregate some
+     * fields that would otherwise nearly always encounter cache line
+     * contention among producers and consumers. To reduce contention
+     * across time (vs space), consumers only periodically update
+     * other fields (see method takeItems), at the expense of possibly
+     * staler reporting of lags and demand (bounded at 12.5% == 1/8
+     * capacity) and possibly more atomic operations.
+     *
+     * Other forms of imbalance and slowdowns can occur during startup
+     * when producer and consumer methods are compiled and/or memory
+     * is allocated at different rates.  This is ameliorated by
+     * artificially subdividing some consumer methods, including
+     * isolation of all subscriber callbacks.  This code also includes
+     * typical power-of-two array screening idioms to avoid compilers
+     * generating traps, along with the usual SSA-based inline
+     * assignment coding style. Also, all methods and fields have
+     * default visibility to simplify usage by callers.
      */
     @SuppressWarnings("serial")
     @jdk.internal.vm.annotation.Contended
-    private static final class BufferedSubscription<T>
-        implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
-        // Order-sensitive field declarations
-        long timeout;                      // > 0 if timed wait
-        volatile long demand;              // # unfilled requests
-        int maxCapacity;                   // reduced on OOME
-        int putStat;                       // offer result for ManagedBlocker
+    static final class BufferedSubscription<T>
+        implements Subscription, ForkJoinPool.ManagedBlocker {
+        long timeout;                      // Long.MAX_VALUE if untimed wait
+        int head;                          // next position to take
+        int tail;                          // next position to put
+        final int maxCapacity;             // max buffer size
         volatile int ctl;                  // atomic run state flags
-        volatile int head;                 // next position to take
-        int tail;                          // next position to put
-        Object[] array;                    // buffer: null if disabled
-        Flow.Subscriber<? super T> subscriber; // null if disabled
-        Executor executor;                 // null if disabled
-        BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
-        volatile Throwable pendingError;   // holds until onError issued
-        volatile Thread waiter;            // blocked producer thread
-        T putItem;                         // for offer within ManagedBlocker
+        Object[] array;                    // buffer
+        final Subscriber<? super T> subscriber;
+        final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
+        Executor executor;                 // null on error
+        Thread waiter;                     // blocked producer thread
+        Throwable pendingError;            // holds until onError issued
         BufferedSubscription<T> next;      // used only by publisher
         BufferedSubscription<T> nextRetry; // used only by publisher
 
-        // ctl values
-        static final int ACTIVE    = 0x01; // consumer task active
-        static final int CONSUME   = 0x02; // keep-alive for consumer task
-        static final int DISABLED  = 0x04; // final state
-        static final int ERROR     = 0x08; // signal onError then disable
-        static final int SUBSCRIBE = 0x10; // signal onSubscribe
-        static final int COMPLETE  = 0x20; // signal onComplete when done
+        @jdk.internal.vm.annotation.Contended("c") // segregate
+        volatile long demand;              // # unfilled requests
+        @jdk.internal.vm.annotation.Contended("c")
+        volatile int waiting;              // nonzero if producer blocked
+
+        // ctl bit values
+        static final int CLOSED   = 0x01;  // if set, other bits ignored
+        static final int ACTIVE   = 0x02;  // keep-alive for consumer task
+        static final int REQS     = 0x04;  // (possibly) nonzero demand
+        static final int ERROR    = 0x08;  // issues onError when noticed
+        static final int COMPLETE = 0x10;  // issues onComplete when done
+        static final int RUN      = 0x20;  // task is or will be running
+        static final int OPEN     = 0x40;  // true after subscribe
 
         static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
 
-        /**
-         * Initial buffer capacity used when maxBufferCapacity is
-         * greater. Must be a power of two.
-         */
-        static final int DEFAULT_INITIAL_CAP = 32;
-
-        BufferedSubscription(Flow.Subscriber<? super T> subscriber,
+        BufferedSubscription(Subscriber<? super T> subscriber,
                              Executor executor,
-                             BiConsumer<? super Flow.Subscriber<? super T>,
+                             BiConsumer<? super Subscriber<? super T>,
                              ? super Throwable> onNextHandler,
+                             Object[] array,
                              int maxBufferCapacity) {
             this.subscriber = subscriber;
             this.executor = executor;
             this.onNextHandler = onNextHandler;
+            this.array = array;
             this.maxCapacity = maxBufferCapacity;
-            this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
-                                    (maxBufferCapacity < 2 ? // at least 2 slots
-                                     2 : maxBufferCapacity) :
-                                    DEFAULT_INITIAL_CAP];
+        }
+
+        // Wrappers for some VarHandle methods
+
+        final boolean weakCasCtl(int cmp, int val) {
+            return CTL.weakCompareAndSet(this, cmp, val);
         }
 
-        final boolean isDisabled() {
-            return ctl == DISABLED;
+        final int getAndBitwiseOrCtl(int bits) {
+            return (int)CTL.getAndBitwiseOr(this, bits);
         }
 
+        final long subtractDemand(int k) {
+            long n = (long)(-k);
+            return n + (long)DEMAND.getAndAdd(this, n);
+        }
+
+        final boolean casDemand(long cmp, long val) {
+            return DEMAND.compareAndSet(this, cmp, val);
+        }
+
+        // Utilities used by SubmissionPublisher
+
         /**
-         * Returns estimated number of buffered items, or -1 if
-         * disabled.
-         */
-        final int estimateLag() {
-            int n;
-            return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
-        }
-
-        /**
-         * Tries to add item and start consumer task if necessary.
-         * @return -1 if disabled, 0 if dropped, else estimated lag
+         * Returns true if closed (consumer task may still be running).
          */
-        final int offer(T item) {
-            int h = head, t = tail, cap, size, stat;
-            Object[] a = array;
-            if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
-                a[(cap - 1) & t] = item;    // relaxed writes OK
-                tail = t + 1;
-                stat = size;
-            }
-            else
-                stat = growAndAdd(a, item);
-            return (stat > 0 &&
-                    (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
-                startOnOffer(stat) : stat;
+        final boolean isClosed() {
+            return (ctl & CLOSED) != 0;
         }
 
         /**
-         * Tries to create or expand buffer, then adds item if possible.
+         * Returns estimated number of buffered items, or negative if
+         * closed.
+         */
+        final int estimateLag() {
+            int c = ctl, n = tail - head;
+            return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
+        }
+
+        // Methods for submitting items
+
+        /**
+         * Tries to add item and start consumer task if necessary.
+         * @return negative if closed, 0 if saturated, else estimated lag
          */
-        private int growAndAdd(Object[] a, T item) {
-            boolean alloc;
-            int cap, stat;
-            if ((ctl & (ERROR | DISABLED)) != 0) {
-                cap = 0;
-                stat = -1;
-                alloc = false;
-            }
-            else if (a == null || (cap = a.length) <= 0) {
-                cap = 0;
-                stat = 1;
-                alloc = true;
-            }
-            else {
-                VarHandle.fullFence();           // recheck
-                int h = head, t = tail, size = t + 1 - h;
-                if (cap >= size) {
-                    a[(cap - 1) & t] = item;
+        final int offer(T item, boolean unowned) {
+            Object[] a;
+            int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
+            int t = tail, i = t & (cap - 1), n = t + 1 - head;
+            if (cap > 0) {
+                boolean added;
+                if (n >= cap && cap < maxCapacity) // resize
+                    added = growAndoffer(item, a, t);
+                else if (n >= cap || unowned)      // need volatile CAS
+                    added = QA.compareAndSet(a, i, null, item);
+                else {                             // can use release mode
+                    QA.setRelease(a, i, item);
+                    added = true;
+                }
+                if (added) {
                     tail = t + 1;
-                    stat = size;
-                    alloc = false;
-                }
-                else if (cap >= maxCapacity) {
-                    stat = 0;                    // cannot grow
-                    alloc = false;
-                }
-                else {
-                    stat = cap + 1;
-                    alloc = true;
+                    stat = n;
                 }
             }
-            if (alloc) {
-                int newCap = (cap > 0) ? cap << 1 : 1;
-                if (newCap <= cap)
-                    stat = 0;
-                else {
-                    Object[] newArray = null;
-                    try {
-                        newArray = new Object[newCap];
-                    } catch (Throwable ex) {     // try to cope with OOME
-                    }
-                    if (newArray == null) {
-                        if (cap > 0)
-                            maxCapacity = cap;   // avoid continuous failure
-                        stat = 0;
-                    }
-                    else {
-                        array = newArray;
-                        int t = tail;
-                        int newMask = newCap - 1;
-                        if (a != null && cap > 0) {
-                            int mask = cap - 1;
-                            for (int j = head; j != t; ++j) {
-                                int k = j & mask;
-                                Object x = QA.getAcquire(a, k);
-                                if (x != null && // races with consumer
-                                    QA.compareAndSet(a, k, x, null))
-                                    newArray[j & newMask] = x;
-                            }
-                        }
-                        newArray[t & newMask] = item;
-                        tail = t + 1;
-                    }
+            return startOnOffer(stat);
+        }
+
+        /**
+         * Tries to expand buffer and add item, returning true on
+         * success. Currently fails only if out of memory.
+         */
+        final boolean growAndoffer(T item, Object[] a, int t) {
+            int cap = 0, newCap = 0;
+            Object[] newArray = null;
+            if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
+                try {
+                    newArray = new Object[newCap];
+                } catch (OutOfMemoryError ex) {
                 }
             }
+            if (newArray == null)
+                return false;
+            else {                                // take and move items
+                int newMask = newCap - 1;
+                newArray[t-- & newMask] = item;
+                for (int mask = cap - 1, k = mask; k >= 0; --k) {
+                    Object x = QA.getAndSet(a, t & mask, null);
+                    if (x == null)
+                        break;                    // already consumed
+                    else
+                        newArray[t-- & newMask] = x;
+                }
+                array = newArray;
+                VarHandle.releaseFence();         // release array and slots
+                return true;
+            }
+        }
+
+        /**
+         * Version of offer for retries (no resize or bias)
+         */
+        final int retryOffer(T item) {
+            Object[] a;
+            int stat = 0, t = tail, h = head, cap;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                QA.compareAndSet(a, (cap - 1) & t, null, item))
+                stat = (tail = t + 1) - h;
+            return startOnOffer(stat);
+        }
+
+        /**
+         * Tries to start consumer task after offer.
+         * @return negative if now closed, else argument
+         */
+        final int startOnOffer(int stat) {
+            int c; // start or keep alive if requests exist and not active
+            if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
+                ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
+                tryStart();
+            else if ((c & CLOSED) != 0)
+                stat = -1;
             return stat;
         }
 
         /**
-         * Spins/helps/blocks while offer returns 0.  Called only if
-         * initial offer return 0.
-         */
-        final int submit(T item) {
-            int stat;
-            if ((stat = offer(item)) == 0) {
-                putItem = item;
-                timeout = 0L;
-                putStat = 0;
-                ForkJoinPool.helpAsyncBlocker(executor, this);
-                if ((stat = putStat) == 0) {
-                    try {
-                        ForkJoinPool.managedBlock(this);
-                    } catch (InterruptedException ie) {
-                        timeout = INTERRUPTED;
-                    }
-                    stat = putStat;
-                }
-                if (timeout < 0L)
-                    Thread.currentThread().interrupt();
-            }
-            return stat;
-        }
-
-        /**
-         * Timeout version; similar to submit.
-         */
-        final int timedOffer(T item, long nanos) {
-            int stat;
-            if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
-                putItem = item;
-                putStat = 0;
-                ForkJoinPool.helpAsyncBlocker(executor, this);
-                if ((stat = putStat) == 0) {
-                    try {
-                        ForkJoinPool.managedBlock(this);
-                    } catch (InterruptedException ie) {
-                        timeout = INTERRUPTED;
-                    }
-                    stat = putStat;
-                }
-                if (timeout < 0L)
-                    Thread.currentThread().interrupt();
-            }
-            return stat;
-        }
-
-        /**
-         * Tries to start consumer task after offer.
-         * @return -1 if now disabled, else argument
+         * Tries to start consumer task. Sets error state on failure.
          */
-        private int startOnOffer(int stat) {
-            for (;;) {
-                Executor e; int c;
-                if ((c = ctl) == DISABLED || (e = executor) == null) {
-                    stat = -1;
-                    break;
-                }
-                else if ((c & ACTIVE) != 0) { // ensure keep-alive
-                    if ((c & CONSUME) != 0 ||
-                        CTL.compareAndSet(this, c, c | CONSUME))
-                        break;
-                }
-                else if (demand == 0L || tail == head)
-                    break;
-                else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
-                    try {
-                        e.execute(new ConsumerTask<T>(this));
-                        break;
-                    } catch (RuntimeException | Error ex) { // back out
-                        do {} while (((c = ctl) & DISABLED) == 0 &&
-                                     (c & ACTIVE) != 0 &&
-                                     !CTL.weakCompareAndSet
-                                     (this, c, c & ~ACTIVE));
-                        throw ex;
-                    }
-                }
-            }
-            return stat;
-        }
-
-        private void signalWaiter(Thread w) {
-            waiter = null;
-            LockSupport.unpark(w);    // release producer
-        }
-
-        /**
-         * Nulls out most fields, mainly to avoid garbage retention
-         * until publisher unsubscribes, but also to help cleanly stop
-         * upon error by nulling required components.
-         */
-        private void detach() {
-            Thread w = waiter;
-            executor = null;
-            subscriber = null;
-            pendingError = null;
-            signalWaiter(w);
-        }
-
-        /**
-         * Issues error signal, asynchronously if a task is running,
-         * else synchronously.
-         */
-        final void onError(Throwable ex) {
-            for (int c;;) {
-                if (((c = ctl) & (ERROR | DISABLED)) != 0)
-                    break;
-                else if ((c & ACTIVE) != 0) {
-                    pendingError = ex;
-                    if (CTL.compareAndSet(this, c, c | ERROR))
-                        break; // cause consumer task to exit
-                }
-                else if (CTL.compareAndSet(this, c, DISABLED)) {
-                    Flow.Subscriber<? super T> s = subscriber;
-                    if (s != null && ex != null) {
-                        try {
-                            s.onError(ex);
-                        } catch (Throwable ignore) {
-                        }
-                    }
-                    detach();
-                    break;
-                }
+        final void tryStart() {
+            try {
+                Executor e;
+                ConsumerTask<T> task = new ConsumerTask<T>(this);
+                if ((e = executor) != null)   // skip if disabled on error
+                    e.execute(task);
+            } catch (RuntimeException | Error ex) {
+                getAndBitwiseOrCtl(ERROR | CLOSED);
+                throw ex;
             }
         }
 
+        // Signals to consumer tasks
+
         /**
-         * Tries to start consumer task upon a signal or request;
-         * disables on failure.
+         * Sets the given control bits, starting task if not running or closed.
+         * @param bits state bits, assumed to include RUN but not CLOSED
          */
-        private void startOrDisable() {
-            Executor e;
-            if ((e = executor) != null) { // skip if already disabled
-                try {
-                    e.execute(new ConsumerTask<T>(this));
-                } catch (Throwable ex) {  // back out and force signal
-                    for (int c;;) {
-                        if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
-                            break;
-                        if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
-                            onError(ex);
-                            break;
-                        }
-                    }
-                }
-            }
+        final void startOnSignal(int bits) {
+            if ((ctl & bits) != bits &&
+                (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
+                tryStart();
+        }
+
+        final void onSubscribe() {
+            startOnSignal(RUN | ACTIVE);
         }
 
         final void onComplete() {
-            for (int c;;) {
-                if ((c = ctl) == DISABLED)
-                    break;
-                if (CTL.compareAndSet(this, c,
-                                      c | (ACTIVE | CONSUME | COMPLETE))) {
-                    if ((c & ACTIVE) == 0)
-                        startOrDisable();
-                    break;
-                }
-            }
+            startOnSignal(RUN | ACTIVE | COMPLETE);
         }
 
-        final void onSubscribe() {
-            for (int c;;) {
-                if ((c = ctl) == DISABLED)
-                    break;
-                if (CTL.compareAndSet(this, c,
-                                      c | (ACTIVE | CONSUME | SUBSCRIBE))) {
-                    if ((c & ACTIVE) == 0)
-                        startOrDisable();
-                    break;
-                }
+        final void onError(Throwable ex) {
+            int c; Object[] a;      // to null out buffer on async error
+            if (ex != null)
+                pendingError = ex;  // races are OK
+            if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
+                if ((c & RUN) == 0)
+                    tryStart();
+                else if ((a = array) != null)
+                    Arrays.fill(a, null);
             }
         }
 
-        /**
-         * Causes consumer task to exit if active (without reporting
-         * onError unless there is already a pending error), and
-         * disables.
-         */
-        public void cancel() {
-            for (int c;;) {
-                if ((c = ctl) == DISABLED)
-                    break;
-                else if ((c & ACTIVE) != 0) {
-                    if (CTL.compareAndSet(this, c,
-                                          c | (CONSUME | ERROR)))
-                        break;
-                }
-                else if (CTL.compareAndSet(this, c, DISABLED)) {
-                    detach();
-                    break;
-                }
-            }
+        public final void cancel() {
+            onError(null);
         }
 
-        /**
-         * Adds to demand and possibly starts task.
-         */
-        public void request(long n) {
+        public final void request(long n) {
             if (n > 0L) {
                 for (;;) {
-                    long prev = demand, d;
-                    if ((d = prev + n) < prev) // saturate
-                        d = Long.MAX_VALUE;
-                    if (DEMAND.compareAndSet(this, prev, d)) {
-                        for (int c, h;;) {
-                            if ((c = ctl) == DISABLED)
-                                break;
-                            else if ((c & ACTIVE) != 0) {
-                                if ((c & CONSUME) != 0 ||
-                                    CTL.compareAndSet(this, c, c | CONSUME))
-                                    break;
-                            }
-                            else if ((h = head) != tail) {
-                                if (CTL.compareAndSet(this, c,
-                                                      c | (ACTIVE|CONSUME))) {
-                                    startOrDisable();
-                                    break;
-                                }
-                            }
-                            else if (head == h && tail == h)
-                                break;          // else stale
-                            if (demand == 0L)
-                                break;
-                        }
+                    long p = demand, d = p + n;  // saturate
+                    if (casDemand(p, d < p ? Long.MAX_VALUE : d))
                         break;
-                    }
                 }
+                startOnSignal(RUN | ACTIVE | REQS);
             }
             else
                 onError(new IllegalArgumentException(
                             "non-positive subscription request"));
         }
 
-        public final boolean isReleasable() { // for ManagedBlocker
-            T item = putItem;
-            if (item != null) {
-                if ((putStat = offer(item)) == 0)
-                    return false;
-                putItem = null;
-            }
-            return true;
-        }
-
-        public final boolean block() { // for ManagedBlocker
-            T item = putItem;
-            if (item != null) {
-                putItem = null;
-                long nanos = timeout;
-                long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
-                while ((putStat = offer(item)) == 0) {
-                    if (Thread.interrupted()) {
-                        timeout = INTERRUPTED;
-                        if (nanos > 0L)
-                            break;
-                    }
-                    else if (nanos > 0L &&
-                             (nanos = deadline - System.nanoTime()) <= 0L)
-                        break;
-                    else if (waiter == null)
-                        waiter = Thread.currentThread();
-                    else {
-                        if (nanos > 0L)
-                            LockSupport.parkNanos(this, nanos);
-                        else
-                            LockSupport.park(this);
-                        waiter = null;
-                    }
-                }
-            }
-            waiter = null;
-            return true;
-        }
+        // Consumer task actions
 
         /**
-         * Consumer loop, called from ConsumerTask, or indirectly
-         * when helping during submit.
+         * Consumer loop, called from ConsumerTask, or indirectly when
+         * helping during submit.
          */
         final void consume() {
-            Flow.Subscriber<? super T> s;
-            int h = head;
-            if ((s = subscriber) != null) {           // else disabled
-                for (;;) {
-                    long d = demand;
-                    int c; Object[] a; int n, i; Object x; Thread w;
-                    if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
-                        if (!checkControl(s, c))
-                            break;
-                    }
-                    else if ((a = array) == null || h == tail ||
-                             (n = a.length) == 0 ||
-                             (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
-                        if (!checkEmpty(s, c))
-                            break;
+            Subscriber<? super T> s;
+            if ((s = subscriber) != null) {          // hoist checks
+                subscribeOnOpen(s);
+                long d = demand;
+                for (int h = head, t = tail;;) {
+                    int c, taken; boolean empty;
+                    if (((c = ctl) & ERROR) != 0) {
+                        closeOnError(s, null);
+                        break;
                     }
-                    else if (d == 0L) {
-                        if (!checkDemand(c))
-                            break;
+                    else if ((taken = takeItems(s, d, h)) > 0) {
+                        head = h += taken;
+                        d = subtractDemand(taken);
+                    }
+                    else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
+                        closeOnComplete(s);          // end of stream
+                        break;
                     }
-                    else if (((c & CONSUME) != 0 ||
-                              CTL.compareAndSet(this, c, c | CONSUME)) &&
-                             QA.compareAndSet(a, i, x, null)) {
-                        HEAD.setRelease(this, ++h);
-                        DEMAND.getAndAdd(this, -1L);
-                        if ((w = waiter) != null)
-                            signalWaiter(w);
-                        try {
-                            @SuppressWarnings("unchecked") T y = (T) x;
-                            s.onNext(y);
-                        } catch (Throwable ex) {
-                            handleOnNext(s, ex);
-                        }
+                    else if ((d = demand) == 0L && (c & REQS) != 0)
+                        weakCasCtl(c, c & ~REQS);    // exhausted demand
+                    else if (d != 0L && (c & REQS) == 0)
+                        weakCasCtl(c, c | REQS);     // new demand
+                    else if (t == (t = tail) && (empty || d == 0L)) {
+                        int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
+                        if (weakCasCtl(c, c & ~bit) && bit == RUN)
+                            break;                   // un-keep-alive or exit
                     }
                 }
             }
         }
 
         /**
-         * Responds to control events in consume().
+         * Consumes some items until unavailable or bound or error.
+         *
+         * @param s subscriber
+         * @param d current demand
+         * @param h current head
+         * @return number taken
          */
-        private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
-            boolean stat = true;
-            if ((c & SUBSCRIBE) != 0) {
-                if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
-                    try {
-                        if (s != null)
-                            s.onSubscribe(this);
-                    } catch (Throwable ex) {
-                        onError(ex);
-                    }
+        final int takeItems(Subscriber<? super T> s, long d, int h) {
+            Object[] a;
+            int k = 0, cap;
+            if ((a = array) != null && (cap = a.length) > 0) {
+                int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
+                int n = (d < (long)b) ? (int)d : b;
+                for (; k < n; ++h, ++k) {
+                    Object x = QA.getAndSet(a, h & m, null);
+                    if (waiting != 0)
+                        signalWaiter();
+                    if (x == null)
+                        break;
+                    else if (!consumeNext(s, x))
+                        break;
                 }
             }
-            else if ((c & ERROR) != 0) {
-                Throwable ex = pendingError;
-                ctl = DISABLED;           // no need for CAS
-                if (ex != null) {         // null if errorless cancel
-                    try {
-                        if (s != null)
-                            s.onError(ex);
-                    } catch (Throwable ignore) {
-                    }
-                }
-            }
-            else {
-                detach();
-                stat = false;
-            }
-            return stat;
+            return k;
         }
 
-        /**
-         * Responds to apparent emptiness in consume().
-         */
-        private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
-            boolean stat = true;
-            if (head == tail) {
-                if ((c & CONSUME) != 0)
-                    CTL.compareAndSet(this, c, c & ~CONSUME);
-                else if ((c & COMPLETE) != 0) {
-                    if (CTL.compareAndSet(this, c, DISABLED)) {
-                        try {
-                            if (s != null)
-                                s.onComplete();
-                        } catch (Throwable ignore) {
-                        }
-                    }
-                }
-                else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
-                    stat = false;
+        final boolean consumeNext(Subscriber<? super T> s, Object x) {
+            try {
+                @SuppressWarnings("unchecked") T y = (T) x;
+                if (s != null)
+                    s.onNext(y);
+                return true;
+            } catch (Throwable ex) {
+                handleOnNext(s, ex);
+                return false;
             }
-            return stat;
-        }
-
-        /**
-         * Responds to apparent zero demand in consume().
-         */
-        private boolean checkDemand(int c) {
-            boolean stat = true;
-            if (demand == 0L) {
-                if ((c & CONSUME) != 0)
-                    CTL.compareAndSet(this, c, c & ~CONSUME);
-                else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
-                    stat = false;
-            }
-            return stat;
         }
 
         /**
          * Processes exception in Subscriber.onNext.
          */
-        private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
-            BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
-            if ((h = onNextHandler) != null) {
-                try {
+        final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
+            BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
+            try {
+                if ((h = onNextHandler) != null)
                     h.accept(s, ex);
-                } catch (Throwable ignore) {
+            } catch (Throwable ignore) {
+            }
+            closeOnError(s, ex);
+        }
+
+        /**
+         * Issues subscriber.onSubscribe if this is first signal.
+         */
+        final void subscribeOnOpen(Subscriber<? super T> s) {
+            if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
+                consumeSubscribe(s);
+        }
+
+        final void consumeSubscribe(Subscriber<? super T> s) {
+            try {
+                if (s != null) // ignore if disabled
+                    s.onSubscribe(this);
+            } catch (Throwable ex) {
+                closeOnError(s, ex);
+            }
+        }
+
+        /**
+         * Issues subscriber.onComplete unless already closed.
+         */
+        final void closeOnComplete(Subscriber<? super T> s) {
+            if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
+                consumeComplete(s);
+        }
+
+        final void consumeComplete(Subscriber<? super T> s) {
+            try {
+                if (s != null)
+                    s.onComplete();
+            } catch (Throwable ignore) {
+            }
+        }
+
+        /**
+         * Issues subscriber.onError, and unblocks producer if needed.
+         */
+        final void closeOnError(Subscriber<? super T> s, Throwable ex) {
+            if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
+                if (ex == null)
+                    ex = pendingError;
+                pendingError = null;  // detach
+                executor = null;      // suppress racing start calls
+                signalWaiter();
+                consumeError(s, ex);
+            }
+        }
+
+        final void consumeError(Subscriber<? super T> s, Throwable ex) {
+            try {
+                if (ex != null && s != null)
+                    s.onError(ex);
+            } catch (Throwable ignore) {
+            }
+        }
+
+        // Blocking support
+
+        /**
+         * Unblocks waiting producer.
+         */
+        final void signalWaiter() {
+            Thread w;
+            waiting = 0;
+            if ((w = waiter) != null)
+                LockSupport.unpark(w);
+        }
+
+        /**
+         * Returns true if closed or space available.
+         * For ManagedBlocker.
+         */
+        public final boolean isReleasable() {
+            Object[] a; int cap;
+            return ((ctl & CLOSED) != 0 ||
+                    ((a = array) != null && (cap = a.length) > 0 &&
+                     QA.getAcquire(a, (cap - 1) & tail) == null));
+        }
+
+        /**
+         * Helps or blocks until timeout, closed, or space available.
+         */
+        final void awaitSpace(long nanos) {
+            if (!isReleasable()) {
+                ForkJoinPool.helpAsyncBlocker(executor, this);
+                if (!isReleasable()) {
+                    timeout = nanos;
+                    try {
+                        ForkJoinPool.managedBlock(this);
+                    } catch (InterruptedException ie) {
+                        timeout = INTERRUPTED;
+                    }
+                    if (timeout == INTERRUPTED)
+                        Thread.currentThread().interrupt();
                 }
             }
-            onError(ex);
+        }
+
+        /**
+         * Blocks until closed, space available or timeout.
+         * For ManagedBlocker.
+         */
+        public final boolean block() {
+            long nanos = timeout;
+            boolean timed = (nanos < Long.MAX_VALUE);
+            long deadline = timed ? System.nanoTime() + nanos : 0L;
+            while (!isReleasable()) {
+                if (Thread.interrupted()) {
+                    timeout = INTERRUPTED;
+                    if (timed)
+                        break;
+                }
+                else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
+                    break;
+                else if (waiter == null)
+                    waiter = Thread.currentThread();
+                else if (waiting == 0)
+                    waiting = 1;
+                else if (timed)
+                    LockSupport.parkNanos(this, nanos);
+                else
+                    LockSupport.park(this);
+            }
+            waiter = null;
+            waiting = 0;
+            return true;
         }
 
         // VarHandle mechanics
-        private static final VarHandle CTL;
-        private static final VarHandle TAIL;
-        private static final VarHandle HEAD;
-        private static final VarHandle DEMAND;
-        private static final VarHandle QA;
+        static final VarHandle CTL;
+        static final VarHandle DEMAND;
+        static final VarHandle QA;
 
         static {
             try {
                 MethodHandles.Lookup l = MethodHandles.lookup();
                 CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
                                       int.class);
-                TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
-                                       int.class);
-                HEAD = l.findVarHandle(BufferedSubscription.class, "head",
-                                       int.class);
                 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
                                          long.class);
                 QA = MethodHandles.arrayElementVarHandle(Object[].class);
--- a/test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java	Sat Dec 02 10:03:41 2017 -0800
+++ b/test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java	Sat Dec 02 10:08:55 2017 -0800
@@ -33,6 +33,7 @@
  */
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
@@ -429,7 +430,8 @@
      * Cancelling a subscription eventually causes no more onNexts to be issued
      */
     public void testCancel() {
-        SubmissionPublisher<Integer> p = basicPublisher();
+        SubmissionPublisher<Integer> p =
+            new SubmissionPublisher<Integer>(basicExecutor, 4); // must be < 20
         TestSubscriber s1 = new TestSubscriber();
         TestSubscriber s2 = new TestSubscriber();
         p.subscribe(s1);
@@ -666,7 +668,6 @@
         p.subscribe(s1);
         p.subscribe(s2);
         for (int i = 1; i <= 20; ++i) {
-            assertTrue(p.estimateMinimumDemand() <= 1);
             assertTrue(p.submit(i) >= 0);
         }
         p.close();
@@ -1005,4 +1006,31 @@
         assertTrue(count.get() < n);
     }
 
+    /**
+     * Tests scenario for
+     * JDK-8187947: A race condition in SubmissionPublisher
+     * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
+     */
+    public void testMissedSignal_8187947() throws Exception {
+        final int N = expensiveTests ? (1 << 20) : (1 << 10);
+        final CountDownLatch finished = new CountDownLatch(1);
+        final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
+        class Sub implements Subscriber<Boolean> {
+            int received;
+            public void onSubscribe(Subscription s) {
+                s.request(N);
+            }
+            public void onNext(Boolean item) {
+                if (++received == N)
+                    finished.countDown();
+                else
+                    CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
+            }
+            public void onError(Throwable t) { throw new AssertionError(t); }
+            public void onComplete() {}
+        }
+        pub.subscribe(new Sub());
+        CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
+        await(finished);
+    }
 }