8200520: forkjoin tasks interrupted after shutdown
authordl
Tue, 10 Apr 2018 11:29:37 -0700
changeset 49563 79d2c9da2c26
parent 49562 82d3fa5303e0
child 49564 260bf39376a4
8200520: forkjoin tasks interrupted after shutdown Reviewed-by: martin, psandoz, chegar, dholmes
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java	Tue Apr 10 11:25:46 2018 -0700
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java	Tue Apr 10 11:29:37 2018 -0700
@@ -184,17 +184,22 @@
      * functionality and control for a set of worker threads:
      * Submissions from non-FJ threads enter into submission queues.
      * Workers take these tasks and typically split them into subtasks
-     * that may be stolen by other workers.  Preference rules give
-     * first priority to processing tasks from their own queues (LIFO
-     * or FIFO, depending on mode), then to randomized FIFO steals of
-     * tasks in other queues.  This framework began as vehicle for
-     * supporting tree-structured parallelism using work-stealing.
-     * Over time, its scalability advantages led to extensions and
-     * changes to better support more diverse usage contexts.  Because
-     * most internal methods and nested classes are interrelated,
-     * their main rationale and descriptions are presented here;
-     * individual methods and nested classes contain only brief
-     * comments about details.
+     * that may be stolen by other workers. Work-stealing based on
+     * randomized scans generally leads to better throughput than
+     * "work dealing" in which producers assign tasks to idle threads,
+     * in part because threads that have finished other tasks before
+     * the signalled thread wakes up (which can be a long time) can
+     * take the task instead.  Preference rules give first priority to
+     * processing tasks from their own queues (LIFO or FIFO, depending
+     * on mode), then to randomized FIFO steals of tasks in other
+     * queues.  This framework began as vehicle for supporting
+     * tree-structured parallelism using work-stealing.  Over time,
+     * its scalability advantages led to extensions and changes to
+     * better support more diverse usage contexts.  Because most
+     * internal methods and nested classes are interrelated, their
+     * main rationale and descriptions are presented here; individual
+     * methods and nested classes contain only brief comments about
+     * details.
      *
      * WorkQueues
      * ==========
@@ -227,9 +232,10 @@
      *
      * (The actual code needs to null-check and size-check the array,
      * uses masking, not mod, for indexing a power-of-two-sized array,
-     * properly fences accesses, and possibly signals waiting workers
-     * to start scanning -- see below.)  Both a successful pop and
-     * poll mainly entail a CAS of a slot from non-null to null.
+     * adds a release fence for publication, and possibly signals
+     * waiting workers to start scanning -- see below.)  Both a
+     * successful pop and poll mainly entail a CAS of a slot from
+     * non-null to null.
      *
      * The pop operation (always performed by owner) is:
      *   if ((the task at top slot is not null) and
@@ -241,9 +247,14 @@
      *        (CAS slot to null))
      *           increment base and return task;
      *
-     * There are several variants of each of these. In particular,
-     * almost all uses of poll occur within scan operations that also
-     * interleave contention tracking (with associated code sprawl.)
+     * There are several variants of each of these. Most uses occur
+     * within operations that also interleave contention or emptiness
+     * tracking or inspection of elements before extracting them, so
+     * must interleave these with the above code. When performed by
+     * owner, getAndSet is used instead of CAS (see for example method
+     * nextLocalTask) which is usually more efficient, and possible
+     * because the top index cannot independently change during the
+     * operation.
      *
      * Memory ordering.  See "Correct and Efficient Work-Stealing for
      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
@@ -252,30 +263,37 @@
      * algorithms similar to (but different than) the one used here.
      * Extracting tasks in array slots via (fully fenced) CAS provides
      * primary synchronization. The base and top indices imprecisely
-     * guide where to extract from. We do not always require strict
-     * orderings of array and index updates, so sometimes let them be
-     * subject to compiler and processor reorderings. However, the
-     * volatile "base" index also serves as a basis for memory
-     * ordering: Slot accesses are preceded by a read of base,
-     * ensuring happens-before ordering with respect to stealers (so
-     * the slots themselves can be read via plain array reads.)  The
-     * only other memory orderings relied on are maintained in the
-     * course of signalling and activation (see below).  A check that
-     * base == top indicates (momentary) emptiness, but otherwise may
-     * err on the side of possibly making the queue appear nonempty
-     * when a push, pop, or poll have not fully committed, or making
-     * it appear empty when an update of top has not yet been visibly
-     * written.  (Method isEmpty() checks the case of a partially
-     * completed removal of the last element.)  Because of this, the
-     * poll operation, considered individually, is not wait-free. One
-     * thief cannot successfully continue until another in-progress
-     * one (or, if previously empty, a push) visibly completes.
-     * However, in the aggregate, we ensure at least probabilistic
+     * guide where to extract from. We do not usually require strict
+     * orderings of array and index updates. Many index accesses use
+     * plain mode, with ordering constrained by surrounding context
+     * (usually with respect to element CASes or the two WorkQueue
+     * volatile fields source and phase). When not otherwise already
+     * constrained, reads of "base" by queue owners use acquire-mode,
+     * and some externally callable methods preface accesses with
+     * acquire fences.  Additionally, to ensure that index update
+     * writes are not coalesced or postponed in loops etc, "opaque"
+     * mode is used in a few cases where timely writes are not
+     * otherwise ensured. The "locked" versions of push- and pop-
+     * based methods for shared queues differ from owned versions
+     * because locking already forces some of the ordering.
+     *
+     * Because indices and slot contents cannot always be consistent,
+     * a check that base == top indicates (momentary) emptiness, but
+     * otherwise may err on the side of possibly making the queue
+     * appear nonempty when a push, pop, or poll have not fully
+     * committed, or making it appear empty when an update of top has
+     * not yet been visibly written.  (Method isEmpty() checks the
+     * case of a partially completed removal of the last element.)
+     * Because of this, the poll operation, considered individually,
+     * is not wait-free. One thief cannot successfully continue until
+     * another in-progress one (or, if previously empty, a push)
+     * visibly completes.  This can stall threads when required to
+     * consume from a given queue (see method poll()).  However, in
+     * the aggregate, we ensure at least probabilistic
      * non-blockingness.  If an attempted steal fails, a scanning
      * thief chooses a different random victim target to try next. So,
      * in order for one thief to progress, it suffices for any
-     * in-progress poll or new push on any empty queue to
-     * complete.
+     * in-progress poll or new push on any empty queue to complete.
      *
      * This approach also enables support of a user mode in which
      * local task processing is in FIFO, not LIFO order, simply by
@@ -296,7 +314,7 @@
      * different position to use or create other queues -- they block
      * only when creating and registering new queues. Because it is
      * used only as a spinlock, unlocking requires only a "releasing"
-     * store (using setRelease).
+     * store (using setRelease) unless otherwise signalling.
      *
      * Management
      * ==========
@@ -317,10 +335,10 @@
      *
      * Field "ctl" contains 64 bits holding information needed to
      * atomically decide to add, enqueue (on an event queue), and
-     * dequeue (and release)-activate workers.  To enable this
-     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
-     * far in excess of normal operating range) to allow ids, counts,
-     * and their negations (used for thresholding) to fit into 16bit
+     * dequeue and release workers.  To enable this packing, we
+     * restrict maximum parallelism to (1<<15)-1 (which is far in
+     * excess of normal operating range) to allow ids, counts, and
+     * their negations (used for thresholding) to fit into 16bit
      * subfields.
      *
      * Field "mode" holds configuration parameters as well as lifetime
@@ -332,13 +350,14 @@
      * lock (using field workerNamePrefix as lock), but is otherwise
      * concurrently readable, and accessed directly. We also ensure
      * that uses of the array reference itself never become too stale
-     * in case of resizing.  To simplify index-based operations, the
-     * array size is always a power of two, and all readers must
-     * tolerate null slots. Worker queues are at odd indices. Shared
-     * (submission) queues are at even indices, up to a maximum of 64
-     * slots, to limit growth even if array needs to expand to add
-     * more workers. Grouping them together in this way simplifies and
-     * speeds up task scanning.
+     * in case of resizing, by arranging that (re-)reads are separated
+     * by at least one acquiring read access.  To simplify index-based
+     * operations, the array size is always a power of two, and all
+     * readers must tolerate null slots. Worker queues are at odd
+     * indices. Shared (submission) queues are at even indices, up to
+     * a maximum of 64 slots, to limit growth even if the array needs
+     * to expand to add more workers. Grouping them together in this
+     * way simplifies and speeds up task scanning.
      *
      * All worker thread creation is on-demand, triggered by task
      * submissions, replacement of terminated workers, and/or
@@ -416,8 +435,8 @@
      * releases so usage requires care -- seeing a negative phase does
      * not guarantee that the worker is available. When queued, the
      * lower 16 bits of scanState must hold its pool index. So we
-     * place the index there upon initialization (see registerWorker)
-     * and otherwise keep it there or restore it when necessary.
+     * place the index there upon initialization and otherwise keep it
+     * there or restore it when necessary.
      *
      * The ctl field also serves as the basis for memory
      * synchronization surrounding activation. This uses a more
@@ -425,48 +444,56 @@
      * consumers sync with each other by both writing/CASing ctl (even
      * if to its current value).  This would be extremely costly. So
      * we relax it in several ways: (1) Producers only signal when
-     * their queue is empty. Other workers propagate this signal (in
-     * method scan) when they find tasks; to further reduce flailing,
-     * each worker signals only one other per activation. (2) Workers
-     * only enqueue after scanning (see below) and not finding any
-     * tasks.  (3) Rather than CASing ctl to its current value in the
-     * common case where no action is required, we reduce write
+     * their queue is possibly empty at some point during a push
+     * operation (which requires conservatively checking size zero or
+     * one to cover races). (2) Other workers propagate this signal
+     * when they find tasks in a queue with size greater than one. (3)
+     * Workers only enqueue after scanning (see below) and not finding
+     * any tasks.  (4) Rather than CASing ctl to its current value in
+     * the common case where no action is required, we reduce write
      * contention by equivalently prefacing signalWork when called by
      * an external task producer using a memory access with
      * full-volatile semantics or a "fullFence".
      *
-     * Almost always, too many signals are issued. A task producer
-     * cannot in general tell if some existing worker is in the midst
-     * of finishing one task (or already scanning) and ready to take
-     * another without being signalled. So the producer might instead
-     * activate a different worker that does not find any work, and
-     * then inactivates. This scarcely matters in steady-state
-     * computations involving all workers, but can create contention
-     * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
-     * computations involving only a few workers.
+     * Almost always, too many signals are issued, in part because a
+     * task producer cannot tell if some existing worker is in the
+     * midst of finishing one task (or already scanning) and ready to
+     * take another without being signalled. So the producer might
+     * instead activate a different worker that does not find any
+     * work, and then inactivates. This scarcely matters in
+     * steady-state computations involving all workers, but can create
+     * contention and bookkeeping bottlenecks during ramp-up,
+     * ramp-down, and small computations involving only a few workers.
      *
-     * Scanning. Method runWorker performs top-level scanning for
-     * tasks.  Each scan traverses and tries to poll from each queue
-     * starting at a random index and circularly stepping. Scans are
-     * not performed in ideal random permutation order, to reduce
-     * cacheline contention.  The pseudorandom generator need not have
+     * Scanning. Method scan (from runWorker) performs top-level
+     * scanning for tasks. (Similar scans appear in helpQuiesce and
+     * pollScan.)  Each scan traverses and tries to poll from each
+     * queue starting at a random index. Scans are not performed in
+     * ideal random permutation order, to reduce cacheline
+     * contention. The pseudorandom generator need not have
      * high-quality statistical properties in the long term, but just
      * within computations; We use Marsaglia XorShifts (often via
      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
-     * suffice. Scanning also employs contention reduction: When
+     * suffice. Scanning also includes contention reduction: When
      * scanning workers fail to extract an apparently existing task,
-     * they soon restart at a different pseudorandom index.  This
-     * improves throughput when many threads are trying to take tasks
-     * from few queues, which can be common in some usages.  Scans do
-     * not otherwise explicitly take into account core affinities,
-     * loads, cache localities, etc, However, they do exploit temporal
-     * locality (which usually approximates these) by preferring to
-     * re-poll (at most #workers times) from the same queue after a
-     * successful poll before trying others.
+     * they soon restart at a different pseudorandom index.  This form
+     * of backoff improves throughput when many threads are trying to
+     * take tasks from few queues, which can be common in some usages.
+     * Scans do not otherwise explicitly take into account core
+     * affinities, loads, cache localities, etc, However, they do
+     * exploit temporal locality (which usually approximates these) by
+     * preferring to re-poll from the same queue after a successful
+     * poll before trying others (see method topLevelExec). However
+     * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
+     * against infinitely unfair looping under unbounded user task
+     * recursion, and also to reduce long-term contention when many
+     * threads poll few queues holding many small tasks. The bound is
+     * high enough to avoid much impact on locality and scheduling
+     * overhead.
      *
      * Trimming workers. To release resources after periods of lack of
      * use, a worker starting to wait when the pool is quiescent will
-     * time out and terminate (see method scan) if the pool has
+     * time out and terminate (see method runWorker) if the pool has
      * remained quiescent for period given by field keepAlive.
      *
      * Shutdown and Termination. A call to shutdownNow invokes
@@ -534,13 +561,14 @@
      * time. Some previous versions of this class employed immediate
      * compensations for any blocked join. However, in practice, the
      * vast majority of blockages are transient byproducts of GC and
-     * other JVM or OS activities that are made worse by replacement.
-     * Rather than impose arbitrary policies, we allow users to
-     * override the default of only adding threads upon apparent
-     * starvation.  The compensation mechanism may also be bounded.
-     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
-     * JVMs to cope with programming errors and abuse before running
-     * out of resources to do so.
+     * other JVM or OS activities that are made worse by replacement
+     * when they cause longer-term oversubscription.  Rather than
+     * impose arbitrary policies, we allow users to override the
+     * default of only adding threads upon apparent starvation.  The
+     * compensation mechanism may also be bounded.  Bounds for the
+     * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
+     * with programming errors and abuse before running out of
+     * resources to do so.
      *
      * Common Pool
      * ===========
@@ -573,6 +601,18 @@
      * in ForkJoinWorkerThread) may be JVM-dependent and must access
      * particular Thread class fields to achieve this effect.
      *
+     * Memory placement
+     * ================
+     *
+     * Performance can be very sensitive to placement of instances of
+     * ForkJoinPool and WorkQueues and their queue arrays. To reduce
+     * false-sharing impact, the @Contended annotation isolates
+     * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
+     * field. WorkQueue arrays are allocated (by their threads) with
+     * larger initial sizes than most ever need, mostly to reduce
+     * false sharing with current garbage collectors that use cardmark
+     * tables.
+     *
      * Style notes
      * ===========
      *
@@ -580,13 +620,15 @@
      * awkward and ugly, but also reflects the need to control
      * outcomes across the unusual cases that arise in very racy code
      * with very few invariants. All fields are read into locals
-     * before use, and null-checked if they are references.  This is
-     * usually done in a "C"-like style of listing declarations at the
-     * heads of methods or blocks, and using inline assignments on
-     * first encounter.  Nearly all explicit checks lead to
-     * bypass/return, not exception throws, because they may
-     * legitimately arise due to cancellation/revocation during
-     * shutdown.
+     * before use, and null-checked if they are references.  Array
+     * accesses using masked indices include checks (that are always
+     * true) that the array length is non-zero to avoid compilers
+     * inserting more expensive traps.  This is usually done in a
+     * "C"-like style of listing declarations at the heads of methods
+     * or blocks, and using inline assignments on first encounter.
+     * Nearly all explicit checks lead to bypass/return, not exception
+     * throws, because they may legitimately arise due to
+     * cancellation/revocation during shutdown.
      *
      * There is a lot of representation-level coupling among classes
      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
@@ -596,10 +638,11 @@
      * representations will need to be accompanied by algorithmic
      * changes anyway. Several methods intrinsically sprawl because
      * they must accumulate sets of consistent reads of fields held in
-     * local variables.  There are also other coding oddities
-     * (including several unnecessary-looking hoisted null checks)
-     * that help some methods perform reasonably even when interpreted
-     * (not compiled).
+     * local variables. Some others are artificially broken up to
+     * reduce producer/consumer imbalances due to dynamic compilation.
+     * There are also other coding oddities (including several
+     * unnecessary-looking hoisted null checks) that help some methods
+     * perform reasonably even when interpreted (not compiled).
      *
      * The order of declarations in this file is (with a few exceptions):
      * (1) Static utility functions
@@ -703,54 +746,43 @@
     static final int DORMANT      = QUIET | UNSIGNALLED;
 
     /**
-     * The maximum number of local polls from the same queue before
-     * checking others. This is a safeguard against infinitely unfair
-     * looping under unbounded user task recursion, and must be larger
-     * than plausible cases of intentional bounded task recursion.
+     * Initial capacity of work-stealing queue array.
+     * Must be a power of two, at least 2.
      */
-    static final int POLL_LIMIT = 1 << 10;
+    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
+
+    /**
+     * Maximum capacity for queue arrays. Must be a power of two less
+     * than or equal to 1 << (31 - width of array entry) to ensure
+     * lack of wraparound of index calculations, but defined to a
+     * value a bit less than this to help users trap runaway programs
+     * before saturating systems.
+     */
+    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
+
+    /**
+     * The maximum number of top-level polls per worker before
+     * checking other queues, expressed as a bit shift to, in effect,
+     * multiply by pool size, and then use as random value mask, so
+     * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
+     * above for rationale.
+     */
+    static final int TOP_BOUND_SHIFT = 10;
 
     /**
      * Queues supporting work-stealing as well as external task
      * submission. See above for descriptions and algorithms.
-     * Performance on most platforms is very sensitive to placement of
-     * instances of both WorkQueues and their arrays -- we absolutely
-     * do not want multiple WorkQueue instances or multiple queue
-     * arrays sharing cache lines. The @Contended annotation alerts
-     * JVMs to try to keep instances apart.
      */
     @jdk.internal.vm.annotation.Contended
     static final class WorkQueue {
-
-        /**
-         * Capacity of work-stealing queue array upon initialization.
-         * Must be a power of two; at least 4, but should be larger to
-         * reduce or eliminate cacheline sharing among queues.
-         * Currently, it is much larger, as a partial workaround for
-         * the fact that JVMs often place arrays in locations that
-         * share GC bookkeeping (especially cardmarks) such that
-         * per-write accesses encounter serious memory contention.
-         */
-        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
-
-        /**
-         * Maximum size for queue arrays. Must be a power of two less
-         * than or equal to 1 << (31 - width of array entry) to ensure
-         * lack of wraparound of index calculations, but defined to a
-         * value a bit less than this to help users trap runaway
-         * programs before saturating systems.
-         */
-        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
-
-        // Instance fields
+        volatile int source;       // source queue id, or sentinel
+        int id;                    // pool index, mode, tag
+        int base;                  // index of next slot for poll
+        int top;                   // index of next slot for push
         volatile int phase;        // versioned, negative: queued, 1: locked
         int stackPred;             // pool stack (ctl) predecessor link
         int nsteals;               // number of steals
-        int id;                    // index, mode, tag
-        volatile int source;       // source queue id, or sentinel
-        volatile int base;         // index of next slot for poll
-        int top;                   // index of next slot for push
-        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
+        ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
         final ForkJoinPool pool;   // the containing pool (may be null)
         final ForkJoinWorkerThread owner; // owning thread or null if shared
 
@@ -762,6 +794,17 @@
         }
 
         /**
+         * Tries to lock shared queue by CASing phase field.
+         */
+        final boolean tryLockPhase() {
+            return PHASE.compareAndSet(this, 0, 1);
+        }
+
+        final void releasePhaseLock() {
+            PHASE.setRelease(this, 0);
+        }
+
+        /**
          * Returns an exportable index (used by ForkJoinWorkerThread).
          */
         final int getPoolIndex() {
@@ -772,7 +815,7 @@
          * Returns the approximate number of tasks in the queue.
          */
         final int queueSize() {
-            int n = base - top;       // read base first
+            int n = (int)BASE.getAcquire(this) - top;
             return (n >= 0) ? 0 : -n; // ignore transient negative
         }
 
@@ -782,14 +825,14 @@
          * near-empty queue has at least one unclaimed task.
          */
         final boolean isEmpty() {
-            ForkJoinTask<?>[] a; int n, al, b;
+            ForkJoinTask<?>[] a; int n, cap, b;
+            VarHandle.acquireFence(); // needed by external callers
             return ((n = (b = base) - top) >= 0 || // possibly one task
                     (n == -1 && ((a = array) == null ||
-                                 (al = a.length) == 0 ||
-                                 a[(al - 1) & b] == null)));
+                                 (cap = a.length) == 0 ||
+                                 a[(cap - 1) & b] == null)));
         }
 
-
         /**
          * Pushes a task. Call only by owner in unshared queues.
          *
@@ -797,94 +840,99 @@
          * @throws RejectedExecutionException if array cannot be resized
          */
         final void push(ForkJoinTask<?> task) {
-            int s = top; ForkJoinTask<?>[] a; int al, d;
-            if ((a = array) != null && (al = a.length) > 0) {
-                int index = (al - 1) & s;
-                ForkJoinPool p = pool;
+            ForkJoinTask<?>[] a;
+            int s = top, d, cap, m;
+            ForkJoinPool p = pool;
+            if ((a = array) != null && (cap = a.length) > 0) {
+                QA.setRelease(a, (m = cap - 1) & s, task);
                 top = s + 1;
-                QA.setRelease(a, index, task);
-                if ((d = base - s) == 0 && p != null) {
+                if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
+                    p != null) {                 // size 0 or 1
                     VarHandle.fullFence();
                     p.signalWork();
                 }
-                else if (d + al == 1)
-                    growArray();
+                else if (d == m)
+                    growArray(false);
             }
         }
 
         /**
-         * Initializes or doubles the capacity of array. Call either
-         * by owner or with lock held -- it is OK for base, but not
-         * top, to move while resizings are in progress.
+         * Version of push for shared queues. Call only with phase lock held.
+         * @return true if should signal work
          */
-        final ForkJoinTask<?>[] growArray() {
-            ForkJoinTask<?>[] oldA = array;
-            int oldSize = oldA != null ? oldA.length : 0;
-            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
-            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
-                throw new RejectedExecutionException("Queue capacity exceeded");
-            int oldMask, t, b;
-            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
-            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
-                (t = top) - (b = base) > 0) {
-                int mask = size - 1;
-                do { // emulate poll from old array, push to new array
-                    int index = b & oldMask;
-                    ForkJoinTask<?> x = (ForkJoinTask<?>)
-                        QA.getAcquire(oldA, index);
-                    if (x != null &&
-                        QA.compareAndSet(oldA, index, x, null))
-                        a[b & mask] = x;
-                } while (++b != t);
-                VarHandle.releaseFence();
+        final boolean lockedPush(ForkJoinTask<?> task) {
+            ForkJoinTask<?>[] a;
+            boolean signal = false;
+            int s = top, b = base, cap, d;
+            if ((a = array) != null && (cap = a.length) > 0) {
+                a[(cap - 1) & s] = task;
+                top = s + 1;
+                if (b - s + cap - 1 == 0)
+                    growArray(true);
+                else {
+                    phase = 0; // full volatile unlock
+                    if (((s - base) & ~1) == 0) // size 0 or 1
+                        signal = true;
+                }
             }
-            return a;
+            return signal;
         }
 
         /**
-         * Takes next task, if one exists, in LIFO order.  Call only
-         * by owner in unshared queues.
+         * Doubles the capacity of array. Call either by owner or with
+         * lock held -- it is OK for base, but not top, to move while
+         * resizings are in progress.
          */
-        final ForkJoinTask<?> pop() {
-            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
-            if ((a = array) != null && b != s && (al = a.length) > 0) {
-                int index = (al - 1) & --s;
-                ForkJoinTask<?> t = (ForkJoinTask<?>)
-                    QA.get(a, index);
-                if (t != null &&
-                    QA.compareAndSet(a, index, t, null)) {
-                    top = s;
-                    VarHandle.releaseFence();
-                    return t;
+        final void growArray(boolean locked) {
+            ForkJoinTask<?>[] newA = null;
+            try {
+                ForkJoinTask<?>[] oldA; int oldSize, newSize;
+                if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
+                    (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
+                    newSize > 0) {
+                    try {
+                        newA = new ForkJoinTask<?>[newSize];
+                    } catch (OutOfMemoryError ex) {
+                    }
+                    if (newA != null) { // poll from old array, push to new
+                        int oldMask = oldSize - 1, newMask = newSize - 1;
+                        for (int s = top - 1, k = oldMask; k >= 0; --k) {
+                            ForkJoinTask<?> x = (ForkJoinTask<?>)
+                                QA.getAndSet(oldA, s & oldMask, null);
+                            if (x != null)
+                                newA[s-- & newMask] = x;
+                            else
+                                break;
+                        }
+                        array = newA;
+                        VarHandle.releaseFence();
+                    }
                 }
+            } finally {
+                if (locked)
+                    phase = 0;
             }
-            return null;
+            if (newA == null)
+                throw new RejectedExecutionException("Queue capacity exceeded");
         }
 
         /**
          * Takes next task, if one exists, in FIFO order.
          */
         final ForkJoinTask<?> poll() {
-            for (;;) {
-                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
-                if ((a = array) != null && (d = b - s) < 0 &&
-                    (al = a.length) > 0) {
-                    int index = (al - 1) & b;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAcquire(a, index);
-                    if (b++ == base) {
-                        if (t != null) {
-                            if (QA.compareAndSet(a, index, t, null)) {
-                                base = b;
-                                return t;
-                            }
-                        }
-                        else if (d == -1)
-                            break; // now empty
+            int b, k, cap; ForkJoinTask<?>[] a;
+            while ((a = array) != null && (cap = a.length) > 0 &&
+                   top - (b = base) > 0) {
+                ForkJoinTask<?> t = (ForkJoinTask<?>)
+                    QA.getAcquire(a, k = (cap - 1) & b);
+                if (base == b++) {
+                    if (t == null)
+                        Thread.yield(); // await index advance
+                    else if (QA.compareAndSet(a, k, t, null)) {
+                        BASE.setOpaque(this, b);
+                        return t;
                     }
                 }
-                else
-                    break;
             }
             return null;
         }
@@ -893,33 +941,61 @@
          * Takes next task, if one exists, in order specified by mode.
          */
         final ForkJoinTask<?> nextLocalTask() {
-            return ((id & FIFO) != 0) ? poll() : pop();
+            ForkJoinTask<?> t = null;
+            int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                (d = (s = top) - (b = base)) > 0) {
+                if ((md & FIFO) == 0 || d == 1) {
+                    if ((t = (ForkJoinTask<?>)
+                         QA.getAndSet(a, (cap - 1) & --s, null)) != null)
+                        TOP.setOpaque(this, s);
+                }
+                else if ((t = (ForkJoinTask<?>)
+                          QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
+                    BASE.setOpaque(this, b);
+                }
+                else // on contention in FIFO mode, use regular poll
+                    t = poll();
+            }
+            return t;
         }
 
         /**
          * Returns next task, if one exists, in order specified by mode.
          */
         final ForkJoinTask<?> peek() {
-            int al; ForkJoinTask<?>[] a;
-            return ((a = array) != null && (al = a.length) > 0) ?
-                a[(al - 1) &
-                  ((id & FIFO) != 0 ? base : top - 1)] : null;
+            int cap; ForkJoinTask<?>[] a;
+            return ((a = array) != null && (cap = a.length) > 0) ?
+                a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
         }
 
         /**
          * Pops the given task only if it is at the current top.
          */
         final boolean tryUnpush(ForkJoinTask<?> task) {
-            int b = base, s = top, al; ForkJoinTask<?>[] a;
-            if ((a = array) != null && b != s && (al = a.length) > 0) {
-                int index = (al - 1) & --s;
-                if (QA.compareAndSet(a, index, task, null)) {
+            boolean popped = false;
+            int s, cap; ForkJoinTask<?>[] a;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                (s = top) != base &&
+                (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
+                TOP.setOpaque(this, s);
+            return popped;
+        }
+
+        /**
+         * Shared version of tryUnpush.
+         */
+        final boolean tryLockedUnpush(ForkJoinTask<?> task) {
+            boolean popped = false;
+            int s = top - 1, k, cap; ForkJoinTask<?>[] a;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                a[k = (cap - 1) & s] == task && tryLockPhase()) {
+                if (top == s + 1 && array == a &&
+                    (popped = QA.compareAndSet(a, k, task, null)))
                     top = s;
-                    VarHandle.releaseFence();
-                    return true;
-                }
+                releasePhaseLock();
             }
-            return false;
+            return popped;
         }
 
         /**
@@ -933,58 +1009,29 @@
         // Specialized execution methods
 
         /**
-         * Pops and executes up to limit consecutive tasks or until empty.
-         *
-         * @param limit max runs, or zero for no limit
+         * Runs the given (stolen) task if nonnull, as well as
+         * remaining local tasks and others available from the given
+         * queue, up to bound n (to avoid infinite unfairness).
          */
-        final void localPopAndExec(int limit) {
-            for (;;) {
-                int b = base, s = top, al; ForkJoinTask<?>[] a;
-                if ((a = array) != null && b != s && (al = a.length) > 0) {
-                    int index = (al - 1) & --s;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAndSet(a, index, null);
-                    if (t != null) {
-                        top = s;
-                        VarHandle.releaseFence();
-                        t.doExec();
-                        if (limit != 0 && --limit == 0)
+        final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
+            if (t != null && q != null) { // hoist checks
+                int nstolen = 1;
+                for (;;) {
+                    t.doExec();
+                    if (n-- < 0)
+                        break;
+                    else if ((t = nextLocalTask()) == null) {
+                        if ((t = q.poll()) == null)
                             break;
+                        else
+                            ++nstolen;
                     }
-                    else
-                        break;
                 }
-                else
-                    break;
-            }
-        }
-
-        /**
-         * Polls and executes up to limit consecutive tasks or until empty.
-         *
-         * @param limit, or zero for no limit
-         */
-        final void localPollAndExec(int limit) {
-            for (int polls = 0;;) {
-                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
-                if ((a = array) != null && (d = b - s) < 0 &&
-                    (al = a.length) > 0) {
-                    int index = (al - 1) & b++;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAndSet(a, index, null);
-                    if (t != null) {
-                        base = b;
-                        t.doExec();
-                        if (limit != 0 && ++polls == limit)
-                            break;
-                    }
-                    else if (d == -1)
-                        break;     // now empty
-                    else
-                        polls = 0; // stolen; reset
-                }
-                else
-                    break;
+                ForkJoinWorkerThread thread = owner;
+                nsteals += nstolen;
+                source = 0;
+                if (thread != null)
+                    thread.afterTopLevelExec();
             }
         }
 
@@ -992,25 +1039,24 @@
          * If present, removes task from queue and executes it.
          */
         final void tryRemoveAndExec(ForkJoinTask<?> task) {
-            ForkJoinTask<?>[] wa; int s, wal;
-            if (base - (s = top) < 0 && // traverse from top
-                (wa = array) != null && (wal = wa.length) > 0) {
-                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
+            ForkJoinTask<?>[] a; int s, cap;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                (s = top) - base > 0) { // traverse from top
+                for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
                     int index = i & m;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.get(wa, index);
+                    ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
                     if (t == null)
                         break;
                     else if (t == task) {
-                        if (QA.compareAndSet(wa, index, t, null)) {
+                        if (QA.compareAndSet(a, index, t, null)) {
                             top = ns;   // safely shift down
                             for (int j = i; j != ns; ++j) {
                                 ForkJoinTask<?> f;
                                 int pindex = (j + 1) & m;
-                                f = (ForkJoinTask<?>)QA.get(wa, pindex);
-                                QA.setVolatile(wa, pindex, null);
+                                f = (ForkJoinTask<?>)QA.get(a, pindex);
+                                QA.setVolatile(a, pindex, null);
                                 int jindex = j & m;
-                                QA.setRelease(wa, jindex, f);
+                                QA.setRelease(a, jindex, f);
                             }
                             VarHandle.releaseFence();
                             t.doExec();
@@ -1022,43 +1068,52 @@
         }
 
         /**
-         * Tries to steal and run tasks within the target's
-         * computation until done, not found, or limit exceeded.
+         * Tries to pop and run tasks within the target's computation
+         * until done, not found, or limit exceeded.
          *
          * @param task root of CountedCompleter computation
          * @param limit max runs, or zero for no limit
+         * @param shared true if must lock to extract task
          * @return task status on exit
          */
-        final int localHelpCC(CountedCompleter<?> task, int limit) {
+        final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
             int status = 0;
             if (task != null && (status = task.status) >= 0) {
-                for (;;) {
-                    boolean help = false;
-                    int b = base, s = top, al; ForkJoinTask<?>[] a;
-                    if ((a = array) != null && b != s && (al = a.length) > 0) {
-                        int index = (al - 1) & (s - 1);
-                        ForkJoinTask<?> o = (ForkJoinTask<?>)
-                            QA.get(a, index);
-                        if (o instanceof CountedCompleter) {
-                            CountedCompleter<?> t = (CountedCompleter<?>)o;
-                            for (CountedCompleter<?> f = t;;) {
-                                if (f != task) {
-                                    if ((f = f.completer) == null) // try parent
-                                        break;
+                int s, k, cap; ForkJoinTask<?>[] a;
+                while ((a = array) != null && (cap = a.length) > 0 &&
+                       (s = top) - base > 0) {
+                    CountedCompleter<?> v = null;
+                    ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
+                    if (o instanceof CountedCompleter) {
+                        CountedCompleter<?> t = (CountedCompleter<?>)o;
+                        for (CountedCompleter<?> f = t;;) {
+                            if (f != task) {
+                                if ((f = f.completer) == null)
+                                    break;
+                            }
+                            else if (shared) {
+                                if (tryLockPhase()) {
+                                    if (top == s && array == a &&
+                                        QA.compareAndSet(a, k, t, null)) {
+                                        top = s - 1;
+                                        v = t;
+                                    }
+                                    releasePhaseLock();
                                 }
-                                else {
-                                    if (QA.compareAndSet(a, index, t, null)) {
-                                        top = s - 1;
-                                        VarHandle.releaseFence();
-                                        t.doExec();
-                                        help = true;
-                                    }
-                                    break;
+                                break;
+                            }
+                            else {
+                                if (QA.compareAndSet(a, k, t, null)) {
+                                    top = s - 1;
+                                    v = t;
                                 }
+                                break;
                             }
                         }
                     }
-                    if ((status = task.status) < 0 || !help ||
+                    if (v != null)
+                        v.doExec();
+                    if ((status = task.status) < 0 || v == null ||
                         (limit != 0 && --limit == 0))
                         break;
                 }
@@ -1066,79 +1121,31 @@
             return status;
         }
 
-        // Operations on shared queues
-
         /**
-         * Tries to lock shared queue by CASing phase field.
-         */
-        final boolean tryLockSharedQueue() {
-            return PHASE.compareAndSet(this, 0, QLOCK);
-        }
-
-        /**
-         * Shared version of tryUnpush.
+         * Tries to poll and run AsynchronousCompletionTasks until
+         * none found or blocker is released
+         *
+         * @param blocker the blocker
          */
-        final boolean trySharedUnpush(ForkJoinTask<?> task) {
-            boolean popped = false;
-            int s = top - 1, al; ForkJoinTask<?>[] a;
-            if ((a = array) != null && (al = a.length) > 0) {
-                int index = (al - 1) & s;
-                ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
-                if (t == task &&
-                    PHASE.compareAndSet(this, 0, QLOCK)) {
-                    if (top == s + 1 && array == a &&
-                        QA.compareAndSet(a, index, task, null)) {
-                        popped = true;
-                        top = s;
+        final void helpAsyncBlocker(ManagedBlocker blocker) {
+            if (blocker != null) {
+                int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
+                while ((a = array) != null && (cap = a.length) > 0 &&
+                       top - (b = base) > 0) {
+                    t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
+                    if (blocker.isReleasable())
+                        break;
+                    else if (base == b++ && t != null) {
+                        if (!(t instanceof CompletableFuture.
+                              AsynchronousCompletionTask))
+                            break;
+                        else if (QA.compareAndSet(a, k, t, null)) {
+                            BASE.setOpaque(this, b);
+                            t.doExec();
+                        }
                     }
-                    PHASE.setRelease(this, 0);
                 }
             }
-            return popped;
-        }
-
-        /**
-         * Shared version of localHelpCC.
-         */
-        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
-            int status = 0;
-            if (task != null && (status = task.status) >= 0) {
-                for (;;) {
-                    boolean help = false;
-                    int b = base, s = top, al; ForkJoinTask<?>[] a;
-                    if ((a = array) != null && b != s && (al = a.length) > 0) {
-                        int index = (al - 1) & (s - 1);
-                        ForkJoinTask<?> o = (ForkJoinTask<?>)
-                            QA.get(a, index);
-                        if (o instanceof CountedCompleter) {
-                            CountedCompleter<?> t = (CountedCompleter<?>)o;
-                            for (CountedCompleter<?> f = t;;) {
-                                if (f != task) {
-                                    if ((f = f.completer) == null)
-                                        break;
-                                }
-                                else {
-                                    if (PHASE.compareAndSet(this, 0, QLOCK)) {
-                                        if (top == s && array == a &&
-                                            QA.compareAndSet(a, index, t, null)) {
-                                            help = true;
-                                            top = s - 1;
-                                        }
-                                        PHASE.setRelease(this, 0);
-                                        if (help)
-                                            t.doExec();
-                                    }
-                                    break;
-                                }
-                            }
-                        }
-                    }
-                    if ((status = task.status) < 0 || !help ||
-                        (limit != 0 && --limit == 0))
-                        break;
-                }
-            }
-            return status;
         }
 
         /**
@@ -1153,13 +1160,17 @@
         }
 
         // VarHandle mechanics.
-        private static final VarHandle PHASE;
+        static final VarHandle PHASE;
+        static final VarHandle BASE;
+        static final VarHandle TOP;
         static {
             try {
                 MethodHandles.Lookup l = MethodHandles.lookup();
                 PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
+                BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
+                TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
             } catch (ReflectiveOperationException e) {
-                throw new Error(e);
+                throw new ExceptionInInitializerError(e);
             }
         }
     }
@@ -1356,39 +1367,37 @@
         wt.setDaemon(true);                             // configure thread
         if ((handler = ueh) != null)
             wt.setUncaughtExceptionHandler(handler);
+        int tid = 0;                                    // for thread name
+        int idbits = mode & FIFO;
+        String prefix = workerNamePrefix;
         WorkQueue w = new WorkQueue(this, wt);
-        int tid = 0;                                    // for thread name
-        int fifo = mode & FIFO;
-        String prefix = workerNamePrefix;
         if (prefix != null) {
             synchronized (prefix) {
                 WorkQueue[] ws = workQueues; int n;
                 int s = indexSeed += SEED_INCREMENT;
+                idbits |= (s & ~(SMASK | FIFO | DORMANT));
                 if (ws != null && (n = ws.length) > 1) {
                     int m = n - 1;
-                    tid = s & m;
-                    int i = m & ((s << 1) | 1);         // odd-numbered indices
+                    tid = m & ((s << 1) | 1);           // odd-numbered indices
                     for (int probes = n >>> 1;;) {      // find empty slot
                         WorkQueue q;
-                        if ((q = ws[i]) == null || q.phase == QUIET)
+                        if ((q = ws[tid]) == null || q.phase == QUIET)
                             break;
                         else if (--probes == 0) {
-                            i = n | 1;                  // resize below
+                            tid = n | 1;                // resize below
                             break;
                         }
                         else
-                            i = (i + 2) & m;
+                            tid = (tid + 2) & m;
                     }
+                    w.phase = w.id = tid | idbits;      // now publishable
 
-                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
-                    w.phase = w.id = id;                // now publishable
-
-                    if (i < n)
-                        ws[i] = w;
+                    if (tid < n)
+                        ws[tid] = w;
                     else {                              // expand array
                         int an = n << 1;
                         WorkQueue[] as = new WorkQueue[an];
-                        as[i] = w;
+                        as[tid] = w;
                         int am = an - 1;
                         for (int j = 0; j < n; ++j) {
                             WorkQueue v;                // copy external queue
@@ -1421,14 +1430,14 @@
         int phase = 0;
         if (wt != null && (w = wt.workQueue) != null) {
             Object lock = workerNamePrefix;
+            int wid = w.id;
             long ns = (long)w.nsteals & 0xffffffffL;
-            int idx = w.id & SMASK;
             if (lock != null) {
-                WorkQueue[] ws;                       // remove index from array
                 synchronized (lock) {
-                    if ((ws = workQueues) != null && ws.length > idx &&
-                        ws[idx] == w)
-                        ws[idx] = null;
+                    WorkQueue[] ws; int n, i;         // remove index from array
+                    if ((ws = workQueues) != null && (n = ws.length) > 0 &&
+                        ws[i = wid & (n - 1)] == w)
+                        ws[i] = null;
                     stealCount += ns;
                 }
             }
@@ -1480,7 +1489,7 @@
                 Thread vt = v.owner;
                 if (sp == vp && CTL.compareAndSet(this, c, nc)) {
                     v.phase = np;
-                    if (v.source < 0)
+                    if (vt != null && v.source < 0)
                         LockSupport.unpark(vt);
                     break;
                 }
@@ -1521,7 +1530,7 @@
                     long nc = ((long)v.stackPred & SP_MASK) | uc;
                     if (vp == sp && CTL.compareAndSet(this, c, nc)) {
                         v.phase = np;
-                        if (v.source < 0)
+                        if (vt != null && v.source < 0)
                             LockSupport.unpark(vt);
                         return (wp < 0) ? -1 : 1;
                     }
@@ -1578,101 +1587,88 @@
      * See above for explanation.
      */
     final void runWorker(WorkQueue w) {
-        WorkQueue[] ws;
-        w.growArray();                                  // allocate queue
-        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
-        if (r == 0)                                     // initial nonzero seed
-            r = 1;
-        int lastSignalId = 0;                           // avoid unneeded signals
-        while ((ws = workQueues) != null) {
-            boolean nonempty = false;                   // scan
-            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
-                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
-                if ((i = r & m) >= 0 && i < n &&        // always true
-                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
-                    (a = q.array) != null && (al = a.length) > 0) {
-                    int qid = q.id;                     // (never zero)
-                    int index = (al - 1) & b;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAcquire(a, index);
-                    if (t != null && b++ == q.base &&
-                        QA.compareAndSet(a, index, t, null)) {
-                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
-                            signalWork();               // propagate signal
-                        w.source = lastSignalId = qid;
-                        t.doExec();
-                        if ((w.id & FIFO) != 0)         // run remaining locals
-                            w.localPollAndExec(POLL_LIMIT);
-                        else
-                            w.localPopAndExec(POLL_LIMIT);
-                        ForkJoinWorkerThread thread = w.owner;
-                        ++w.nsteals;
-                        w.source = 0;                   // now idle
-                        if (thread != null)
-                            thread.afterTopLevelExec();
-                    }
-                    nonempty = true;
-                }
-                else if (nonempty)
-                    break;
-                else
-                    ++r;
+        int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
+        w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
+        for (;;) {
+            int phase;
+            if (scan(w, r)) {                     // scan until apparently empty
+                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
             }
-
-            if (nonempty) {                             // move (xorshift)
-                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
+            else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
+                long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
+                long c, nc;
+                do {
+                    w.stackPred = (int)(c = ctl);
+                    nc = ((c - RC_UNIT) & UC_MASK) | np;
+                } while (!CTL.weakCompareAndSet(this, c, nc));
             }
-            else {
-                int phase;
-                lastSignalId = 0;                       // clear for next scan
-                if ((phase = w.phase) >= 0) {           // enqueue
-                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
-                    long c, nc;
-                    do {
-                        w.stackPred = (int)(c = ctl);
-                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
-                    } while (!CTL.weakCompareAndSet(this, c, nc));
-                }
-                else {                                  // already queued
-                    int pred = w.stackPred;
-                    w.source = DORMANT;                 // enable signal
-                    for (int steps = 0;;) {
-                        int md, rc; long c;
-                        if (w.phase >= 0) {
-                            w.source = 0;
-                            break;
-                        }
-                        else if ((md = mode) < 0)       // shutting down
-                            return;
-                        else if ((rc = ((md & SMASK) +  // possibly quiescent
-                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
-                                 (md & SHUTDOWN) != 0 &&
-                                 tryTerminate(false, false))
-                            return;                     // help terminate
-                        else if ((++steps & 1) == 0)
-                            Thread.interrupted();       // clear between parks
-                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
-                            long d = keepAlive + System.currentTimeMillis();
-                            LockSupport.parkUntil(this, d);
-                            if (ctl == c &&
-                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
-                                long nc = ((UC_MASK & (c - TC_UNIT)) |
-                                           (SP_MASK & pred));
-                                if (CTL.compareAndSet(this, c, nc)) {
-                                    w.phase = QUIET;
-                                    return;             // drop on timeout
-                                }
-                            }
-                        }
-                        else
-                            LockSupport.park(this);
+            else {                                // already queued
+                int pred = w.stackPred;
+                Thread.interrupted();             // clear before park
+                w.source = DORMANT;               // enable signal
+                long c = ctl;
+                int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
+                if (md < 0)                       // terminating
+                    break;
+                else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
+                         tryTerminate(false, false))
+                    break;                        // quiescent shutdown
+                else if (rc <= 0 && pred != 0 && phase == (int)c) {
+                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
+                    long d = keepAlive + System.currentTimeMillis();
+                    LockSupport.parkUntil(this, d);
+                    if (ctl == c &&               // drop on timeout if all idle
+                        d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
+                        CTL.compareAndSet(this, c, nc)) {
+                        w.phase = QUIET;
+                        break;
                     }
                 }
+                else if (w.phase < 0)
+                    LockSupport.park(this);       // OK if spuriously woken
+                w.source = 0;                     // disable signal
             }
         }
     }
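The xorshift line near the top of runWorker is Marsaglia's 32-bit generator:
for any nonzero seed it cycles through all 2^32 - 1 nonzero values, and the
"| FIFO" on the initial seed guarantees a nonzero start, which is why the old
explicit zero check could be dropped. Standalone form (illustrative):

    // Marsaglia xorshift32; a nonzero r never becomes zero.
    static int xorshift32(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }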
 
     /**
+     * Scans for and, if found, executes one or more top-level tasks
+     * from a queue.
+     *
+     * @return true if an apparently non-empty queue was found, in
+     * which case task(s) may have been run.
+     */
+    private boolean scan(WorkQueue w, int r) {
+        WorkQueue[] ws; int n;
+        if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
+            for (int m = n - 1, j = r & m;;) {
+                WorkQueue q; int b;
+                if ((q = ws[j]) != null && q.top != (b = q.base)) {
+                    int qid = q.id;
+                    ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
+                    if ((a = q.array) != null && (cap = a.length) > 0) {
+                        t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
+                        if (q.base == b++ && t != null &&
+                            QA.compareAndSet(a, k, t, null)) {
+                            q.base = b;
+                            w.source = qid;
+                            if (q.top - b > 0)
+                                signalWork();
+                            w.topLevelExec(t, q,  // random fairness bound
+                                           r & ((n << TOP_BOUND_SHIFT) - 1));
+                        }
+                    }
+                    return true;
+                }
+                else if (--n > 0)
+                    j = (j + 1) & m;
+                else
+                    break;
+            }
+        }
+        return false;
+    }
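A hedged sketch of scan()'s claim step, with AtomicReferenceArray standing in
for the QA VarHandle: read the slot at base with acquire semantics, re-check
base, and claim the task by CASing its slot to null before publishing the
advanced base (illustrative, not the JDK internals):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    final class StealSketch {
        static Runnable trySteal(AtomicReferenceArray<Runnable> a,
                                 AtomicInteger base, int top) {
            int b = base.get();
            if (top == b)
                return null;                  // apparently empty
            int k = (a.length() - 1) & b;     // power-of-two index mask
            Runnable t = a.get(k);            // volatile (acquire) read
            if (t != null && b == base.get() && a.compareAndSet(k, t, null)) {
                base.set(b + 1);              // only the CAS winner advances
                return t;
            }
            return null;                      // lost the race; caller rescans
        }
    }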
+
+    /**
     * Helps and/or blocks until the given task is done or the timeout elapses.
      * First tries locally helping, then scans other queues for a task
      * produced by one of w's stealers; compensating and blocking if
@@ -1685,42 +1681,44 @@
      */
     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
         int s = 0;
+        int seed = ThreadLocalRandom.nextSecondarySeed();
         if (w != null && task != null &&
             (!(task instanceof CountedCompleter) ||
-             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
+             (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
             w.tryRemoveAndExec(task);
             int src = w.source, id = w.id;
+            int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
             s = task.status;
             while (s >= 0) {
                 WorkQueue[] ws;
-                boolean nonempty = false;
-                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
-                if ((ws = workQueues) != null) {       // scan for matching id
-                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
-                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
-                        if ((i = (r + j) & m) >= 0 && i < n &&
-                            (q = ws[i]) != null && q.source == id &&
-                            (b = q.base) - q.top < 0 &&
-                            (a = q.array) != null && (al = a.length) > 0) {
-                            int qid = q.id;
-                            int index = (al - 1) & b;
+                int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
+                while (n > 0) {
+                    WorkQueue q; int b;
+                    if ((q = ws[r & m]) != null && q.source == id &&
+                        q.top != (b = q.base)) {
+                        ForkJoinTask<?>[] a; int cap, k;
+                        int qid = q.id;
+                        if ((a = q.array) != null && (cap = a.length) > 0) {
                             ForkJoinTask<?> t = (ForkJoinTask<?>)
-                                QA.getAcquire(a, index);
-                            if (t != null && b++ == q.base && id == q.source &&
-                                QA.compareAndSet(a, index, t, null)) {
+                                QA.getAcquire(a, k = (cap - 1) & b);
+                            if (q.source == id && q.base == b++ &&
+                                t != null && QA.compareAndSet(a, k, t, null)) {
                                 q.base = b;
                                 w.source = qid;
                                 t.doExec();
                                 w.source = src;
                             }
-                            nonempty = true;
-                            break;
                         }
+                        break;
+                    }
+                    else {
+                        r += step;
+                        --n;
                     }
                 }
                 if ((s = task.status) < 0)
                     break;
-                else if (!nonempty) {
+                else if (n == 0) { // empty scan
                     long ms, ns; int block;
                     if (deadline == 0L)
                         ms = 0L;                       // untimed
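awaitJoin now derives both its scan origin and stride from one secondary seed:
r is forced odd and step is forced to 2 (mod 4), so successive probes
(r += step) & m stay on the odd slots holding worker queues (the removed code
noted "odd indices") and visit each of the n/2 such slots exactly once per
sweep. A small self-check of that property, assuming n is a power of two
(illustrative, not JDK code):

    static boolean visitsEveryOddSlot(int n, int seed) {
        int r = (seed >>> 16) | 1;            // odd origin, as above
        int step = (seed & ~1) | 2;           // even stride, step/2 odd
        boolean[] seen = new boolean[n];
        for (int k = 0; k < n / 2; ++k, r += step)
            seen[r & (n - 1)] = true;
        for (int i = 1; i < n; i += 2)
            if (!seen[i])
                return false;                 // cannot happen
        return true;
    }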
@@ -1745,44 +1743,44 @@
      * find tasks either.
      */
     final void helpQuiescePool(WorkQueue w) {
-        int prevSrc = w.source, fifo = w.id & FIFO;
+        int prevSrc = w.source;
+        int seed = ThreadLocalRandom.nextSecondarySeed();
+        int r = seed >>> 16, step = r | 1;
         for (int source = prevSrc, released = -1;;) { // -1 until known
-            WorkQueue[] ws;
-            if (fifo != 0)
-                w.localPollAndExec(0);
-            else
-                w.localPopAndExec(0);
-            if (released == -1 && w.phase >= 0)
+            ForkJoinTask<?> localTask; WorkQueue[] ws;
+            while ((localTask = w.nextLocalTask()) != null)
+                localTask.doExec();
+            if (w.phase >= 0 && released == -1)
                 released = 1;
             boolean quiet = true, empty = true;
-            int r = ThreadLocalRandom.nextSecondarySeed();
-            if ((ws = workQueues) != null) {
-                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
-                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
-                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
-                        if ((b = q.base) - q.top < 0 &&
-                            (a = q.array) != null && (al = a.length) > 0) {
-                            int qid = q.id;
+            int n = (ws = workQueues) == null ? 0 : ws.length;
+            for (int m = n - 1; n > 0; r += step, --n) {
+                WorkQueue q; int b;
+                if ((q = ws[r & m]) != null) {
+                    int qs = q.source;
+                    if (q.top != (b = q.base)) {
+                        quiet = empty = false;
+                        ForkJoinTask<?>[] a; int cap, k;
+                        int qid = q.id;
+                        if ((a = q.array) != null && (cap = a.length) > 0) {
                             if (released == 0) {    // increment
                                 released = 1;
                                 CTL.getAndAdd(this, RC_UNIT);
                             }
-                            int index = (al - 1) & b;
                             ForkJoinTask<?> t = (ForkJoinTask<?>)
-                                QA.getAcquire(a, index);
-                            if (t != null && b++ == q.base &&
-                                QA.compareAndSet(a, index, t, null)) {
+                                QA.getAcquire(a, k = (cap - 1) & b);
+                            if (q.base == b++ && t != null &&
+                                QA.compareAndSet(a, k, t, null)) {
                                 q.base = b;
-                                w.source = source = q.id;
+                                w.source = qid;
                                 t.doExec();
                                 w.source = source = prevSrc;
                             }
-                            quiet = empty = false;
-                            break;
                         }
-                        else if ((q.source & QUIET) == 0)
-                            quiet = false;
+                        break;
                     }
+                    else if ((qs & QUIET) == 0)
+                        quiet = false;
                 }
             }
             if (quiet) {
@@ -1824,28 +1822,24 @@
                 origin = r & m;
                 step = h | 1;
             }
-            for (int k = origin, oldSum = 0, checkSum = 0;;) {
-                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
-                if ((q = ws[k]) != null) {
-                    checkSum += b = q.base;
-                    if (b - q.top < 0 &&
-                        (a = q.array) != null && (al = a.length) > 0) {
-                        int index = (al - 1) & b;
-                        ForkJoinTask<?> t = (ForkJoinTask<?>)
-                            QA.getAcquire(a, index);
-                        if (t != null && b++ == q.base &&
-                            QA.compareAndSet(a, index, t, null)) {
-                            q.base = b;
+            boolean nonempty = false;
+            for (int i = origin, oldSum = 0, checkSum = 0;;) {
+                WorkQueue q;
+                if ((q = ws[i]) != null) {
+                    int b; ForkJoinTask<?> t;
+                    if (q.top - (b = q.base) > 0) {
+                        nonempty = true;
+                        if ((t = q.poll()) != null)
                             return t;
-                        }
-                        else
-                            break; // restart
                     }
+                    else
+                        checkSum += b + q.id;
                 }
-                if ((k = (k + step) & m) == origin) {
-                    if (oldSum == (oldSum = checkSum))
+                if ((i = (i + step) & m) == origin) {
+                    if (!nonempty && oldSum == (oldSum = checkSum))
                         break rescan;
                     checkSum = 0;
+                    nonempty = false;
                 }
             }
         }
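The rescan exit in pollScan relies on the compact idiom
"oldSum == (oldSum = checkSum)": Java evaluates the left operand before the
embedded assignment, so this compares the previous sweep's checksum with the
fresh one while updating it in place. Together with the new nonempty flag, the
loop gives up only after two consecutive sweeps that found no work and saw
identical base + id sums. Long-hand equivalent (illustrative):

    // Compare the previous sweep's sum with the current one, then keep
    // the current sum for the next comparison.
    static boolean stable(int[] oldSumHolder, int checkSum) {
        int prev = oldSumHolder[0];
        oldSumHolder[0] = checkSum;
        return prev == checkSum;      // unchanged across two sweeps
    }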
@@ -1859,11 +1853,9 @@
      */
     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
         ForkJoinTask<?> t;
-        if (w != null &&
-            (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
-            return t;
-        else
-            return pollScan(false);
+        if (w == null || (t = w.nextLocalTask()) == null)
+            t = pollScan(false);
+        return t;
     }
 
     // External operations
@@ -1881,64 +1873,35 @@
             r = ThreadLocalRandom.getProbe();
         }
         for (;;) {
+            WorkQueue q;
             int md = mode, n;
             WorkQueue[] ws = workQueues;
             if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                 throw new RejectedExecutionException();
-            else {
-                WorkQueue q;
-                boolean push = false, grow = false;
-                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
-                    Object lock = workerNamePrefix;
-                    int qid = (r | QUIET) & ~(FIFO | OWNED);
-                    q = new WorkQueue(this, null);
-                    q.id = qid;
-                    q.source = QUIET;
-                    q.phase = QLOCK;          // lock queue
-                    if (lock != null) {
-                        synchronized (lock) { // lock pool to install
-                            int i;
-                            if ((ws = workQueues) != null &&
-                                (n = ws.length) > 0 &&
-                                ws[i = qid & (n - 1) & SQMASK] == null) {
-                                ws[i] = q;
-                                push = grow = true;
-                            }
-                        }
+            else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
+                int qid = (r | QUIET) & ~(FIFO | OWNED);
+                Object lock = workerNamePrefix;
+                ForkJoinTask<?>[] qa =
+                    new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
+                q = new WorkQueue(this, null);
+                q.array = qa;
+                q.id = qid;
+                q.source = QUIET;
+                if (lock != null) {     // unless disabled, lock pool to install
+                    synchronized (lock) {
+                        WorkQueue[] vs; int i, vn;
+                        if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
+                            vs[i = qid & (vn - 1) & SQMASK] == null)
+                            vs[i] = q;  // else another thread already installed
                     }
                 }
-                else if (q.tryLockSharedQueue()) {
-                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
-                    if ((a = q.array) != null && (al = a.length) > 0 &&
-                        al - 1 + (d = b - s) > 0) {
-                        a[(al - 1) & s] = task;
-                        q.top = s + 1;        // relaxed writes OK here
-                        q.phase = 0;
-                        if (d < 0 && q.base - s < -1)
-                            break;            // no signal needed
-                    }
-                    else
-                        grow = true;
-                    push = true;
-                }
-                if (push) {
-                    if (grow) {
-                        try {
-                            q.growArray();
-                            int s = q.top, al; ForkJoinTask<?>[] a;
-                            if ((a = q.array) != null && (al = a.length) > 0) {
-                                a[(al - 1) & s] = task;
-                                q.top = s + 1;
-                            }
-                        } finally {
-                            q.phase = 0;
-                        }
-                    }
+            }
+            else if (!q.tryLockPhase()) // move if busy
+                r = ThreadLocalRandom.advanceProbe(r);
+            else {
+                if (q.lockedPush(task))
                     signalWork();
-                    break;
-                }
-                else                          // move if busy
-                    r = ThreadLocalRandom.advanceProbe(r);
+                return;
             }
         }
     }
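External submitters hash into the even-indexed slots reserved for shared
submission queues, and on contention (tryLockPhase failing) the probe is
advanced so the retry lands on a different slot instead of spinning on a busy
lock. The slot computation in isolation, with SQMASK as defined earlier in
this file (illustrative):

    static int submissionSlot(int probe, int tableLength) {
        final int SQMASK = 0x007e;            // max 64 (even) slots
        return (tableLength - 1) & probe & SQMASK;
    }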
@@ -1980,7 +1943,7 @@
         return ((ws = workQueues) != null &&
                 (n = ws.length) > 0 &&
                 (w = ws[(n - 1) & r & SQMASK]) != null &&
-                w.trySharedUnpush(task));
+                w.tryLockedUnpush(task));
     }
 
     /**
@@ -1991,7 +1954,7 @@
         WorkQueue[] ws; WorkQueue w; int n;
         return ((ws = workQueues) != null && (n = ws.length) > 0 &&
                 (w = ws[(n - 1) & r & SQMASK]) != null) ?
-            w.sharedHelpCC(task, maxTasks) : 0;
+            w.helpCC(task, maxTasks, true) : 0;
     }
 
     /**
@@ -2006,7 +1969,7 @@
      */
     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
                            int maxTasks) {
-        return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
+        return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
     }
 
     /**
@@ -2097,15 +2060,18 @@
                     if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                         running = true;
                     else if (ws != null) {
-                        WorkQueue w; int b;
+                        WorkQueue w;
                         for (int i = 0; i < ws.length; ++i) {
                             if ((w = ws[i]) != null) {
-                                checkSum += (b = w.base) + w.id;
+                                int s = w.source, p = w.phase;
+                                int d = w.id, b = w.base;
                                 if (b != w.top ||
-                                    ((i & 1) == 1 && w.source >= 0)) {
+                                    ((d & 1) == 1 && (s >= 0 || p >= 0))) {
                                     running = true;
-                                    break;
+                                    break;     // working, scanning, or have work
                                 }
+                                checkSum += (((long)s << 48) + ((long)p << 32) +
+                                             ((long)b << 16) + (long)d);
                             }
                         }
                     }
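The quiescence re-check now folds each queue's source, phase, base, and id
into one long at distinct shifts, so a change to any single field between
sweeps perturbs the checksum far more reliably than the old base + id sum,
where concurrent offsetting changes could cancel out. The packing in
isolation (illustrative):

    static long packState(int source, int phase, int base, int id) {
        return ((long)source << 48) + ((long)phase << 32)
             + ((long)base << 16) + (long)id;
    }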
@@ -2136,7 +2102,7 @@
                                 } catch (Throwable ignore) {
                                 }
                             }
-                            checkSum += w.base + w.id;
+                            checkSum += ((long)w.phase << 32) + w.base;
                         }
                     }
                 }
@@ -2629,8 +2595,9 @@
      * @return the number of worker threads
      */
     public int getRunningThreadCount() {
+        WorkQueue[] ws; WorkQueue w;
+        VarHandle.acquireFence();
         int rc = 0;
-        WorkQueue[] ws; WorkQueue w;
         if ((ws = workQueues) != null) {
             for (int i = 1; i < ws.length; i += 2) {
                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
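getRunningThreadCount above, and the sibling status methods below, now issue
VarHandle.acquireFence() before walking workQueues with plain reads: loads
after the fence cannot be reordered before it, so the snapshot observes queue
state at least as fresh as the fence point. A minimal sketch of the idiom
over a plain field (hypothetical class, not the pool internals):

    import java.lang.invoke.VarHandle;

    final class Snapshot {
        Object[] table;                   // plain (non-volatile) field
        int countNonNull() {
            VarHandle.acquireFence();     // subsequent plain reads cannot
            Object[] t = table;           // be hoisted above the fence
            int count = 0;
            if (t != null)
                for (Object o : t)
                    if (o != null)
                        ++count;
            return count;
        }
    }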
@@ -2678,7 +2645,7 @@
                 if ((ws = workQueues) != null) {
                     for (int i = 1; i < ws.length; i += 2) {
                         if ((v = ws[i]) != null) {
-                            if ((v.source & QUIET) == 0)
+                            if (v.source > 0)
                                 return false;
                             --tc;
                         }
@@ -2724,8 +2691,9 @@
      * @return the number of queued tasks
      */
     public long getQueuedTaskCount() {
-        long count = 0;
         WorkQueue[] ws; WorkQueue w;
+        VarHandle.acquireFence();
+        int count = 0;
         if ((ws = workQueues) != null) {
             for (int i = 1; i < ws.length; i += 2) {
                 if ((w = ws[i]) != null)
@@ -2743,8 +2711,9 @@
      * @return the number of queued submissions
      */
     public int getQueuedSubmissionCount() {
+        WorkQueue[] ws; WorkQueue w;
+        VarHandle.acquireFence();
         int count = 0;
-        WorkQueue[] ws; WorkQueue w;
         if ((ws = workQueues) != null) {
             for (int i = 0; i < ws.length; i += 2) {
                 if ((w = ws[i]) != null)
@@ -2762,6 +2731,7 @@
      */
     public boolean hasQueuedSubmissions() {
         WorkQueue[] ws; WorkQueue w;
+        VarHandle.acquireFence();
         if ((ws = workQueues) != null) {
             for (int i = 0; i < ws.length; i += 2) {
                 if ((w = ws[i]) != null && !w.isEmpty())
@@ -2800,8 +2770,9 @@
      * @return the number of elements transferred
      */
     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
+        WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
+        VarHandle.acquireFence();
         int count = 0;
-        WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
         if ((ws = workQueues) != null) {
             for (int i = 0; i < ws.length; ++i) {
                 if ((w = ws[i]) != null) {
@@ -2824,8 +2795,10 @@
      */
     public String toString() {
         // Use a single pass through workQueues to collect counts
+        int md = mode; // read volatile fields first
+        long c = ctl;
+        long st = stealCount;
         long qt = 0L, qs = 0L; int rc = 0;
-        long st = stealCount;
         WorkQueue[] ws; WorkQueue w;
         if ((ws = workQueues) != null) {
             for (int i = 0; i < ws.length; ++i) {
@@ -2843,9 +2816,7 @@
             }
         }
 
-        int md = mode;
         int pc = (md & SMASK);
-        long c = ctl;
         int tc = pc + (short)(c >>> TC_SHIFT);
         int ac = pc + (int)(c >> RC_SHIFT);
         if (ac < 0) // ignore transient negative
@@ -3131,6 +3102,7 @@
      */
     public static void managedBlock(ManagedBlocker blocker)
         throws InterruptedException {
+        if (blocker == null) throw new NullPointerException();
         ForkJoinPool p;
         ForkJoinWorkerThread wt;
         WorkQueue w;
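managedBlock now rejects a null blocker eagerly instead of failing at its
first use. For reference, ManagedBlocker is the public extension point; the
sketch below follows the lock-wrapping pattern from the ManagedBlocker
javadoc (names illustrative):

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.locks.ReentrantLock;

    final class LockBlocker implements ForkJoinPool.ManagedBlocker {
        private final ReentrantLock lock;
        private boolean hasLock;
        LockBlocker(ReentrantLock lock) { this.lock = lock; }
        public boolean block() {
            if (!hasLock)
                lock.lock();
            return true;                  // done blocking
        }
        public boolean isReleasable() {
            return hasLock || (hasLock = lock.tryLock());
        }
    }

    // Usage: ForkJoinPool.managedBlock(new LockBlocker(lock));
    // managedBlock itself propagates InterruptedException.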
@@ -3163,7 +3135,7 @@
      * available or blocker is released.
      */
     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
-        if (blocker != null && (e instanceof ForkJoinPool)) {
+        if (e instanceof ForkJoinPool) {
             WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
             ForkJoinPool p = (ForkJoinPool)e;
             Thread thread = Thread.currentThread();
@@ -3175,34 +3147,8 @@
                 w = ws[(n - 1) & r & SQMASK];
             else
                 w = null;
-            if (w != null) {
-                for (;;) {
-                    int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
-                    if ((a = w.array) != null && (d = b - s) < 0 &&
-                        (al = a.length) > 0) {
-                        int index = (al - 1) & b;
-                        ForkJoinTask<?> t = (ForkJoinTask<?>)
-                            QA.getAcquire(a, index);
-                        if (blocker.isReleasable())
-                            break;
-                        else if (b++ == w.base) {
-                            if (t == null) {
-                                if (d == -1)
-                                    break;
-                            }
-                            else if (!(t instanceof CompletableFuture.
-                                  AsynchronousCompletionTask))
-                                break;
-                            else if (QA.compareAndSet(a, index, t, null)) {
-                                w.base = b;
-                                t.doExec();
-                            }
-                        }
-                    }
-                    else
-                        break;
-                }
-            }
+            if (w != null)
+                w.helpAsyncBlocker(blocker);
         }
     }
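The inlined helping loop moved into WorkQueue.helpAsyncBlocker, presumably
preserving the filter visible in the removed lines: while the blocker is not
releasable, run tasks from the base of the queue, but only those tagged with
the CompletableFuture.AsynchronousCompletionTask marker; anything else stops
the helping. The filter in isolation (illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ForkJoinTask;

    static boolean helpable(ForkJoinTask<?> t) {
        return t instanceof CompletableFuture.AsynchronousCompletionTask;
    }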
 
@@ -3221,7 +3167,7 @@
     // VarHandle mechanics
     private static final VarHandle CTL;
     private static final VarHandle MODE;
-    private static final VarHandle QA;
+    static final VarHandle QA;
 
     static {
         try {
@@ -3230,7 +3176,7 @@
             MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
             QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
         } catch (ReflectiveOperationException e) {
-            throw new Error(e);
+            throw new ExceptionInInitializerError(e);
         }
 
         // Reduce the risk of rare disastrous classloading in first call to