jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
changeset 26448 5853628b0e63
parent 25859 3317bb8137f4
child 30036 3d4eb4503c18
     47 import java.util.concurrent.Future;
     48 import java.util.concurrent.RejectedExecutionException;
     49 import java.util.concurrent.RunnableFuture;
     50 import java.util.concurrent.ThreadLocalRandom;
     51 import java.util.concurrent.TimeUnit;
     52 import java.util.concurrent.atomic.AtomicLong;
     53 import java.security.AccessControlContext;
     54 import java.security.ProtectionDomain;
     55 import java.security.Permissions;
     56 
     57 /**
     79  * reclaimed during periods of non-use, and reinstated upon subsequent
     80  * use).
     81  *
     82  * <p>For applications that require separate or custom pools, a {@code
     83  * ForkJoinPool} may be constructed with a given target parallelism
     84  * level; by default, equal to the number of available processors.
     85  * The pool attempts to maintain enough active (or available) threads
     86  * by dynamically adding, suspending, or resuming internal worker
     87  * threads, even if some tasks are stalled waiting to join others.
     88  * However, no such adjustments are guaranteed in the face of blocked
     89  * I/O or other unmanaged synchronization. The nested {@link
     90  * ManagedBlocker} interface enables extension of the kinds of
     91  * synchronization accommodated.
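A minimal sketch of such an extension, assuming only the standard two-method ManagedBlocker protocol (isReleasable() and block()) driven by ForkJoinPool.managedBlock; the ManagedLocker class and its lock-wrapping behavior are illustrative, not part of this file:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical example of telling the pool about an external blocking
    // operation so it can compensate (e.g. by adding a spare worker).
    class ManagedLocker implements ForkJoinPool.ManagedBlocker {
        final ReentrantLock lock;
        boolean hasLock = false;
        ManagedLocker(ReentrantLock lock) { this.lock = lock; }
        public boolean block() {
            if (!hasLock)
                lock.lock();              // may block
            return true;                  // done; no further block() calls needed
        }
        public boolean isReleasable() {
            return hasLock || (hasLock = lock.tryLock());
        }
    }

    // Usage from code running in a ForkJoinPool (may throw InterruptedException):
    //   ForkJoinPool.managedBlock(new ManagedLocker(lock));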
    141  * - the parallelism level, a non-negative integer
    142  * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
    143  * - the class name of a {@link ForkJoinWorkerThreadFactory}
    144  * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
    145  * - the class name of a {@link UncaughtExceptionHandler}
    146  * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
    147  * - the maximum number of allowed extra threads to maintain target
    148  * parallelism (default 256).
    149  * </ul>
    150  * If a {@link SecurityManager} is present and no factory is
    151  * specified, then the default pool uses a factory supplying
    152  * threads that have no {@link Permissions} enabled.
    153  * The system class loader is used to load these classes.
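These properties are read when the ForkJoinPool class (and thus the common pool) is first initialized, so they must be set before that point; a sketch, where the chosen values are only examples:

    // Typically supplied on the command line:
    //   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 \
    //        -Djava.util.concurrent.ForkJoinPool.common.maximumSpares=64 MyApp
    //
    // or programmatically, early in startup, before anything uses ForkJoinPool:
    public class CommonPoolConfig {
        public static void main(String[] args) {
            System.setProperty(
                "java.util.concurrent.ForkJoinPool.common.parallelism", "4");
            System.setProperty(
                "java.util.concurrent.ForkJoinPool.common.maximumSpares", "64");
            // The first use of the common pool picks up the properties above.
            int p = java.util.concurrent.ForkJoinPool.commonPool().getParallelism();
            System.out.println("common pool parallelism = " + p);   // prints 4
        }
    }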
    180      * Submissions from non-FJ threads enter into submission queues.
    181      * Workers take these tasks and typically split them into subtasks
    182      * that may be stolen by other workers.  Preference rules give
    183      * first priority to processing tasks from their own queues (LIFO
    184      * or FIFO, depending on mode), then to randomized FIFO steals of
    185      * tasks in other queues.  This framework began as a vehicle for
    186      * supporting tree-structured parallelism using work-stealing.
    187      * Over time, its scalability advantages led to extensions and
    188      * changes to better support more diverse usage contexts.  Because
    189      * most internal methods and nested classes are interrelated,
    190      * their main rationale and descriptions are presented here;
    191      * individual methods and nested classes contain only brief
    192      * comments about details.
    193      *
    194      * WorkQueues
    195      * ==========
    196      *
    197      * Most operations occur within work-stealing queues (in nested
    207      * design is roughly similar to those in the papers "Dynamic
    208      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
    209      * (http://research.sun.com/scalable/pubs/index.html) and
    210      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
    211      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
    212      * The main differences ultimately stem from GC requirements that
    213      * we null out taken slots as soon as we can, to maintain as small
    214      * a footprint as possible even in programs generating huge
    215      * numbers of tasks. To accomplish this, we shift the CAS
    216      * arbitrating pop vs poll (steal) from being on the indices
    217      * ("base" and "top") to the slots themselves.
    218      *
    219      * Adding tasks then takes the form of a classic array push(task):
    220      *    q.array[q.top] = task; ++q.top;
    221      *
    222      * (The actual code needs to null-check and size-check the array,
    223      * properly fence the accesses, and possibly signal waiting
    224      * workers to start scanning -- see below.)  Both a successful pop
    225      * and poll mainly entail a CAS of a slot from non-null to null.
    226      *
    227      * The pop operation (always performed by owner) is:
    228      *   if ((base != top) and
    229      *        (the task at top slot is not null) and
    230      *        (CAS slot to null))
    231      *           decrement top and return task;
    232      *
    233      * And the poll operation (usually by a stealer) is
    234      *    if ((base != top) and
    235      *        (the task at base slot is not null) and
    236      *        (base has not changed) and
    237      *        (CAS slot to null))
    238      *           increment base and return task;
    239      *
    240      * Because we rely on CASes of references, we do not need tag bits
    241      * on base or top.  They are simple ints as used in any circular
    242      * array-based queue (see for example ArrayDeque).  Updates to the
    243      * indices guarantee that top == base means the queue is empty,
    244      * but otherwise may err on the side of possibly making the queue
    245      * appear nonempty when a push, pop, or poll have not fully
    246      * committed. (Method isEmpty() checks the case of a partially
    247      * completed removal of the last element.)  Because of this, the
    248      * poll operation, considered individually, is not wait-free. One
    249      * thief cannot successfully continue until another in-progress
    250      * one (or, if previously empty, a push) completes.  However, in
    251      * the aggregate, we ensure at least probabilistic
    252      * non-blockingness.  If an attempted steal fails, a thief always
    253      * chooses a different random victim target to try next. So, in
    254      * order for one thief to progress, it suffices for any
    255      * in-progress poll or new push on any empty queue to
    256      * complete. (This is why we normally use method pollAt and its
    257      * variants that try once at the apparent base index, else
    258      * consider alternative actions, rather than method poll, which
    259      * retries.)
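Read as plain Java, the push/pop/poll protocol above can be sketched roughly as follows. SketchDeque, its fixed capacity, and the lack of resizing, fencing, and signalling are simplifications invented for illustration; this is not the actual WorkQueue code:

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Fixed-size sketch of the slot-CAS protocol described above.
    final class SketchDeque<T> {
        final AtomicReferenceArray<T> array = new AtomicReferenceArray<>(1 << 13);
        volatile int base;   // next slot to poll (steal)
        int top;             // next slot to push; written only by the owner

        void push(T task) {                       // owner only; no overflow check here
            array.set(top & (array.length() - 1), task);
            ++top;                                 // the real code uses an ordered write
        }

        T pop() {                                  // owner only (LIFO)
            int s = top - 1, i = s & (array.length() - 1);
            T t = array.get(i);
            if (base != top && t != null && array.compareAndSet(i, t, null)) {
                top = s;
                return t;
            }
            return null;
        }

        T poll() {                                 // usually a stealer (FIFO)
            int b = base, i = b & (array.length() - 1);
            T t = array.get(i);
            if (base != top && t != null && b == base && array.compareAndSet(i, t, null)) {
                base = b + 1;
                return t;
            }
            return null;
        }
    }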
    260      *
    261      * This approach also enables support of a user mode in which
    262      * local task processing is in FIFO, not LIFO order, simply by
    263      * using poll rather than pop.  This can be useful in
    264      * message-passing frameworks in which tasks are never joined.
    265      * However neither mode considers affinities, loads, cache
    266      * localities, etc, so rarely provides the best possible
    267      * performance on a given machine, but portably provides good
    268      * throughput by averaging over these factors.  Further, even if
    269      * we did try to use such information, we do not usually have a
    270      * basis for exploiting it.  For example, some sets of tasks
    271      * profit from cache affinities, but others are harmed by cache
    272      * pollution effects. Additionally, even though it requires
    273      * scanning, long-term throughput is often best using random
    274      * selection rather than directed selection policies, so cheap
    275      * randomization of sufficient quality is used whenever
    276      * applicable.  Various Marsaglia XorShifts (some with different
    277      * shift constants) are inlined at use points.
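For reference, one common Marsaglia XorShift step (the 13/17/5 variant; the inlined uses vary the constants) is just three shifts and xors over a nonzero seed:

    // Illustrative holder class; the pool inlines this arithmetic directly.
    final class XorShift {
        static int next(int r) {
            r ^= r << 13;
            r ^= r >>> 17;
            r ^= r << 5;
            return r;
        }
    }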
    278      *
    279      * WorkQueues are also used in a similar way for tasks submitted
    280      * to the pool. We cannot mix these tasks in the same queues used
    281      * by workers. Instead, we randomly associate submission queues
    282      * with submitting threads, using a form of hashing.  The
    283      * ThreadLocalRandom probe value serves as a hash code for
    284      * choosing existing queues, and may be randomly repositioned upon
    285      * contention with other submitters.  In essence, submitters act
    286      * like workers except that they are restricted to executing local
    287      * tasks that they submitted (or in the case of CountedCompleters,
    288      * others with the same root task).  Insertion of tasks in shared
    289      * mode requires a lock (mainly to protect in the case of
    290      * resizing) but we use only a simple spinlock (using field
    291      * qlock), because submitters encountering a busy queue move on to
    292      * try or create other queues -- they block only when creating and
    293      * registering new queues. Additionally, "qlock" saturates to an
    294      * unlockable value (-1) at shutdown. Unlocking still can be and
    295      * is performed by cheaper ordered writes of "qlock" in successful
    296      * cases, but uses CAS in unsuccessful cases.
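A rough sketch of how a submitter's per-thread probe can hash to an even-indexed submission slot, assuming the "even indices, at most 64 slots" rule described later; the class and method names are invented, and the mask is shown only to illustrate the even-slot restriction:

    // Conceptual sketch of submission-slot selection.
    final class SubmissionSlots {
        static final int SQMASK = 0x007e;   // even-indexed slots only, at most 64 of them

        // "probe" stands in for the submitting thread's ThreadLocalRandom probe value;
        // arrayLength is the current (power of two) workQueues array length.
        static int slot(int probe, int arrayLength) {
            return probe & (arrayLength - 1) & SQMASK;
        }
    }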
    297      *
    298      * Management
    299      * ==========
    300      *
    301      * The main throughput advantages of work-stealing stem from
    302      * decentralized control -- workers mostly take tasks from
    303      * themselves or each other, at rates that can exceed a billion
    304      * per second.  The pool itself creates, activates (enables
    305      * scanning for and running tasks), deactivates, blocks, and
    306      * terminates threads, all with minimal central information.
    307      * There are only a few properties that we can globally track or
    308      * maintain, so we pack them into a small number of variables,
    309      * often maintaining atomicity without blocking or locking.
    310      * Nearly all essentially atomic control state is held in two
    311      * volatile variables that are by far most often read (not
    312      * written) as status and consistency checks. (Also, field
    313      * "config" holds unchanging configuration state.)
    314      *
    315      * Field "ctl" contains 64 bits holding information needed to
    316      * atomically decide to add, inactivate, enqueue (on an event
    317      * queue), dequeue, and/or re-activate workers.  To enable this
    318      * packing, we restrict maximum parallelism to (1<<15)-1 (which is
    319      * far in excess of normal operating range) to allow ids, counts,
    320      * and their negations (used for thresholding) to fit into 16bit
    321      * subfields.
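The general shape of this packing is two signed 16-bit counts in the upper half of a long plus 32 bits of queue state in the lower half, all updated by a single CAS or atomic add. The field names, shift positions, and helper class below are illustrative, not the exact ctl encoding:

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch of packing counts and queue bits into one 64-bit word.
    final class PackedCounts {
        static final int AC_SHIFT = 48, TC_SHIFT = 32;
        final AtomicLong ctl = new AtomicLong();

        // Atomically adjust both counts with one addition; as in the real code,
        // callers must ensure subfields do not over/underflow into each other.
        void addCounts(int acDelta, int tcDelta) {
            long delta = ((long) acDelta << AC_SHIFT) + ((long) tcDelta << TC_SHIFT);
            ctl.addAndGet(delta);
        }
        int activeCount() { return (short) (ctl.get() >>> AC_SHIFT); }  // sign-extended
        int totalCount()  { return (short) (ctl.get() >>> TC_SHIFT); }
        int queueBits()   { return (int) ctl.get(); }                   // low 32 bits
    }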
    322      *
    323      * Field "runState" holds lockable state bits (STARTED, STOP, etc)
    324      * also protecting updates to the workQueues array.  When used as
    325      * a lock, it is normally held only for a few instructions (the
    326      * only exceptions are one-time array initialization and uncommon
    327      * resizing), so is nearly always available after at most a brief
    328      * spin. But to be extra-cautious, after spinning, method
    329      * awaitRunStateLock (called only if an initial CAS fails) uses
    330      * wait/notify mechanics on a builtin monitor to block when
    331      * (rarely) needed. This would be a terrible idea for a highly
    332      * contended lock, but most pools run without the lock ever
    333      * contending after the spin limit, so this works fine as a more
    334      * conservative alternative. Because we don't otherwise have an
    335      * internal Object to use as a monitor, the "stealCounter" (an
    336      * AtomicLong) is used when available (it too must be lazily
    337      * initialized; see externalSubmit).
    338      *
    339      * Usages of "runState" vs "ctl" interact in only one case:
    340      * deciding to add a worker thread (see tryAddWorker), in which
    341      * case the ctl CAS is performed while the lock is held.
    342      *
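The spin-then-block pattern described above can be sketched independently of the pool's fields. SpinThenBlockLock, its spin limit, and its separate lock bit and monitor are invented for this example; the real code folds the lock bit into runState and borrows stealCounter as the monitor:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical lock that spins briefly, then falls back to wait/notify --
    // acceptable when contention past the spin limit is rare.
    final class SpinThenBlockLock {
        static final int SPINS = 1 << 8;
        final AtomicInteger lockBit = new AtomicInteger();   // 0 = free, 1 = held
        final Object monitor = new Object();

        void lock() throws InterruptedException {
            for (int spins = SPINS;;) {
                if (lockBit.get() == 0 && lockBit.compareAndSet(0, 1))
                    return;                                   // fast path / spin path
                if (spins > 0)
                    --spins;
                else {
                    synchronized (monitor) {                  // slow path: block
                        if (lockBit.get() == 0 && lockBit.compareAndSet(0, 1))
                            return;
                        monitor.wait();                       // re-check on wakeup
                    }
                }
            }
        }

        void unlock() {
            lockBit.set(0);
            synchronized (monitor) { monitor.notifyAll(); }
        }
    }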
    343      * Recording WorkQueues.  WorkQueues are recorded in the
    344      * "workQueues" array. The array is created upon first use (see
    345      * externalSubmit) and expanded if necessary.  Updates to the
    346      * array while recording new workers and unrecording terminated
    347      * ones are protected from each other by the runState lock, but
    348      * the array is otherwise concurrently readable, and accessed
    349      * directly. We also ensure that reads of the array reference
    350      * itself never become too stale. To simplify index-based
    351      * operations, the array size is always a power of two, and all
    352      * readers must tolerate null slots. Worker queues are at odd
    353      * indices. Shared (submission) queues are at even indices, up to
    354      * a maximum of 64 slots, to limit growth even if array needs to
    355      * expand to add more workers. Grouping them together in this way
    356      * simplifies and speeds up task scanning.
    357      *
    358      * All worker thread creation is on-demand, triggered by task
    359      * submissions, replacement of terminated workers, and/or
    360      * compensation for blocked workers. However, all other support
    361      * code is set up to work with other policies.  To ensure that we
    362      * do not hold on to worker references that would prevent GC, all
    363      * accesses to workQueues are via indices into the workQueues
    364      * array (which is one source of some of the messy code
    365      * constructions here). In essence, the workQueues array serves as
    366      * a weak reference mechanism. Thus for example the stack top
    367      * subfield of ctl stores indices, not references.
    368      *
    369      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
    370      * cannot let workers spin indefinitely scanning for tasks when
    371      * none can be found immediately, and we cannot start/resume
    372      * workers unless there appear to be tasks available.  On the
    373      * other hand, we must quickly prod them into action when new
    374      * tasks are submitted or generated. In many usages, ramp-up time
    375      * to activate workers is the main limiting factor in overall
    376      * performance, which is compounded at program start-up by JIT
    377      * compilation and allocation. So we streamline this as much as
    378      * possible.
    379      *
    380      * The "ctl" field atomically maintains active and total worker
    381      * counts as well as a queue to place waiting threads so they can
    382      * be located for signalling. Active counts also play the role of
    383      * quiescence indicators, so are decremented when workers believe
    384      * that there are no more tasks to execute. The "queue" is
    385      * actually a form of Treiber stack.  A stack is ideal for
    386      * activating threads in most-recently used order. This improves
    387      * performance and locality, outweighing the disadvantages of
    388      * being prone to contention and inability to release a worker
    389      * unless it is topmost on stack.  We park/unpark workers after
    390      * pushing on the idle worker stack (represented by the lower
    391      * 32bit subfield of ctl) when they cannot find work.  The top
    392      * stack state holds the value of the "scanState" field of the
    393      * worker: its index and status, plus a version counter that, in
    394      * addition to the count subfields (also serving as version
    395      * stamps), provides protection against Treiber stack ABA effects.
    396      *
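A stand-alone sketch of that idea follows. IdleStack, its array of next-links, and the exact version/index layout are invented for illustration; the real code threads the stack through WorkQueue fields and the low half of ctl:

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative Treiber stack of idle worker ids: the low 32 bits of "top"
    // hold the topmost worker id (0 means empty) and the high 32 bits are a
    // version stamp that guards against ABA.
    final class IdleStack {
        final AtomicLong top = new AtomicLong();    // (version << 32) | workerId
        final int[] next;                           // next-pointers, indexed by worker id
        IdleStack(int capacity) { next = new int[capacity + 1]; }

        void push(int worker) {                     // worker ids are >= 1
            long t;
            do {
                t = top.get();
                next[worker] = (int) t;             // link to previous top
            } while (!top.compareAndSet(t, ((t >>> 32) + 1) << 32 | worker));
        }

        int pop() {                                 // returns 0 if empty
            long t; int w;
            do {
                if ((w = (int) (t = top.get())) == 0)
                    return 0;
            } while (!top.compareAndSet(t, ((t >>> 32) + 1) << 32 | (next[w] & 0xffffffffL)));
            return w;
        }
    }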
    397      * Field scanState is used by both workers and the pool to manage
    398      * and track whether a worker is INACTIVE (possibly blocked
    399      * waiting for a signal), or SCANNING for tasks (when neither
    400      * holds, it is busy running tasks).  When a worker is
    401      * inactivated, its scanState field is set, and it is prevented
    402      * from executing tasks, even though it must scan once for them
    403      * to avoid queuing races. Note that scanState updates lag queue
    404      * CAS releases so usage requires care. When queued, the lower 16
    405      * bits of scanState must hold its pool index. So we place the
    406      * index there upon initialization (see registerWorker) and
    407      * otherwise keep it there or restore it when necessary.
    408      *
    409      * Memory ordering.  See "Correct and Efficient Work-Stealing for
    410      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
    411      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
    412      * analysis of memory ordering requirements in work-stealing
    413      * algorithms similar to the one used here.  We usually need
    414      * stronger than minimal ordering because we must sometimes signal
    415      * workers, requiring Dekker-like full-fences to avoid lost
    416      * signals.  Arranging for enough ordering without expensive
    417      * over-fencing requires tradeoffs among the supported means of
    418      * expressing access constraints. The most central operations,
    419      * taking from queues and updating ctl state, require full-fence
    420      * CAS.  Array slots are read using the emulation of volatiles
    421      * provided by Unsafe.  Access from other threads to WorkQueue
    422      * base, top, and array requires a volatile load of the first of
    423      * any of these read.  We use the convention of declaring the
    424      * "base" index volatile, and always read it before other fields.
    425      * The owner thread must ensure ordered updates, so writes use
    426      * ordered intrinsics unless they can piggyback on those for other
    427      * writes.  Similar conventions and rationales hold for other
    428      * WorkQueue fields (such as "currentSteal") that are only written
    429      * by owners but observed by others.
    430      *
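A self-contained sketch of the "emulated volatile" array access mentioned above, using the usual arrayBaseOffset/arrayIndexScale offset arithmetic. The class and method names are invented, and the reflective grab of sun.misc.Unsafe is shown only for illustration (it works on older JDKs; the pool itself obtains Unsafe internally):

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    // Hypothetical helper for volatile reads and CASes of Object[] slots.
    final class VolatileSlots {
        static final Unsafe U;
        static final long ABASE;
        static final int ASHIFT;
        static {
            try {
                Field f = Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                U = (Unsafe) f.get(null);
                ABASE = U.arrayBaseOffset(Object[].class);
                int scale = U.arrayIndexScale(Object[].class);
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
        static Object getVolatile(Object[] a, int i) {
            return U.getObjectVolatile(a, ABASE + ((long) i << ASHIFT));
        }
        static boolean casSlot(Object[] a, int i, Object expect, Object update) {
            return U.compareAndSwapObject(a, ABASE + ((long) i << ASHIFT), expect, update);
        }
    }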
       
    431      * Creating workers. To create a worker, we pre-increment total
    432      * count (serving as a reservation), and attempt to construct a
    433      * ForkJoinWorkerThread via its factory. Upon construction, the
    434      * new thread invokes registerWorker, where it constructs a
    435      * WorkQueue and is assigned an index in the workQueues array
    436      * (expanding the array if necessary). The thread is then
    437      * started. Upon any exception across these steps, or null return
    438      * from factory, deregisterWorker adjusts counts and records
    439      * accordingly.  If a null return, the pool continues running with
    440      * fewer than the target number of workers. If exceptional, the
    441      * exception is propagated, generally to some external caller.
    442      * Worker index assignment avoids the bias in scanning that would
    443      * occur if entries were sequentially packed starting at the front
    444      * of the workQueues array. We treat the array as a simple
    445      * power-of-two hash table, expanding as needed. The seedIndex
    446      * increment ensures no collisions until a resize is needed or a
    447      * worker is deregistered and replaced, and thereafter keeps
    448      * probability of collision low. We cannot use
    449      * ThreadLocalRandom.getProbe() for similar purposes here because
    450      * the thread has not started yet, but do so for creating
    451      * submission queues for existing external threads.
    452      *
    453      * Deactivation and waiting. Queuing encounters several intrinsic
    454      * races; most notably that a task-producing thread can miss
    455      * seeing (and signalling) another thread that gave up looking for
    456      * work but has not yet entered the wait queue.  When a worker
    457      * cannot find a task to steal, it deactivates and enqueues. Very
    458      * often, the lack of tasks is transient due to GC or OS
    459      * scheduling. To reduce false-alarm deactivation, scanners
    460      * compute checksums of queue states during sweeps.  (The
    461      * stability checks used here and elsewhere are probabilistic
    462      * variants of snapshot techniques -- see Herlihy & Shavit.)
    463      * Workers give up and try to deactivate only after the sum is
    464      * stable across scans. Further, to avoid missed signals, they
    465      * repeat this scanning process after successful enqueuing until
    466      * again stable.  In this state, the worker cannot take/run a task
    467      * it sees until it is released from the queue, so the worker
    468      * itself eventually tries to release itself or any successor (see
    469      * tryRelease).  Otherwise, upon an empty scan, a deactivated
    470      * worker uses an adaptive local spin construction (see awaitWork)
    471      * before blocking (via park). Note the unusual conventions about
    472      * Thread.interrupts surrounding parking and other blocking:
    473      * Because interrupts are used solely to alert threads to check
    474      * termination, which is checked anyway upon blocking, we clear
    475      * status (using Thread.interrupted) before any call to park, so
    476      * that park does not immediately return due to status being set
    477      * via some other unrelated call to interrupt in user code.
    478      *
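In isolation, that parking convention looks roughly like the following sketch (ParkSketch and its method are invented; the surrounding termination and work re-checks are only indicated by comments):

    import java.util.concurrent.locks.LockSupport;

    final class ParkSketch {
        static void blockUntilSignalled(Object blocker) {
            Thread.interrupted();               // clear and discard any pending interrupt
            // ... re-check for termination / available work here ...
            LockSupport.park(blocker);          // may return spuriously; callers re-check
        }
    }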
       
    479      * Signalling and activation.  Workers are created or activated
    480      * only when there appears to be at least one task they might be
    481      * able to find and execute.  Upon push (either by a worker or an
    482      * external submission) to a previously (possibly) empty queue,
    483      * workers are signalled if idle, or created if fewer exist than
    484      * the given parallelism level.  These primary signals are
    485      * buttressed by others whenever other threads remove a task from
    486      * a queue and notice that there are other tasks there as well.
    487      * On most platforms, signalling (unpark) overhead time is
    488      * noticeably long, and the time between signalling a thread and
    489      * it actually making progress can be very noticeably long, so it
    490      * is worth offloading these delays from critical paths as much as
    491      * possible. Also, because inactive workers are often rescanning
    492      * or spinning rather than blocking, we set and clear the "parker"
    493      * field of WorkQueues to reduce unnecessary calls to unpark.
    494      * (This requires a secondary recheck to avoid missed signals.)
    495      *
    496      * Trimming workers. To release resources after periods of lack of
    497      * use, a worker starting to wait when the pool is quiescent will
    498      * time out and terminate (see awaitWork) if the pool has remained
    499      * quiescent for period IDLE_TIMEOUT, increasing the period as the
    500      * number of threads decreases, eventually removing all workers.
    501      * Also, when more than two spare threads exist, excess threads
    502      * are immediately terminated at the next quiescent point.
    503      * (Padding by two avoids hysteresis.)
    504      *
    505      * Shutdown and Termination. A call to shutdownNow invokes
    506      * tryTerminate to atomically set a runState bit. The calling
    507      * thread, as well as every other worker thereafter terminating,
    508      * helps terminate others by setting their (qlock) status,
    509      * cancelling their unprocessed tasks, and waking them up, doing
    510      * so repeatedly until stable (but with a loop bounded by the
    511      * number of workers).  Calls to non-abrupt shutdown() preface
    512      * this by checking whether termination should commence. This
    513      * relies primarily on the active count bits of "ctl" maintaining
    514      * consensus -- tryTerminate is called from awaitWork whenever
    515      * quiescent. However, external submitters do not take part in
    516      * this consensus.  So, tryTerminate sweeps through queues (until
    517      * stable) to ensure lack of in-flight submissions and workers
    518      * about to process them before triggering the "STOP" phase of
    519      * termination. (Note: there is an intrinsic conflict if
    520      * helpQuiescePool is called when shutdown is enabled. Both wait
    521      * for quiescence, but tryTerminate is biased to not trigger until
    522      * helpQuiescePool completes.)
    523      *
    524      *
    525      * Joining Tasks
    526      * =============
    527      *
    528      * Any of several actions may be taken when one worker is waiting
    529      * to join a task stolen (or always held) by another.  Because we
    530      * are multiplexing many tasks on to a pool of workers, we can't
    531      * just let them block (as in Thread.join).  We also cannot just
    532      * reassign the joiner's run-time stack with another and replace
    533      * it later, which would be a form of "continuation", that even if
    534      * possible is not necessarily a good idea since we may need both
    535      * an unblocked task and its continuation to progress.  Instead we
    536      * combine two tactics:
    537      *
    538      *   Helping: Arranging for the joiner to execute some task that it
    539      *      would be running if the steal had not occurred.
    540      *
    541      *   Compensating: Unless there are already enough live threads,
    551      * typically worthwhile).
    552      *
    553      * The ManagedBlocker extension API can't use helping so relies
    554      * only on compensation in method awaitBlocker.
    555      *
    556      * The algorithm in helpStealer entails a form of "linear
    557      * helping".  Each worker records (in field currentSteal) the most
    558      * recent task it stole from some other worker (or a submission).
    559      * It also records (in field currentJoin) the task it is currently
    560      * actively joining. Method helpStealer uses these markers to try
    561      * to find a worker to help (i.e., steal back a task from and
    562      * execute it) that could hasten completion of the actively joined
    563      * task.  Thus, the joiner executes a task that would be on its
    564      * own local deque had the to-be-joined task not been stolen. This
    565      * is a conservative variant of the approach described in Wagner &
    566      * Calder "Leapfrogging: a portable technique for implementing
    567      * efficient futures" SIGPLAN Notices, 1993
    568      * (http://portal.acm.org/citation.cfm?id=155354). It differs in
    569      * that: (1) We only maintain dependency links across workers upon
    570      * steals, rather than use per-task bookkeeping.  This sometimes
    578      * potentially cyclic mutual steals.  (3) It is intentionally
    579      * racy: field currentJoin is updated only while actively joining,
    580      * which means that we miss links in the chain during long-lived
    581      * tasks, GC stalls etc (which is OK since blocking in such cases
    582      * is usually a good idea).  (4) We bound the number of attempts
    583      * to find work using checksums and fall back to suspending the
    584      * worker and if necessary replacing it with another.
    585      *
    586      * Helping actions for CountedCompleters do not require tracking
    587      * currentJoins: Method helpComplete takes and executes any task
    588      * with the same root as the task being waited on (preferring
    589      * local pops to non-local polls). However, this still entails
    590      * some traversal of completer chains, so is less efficient than
    591      * using CountedCompleters without explicit joins.
    592      *
    593      * Compensation does not aim to keep exactly the target
    594      * parallelism number of unblocked threads running at any given
    595      * time. Some previous versions of this class employed immediate
    596      * compensations for any blocked join. However, in practice, the
    597      * vast majority of blockages are transient byproducts of GC and
    598      * other JVM or OS activities that are made worse by replacement.
    599      * Currently, compensation is attempted only after validating that
    600      * all purportedly active threads are processing tasks by checking
    601      * field WorkQueue.scanState, which eliminates most false
    602      * positives.  Also, compensation is bypassed (tolerating fewer
    603      * threads) in the most common case in which it is rarely
    604      * beneficial: when a worker with an empty queue (thus no
    605      * continuation tasks) blocks on a join and there still remain
    606      * enough threads to ensure liveness.
    607      *
    608      * The compensation mechanism may be bounded.  Bounds for the
    609      * commonPool (see commonMaxSpares) better enable JVMs to cope
    610      * with programming errors and abuse before running out of
    611      * resources to do so. In other cases, users may supply factories
    612      * that limit thread construction. The effects of bounding in this
    613      * pool (like all others) are imprecise.  Total worker counts are
    614      * decremented when threads deregister, not when they exit and
    615      * resources are reclaimed by the JVM and OS. So the number of
    616      * simultaneously live threads may transiently exceed bounds.
    617      *
    618      * Common Pool
    619      * ===========
    620      *
    621      * The static common pool always exists after static
    622      * initialization.  Since it (or any other created pool) need
    623      * never be used, we minimize initial construction overhead and
    624      * footprint to the setup of about a dozen fields, with no nested
    625      * allocation. Most bootstrapping occurs within method
    626      * externalSubmit during the first submission to the pool.
    627      *
    628      * When external threads submit to the common pool, they can
    629      * perform subtask processing (see externalHelpComplete and
    630      * related methods) upon joins.  This caller-helps policy makes it
    631      * sensible to set common pool parallelism level to one (or more)
    632      * less than the total number of available cores, or even zero for
    633      * pure caller-runs.  We do not need to record whether external
    634      * submissions are to the common pool -- if not, external help
    635      * methods return quickly. These submitters would otherwise be
    636      * blocked waiting for completion, so the extra effort (with
    637      * liberally sprinkled task status checks) in inapplicable cases
    638      * amounts to an odd form of limited spin-wait before blocking in
    639      * ForkJoinTask.join.
    640      *
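From the API side, this caller-helps behavior is what makes the everyday pattern below reasonable even though the common pool's default parallelism is one less than the number of available processors. The SumTask class is only a placeholder workload:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    // Placeholder workload: sums the range [lo, hi) by splitting in half.
    class SumTask extends RecursiveTask<Long> {
        final long lo, hi;
        SumTask(long lo, long hi) { this.lo = lo; this.hi = hi; }
        protected Long compute() {
            if (hi - lo <= 1_000) {
                long s = 0;
                for (long i = lo; i < hi; ++i) s += i;
                return s;
            }
            long mid = (lo + hi) >>> 1;
            SumTask left = new SumTask(lo, mid);
            left.fork();                                // may be stolen by a worker
            long right = new SumTask(mid, hi).compute();
            return right + left.join();                 // joining may help run subtasks
        }

        public static void main(String[] args) {
            // Submitting from a non-worker thread: the submitter enters a
            // submission queue and, while waiting, can help process subtasks.
            long total = ForkJoinPool.commonPool().invoke(new SumTask(0, 1_000_000));
            System.out.println(total);                  // 499999500000
        }
    }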
    641      * As a more appropriate default in managed environments, unless
    642      * overridden by system properties, we use workers of subclass
    643      * InnocuousForkJoinWorkerThread when there is a SecurityManager
    644      * present. These workers have no permissions set, do not belong
    645      * to any user-defined ThreadGroup, and erase all ThreadLocals
    646      * after executing any top-level task (see WorkQueue.runTask).
    647      * The associated mechanics (mainly in ForkJoinWorkerThread) may
    648      * be JVM-dependent and must access particular Thread class fields
    649      * to achieve this effect.
    650      *
    651      * Style notes
    652      * ===========
    653      *
    654      * Memory ordering relies mainly on Unsafe intrinsics that carry
    655      * the further responsibility of explicitly performing null- and
    656      * bounds- checks otherwise carried out implicitly by JVMs.  This
    657      * can be awkward and ugly, but also reflects the need to control
    658      * outcomes across the unusual cases that arise in very racy code
    659      * with very few invariants. So these explicit checks would exist
    660      * in some form anyway.  All fields are read into locals before
    661      * use, and null-checked if they are references.  This is usually
    662      * done in a "C"-like style of listing declarations at the heads
    663      * of methods or blocks, and using inline assignments on first
    664      * encounter.  Array bounds-checks are usually performed by
    665      * masking with array.length-1, which relies on the invariant that
    666      * these arrays are created with positive lengths, which is itself
    667      * paranoically checked. Nearly all explicit checks lead to
    668      * bypass/return, not exception throws, because they may
    669      * legitimately arise due to cancellation/revocation during
    670      * shutdown.
   522      *
   671      *
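The bounds-check-by-masking idiom mentioned above can be shown with a small self-contained sketch (the ring buffer below is illustrative, not the real WorkQueue): when an array's length is a power of two, index & (length - 1) is always in range, so no explicit range check is needed.

    // Illustrative only: masking with (length - 1) on a power-of-two array.
    final class PowerOfTwoRing<T> {
        private final Object[] slots = new Object[8]; // length must be 2^k
        private int top;                              // may wrap on overflow

        void put(T x) {
            slots[top++ & (slots.length - 1)] = x;    // always a valid index
        }

        @SuppressWarnings("unchecked")
        T get(int i) {
            return (T) slots[i & (slots.length - 1)];
        }
    }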
   523      * There is a lot of representation-level coupling among classes
   672      * There is a lot of representation-level coupling among classes
   524      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
   673      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
   525      * fields of WorkQueue maintain data structures managed by
   674      * fields of WorkQueue maintain data structures managed by
   526      * ForkJoinPool, so are directly accessed.  There is little point
   675      * ForkJoinPool, so are directly accessed.  There is little point
   527      * trying to reduce this, since any associated future changes in
   676      * trying to reduce this, since any associated future changes in
   528      * representations will need to be accompanied by algorithmic
   677      * representations will need to be accompanied by algorithmic
   529      * changes anyway. Several methods intrinsically sprawl because
   678      * changes anyway. Several methods intrinsically sprawl because
   530      * they must accumulate sets of consistent reads of volatiles held
   679      * they must accumulate sets of consistent reads of fields held in
   531      * in local variables.  Methods signalWork() and scan() are the
   680      * local variables.  There are also other coding oddities
   532      * main bottlenecks, so are especially heavily
   681      * (including several unnecessary-looking hoisted null checks)
   533      * micro-optimized/mangled.  There are lots of inline assignments
   682      * that help some methods perform reasonably even when interpreted
   534      * (of form "while ((local = field) != 0)") which are usually the
   683      * (not compiled).
   535      * simplest way to ensure the required read orderings (which are
   684      *
   536      * sometimes critical). This leads to a "C"-like style of listing
   685      * The order of declarations in this file is (with a few exceptions):
   537      * declarations of these locals at the heads of methods or blocks.
       
   538      * There are several occurrences of the unusual "do {} while
       
   539      * (!cas...)"  which is the simplest way to force an update of a
       
   540      * CAS'ed variable. There are also other coding oddities (including
       
   541      * several unnecessary-looking hoisted null checks) that help
       
   542      * some methods perform reasonably even when interpreted (not
       
   543      * compiled).
       
   544      *
       
   545      * The order of declarations in this file is:
       
   546      * (1) Static utility functions
   686      * (1) Static utility functions
   547      * (2) Nested (static) classes
   687      * (2) Nested (static) classes
   548      * (3) Static fields
   688      * (3) Static fields
   549      * (4) Fields, along with constants used when unpacking some of them
   689      * (4) Fields, along with constants used when unpacking some of them
   550      * (5) Internal control methods
   690      * (5) Internal control methods
   607         public final Void getRawResult() { return null; }
   747         public final Void getRawResult() { return null; }
   608         public final void setRawResult(Void x) {}
   748         public final void setRawResult(Void x) {}
   609         public final boolean exec() { return true; }
   749         public final boolean exec() { return true; }
   610     }
   750     }
   611 
   751 
       
   752     // Constants shared across ForkJoinPool and WorkQueue
       
   753 
       
   754     // Bounds
       
   755     static final int SMASK        = 0xffff;        // short bits == max index
       
   756     static final int MAX_CAP      = 0x7fff;        // max #workers - 1
       
   757     static final int EVENMASK     = 0xfffe;        // even short bits
       
   758     static final int SQMASK       = 0x007e;        // max 64 (even) slots
       
   759 
       
   760     // Masks and units for WorkQueue.scanState and ctl sp subfield
       
   761     static final int SCANNING     = 1;             // false when running tasks
       
   762     static final int INACTIVE     = 1 << 31;       // must be negative
       
   763     static final int SS_SEQ       = 1 << 16;       // version count
       
   764 
       
   765     // Mode bits for ForkJoinPool.config and WorkQueue.config
       
   766     static final int MODE_MASK    = 0xffff << 16;  // top half of int
       
   767     static final int LIFO_QUEUE   = 0;
       
   768     static final int FIFO_QUEUE   = 1 << 16;
       
   769     static final int SHARED_QUEUE = 1 << 31;       // must be negative
       
   770 
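To make these encodings concrete, a brief sketch (class and method names invented for illustration) of how a config word packs a pool index in its low 16 bits together with a mode in its upper bits, and how the tests used later in this file read it back:

    // Illustrative packing/unpacking of a config word; mirrors the
    // constants defined just above.
    class ConfigEncodingSketch {
        static final int FIFO_QUEUE   = 1 << 16;
        static final int SHARED_QUEUE = 1 << 31;       // negative by construction

        static int pack(int poolIndex, int modeBits) {
            return (poolIndex & 0xffff) | modeBits;
        }

        public static void main(String[] args) {
            int config = pack(42, FIFO_QUEUE);
            boolean isFifo   = (config & FIFO_QUEUE) != 0; // true
            boolean isShared = config < 0;                 // only if SHARED_QUEUE set
            int exported     = (config & 0xffff) >>> 1;    // as in getPoolIndex()
            System.out.println(isFifo + " " + isShared + " " + exported);
        }
    }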
   612     /**
   771     /**
   613      * Queues supporting work-stealing as well as external task
   772      * Queues supporting work-stealing as well as external task
   614      * submission. See above for main rationale and algorithms.
   773      * submission. See above for descriptions and algorithms.
   615      * Implementation relies heavily on "Unsafe" intrinsics
       
   616      * and selective use of "volatile":
       
   617      *
       
   618      * Field "base" is the index (mod array.length) of the least valid
       
   619      * queue slot, which is always the next position to steal (poll)
       
   620      * from if nonempty. Reads and writes require volatile orderings
       
   621      * but not CAS, because updates are only performed after slot
       
   622      * CASes.
       
   623      *
       
   624      * Field "top" is the index (mod array.length) of the next queue
       
   625      * slot to push to or pop from. It is written only by owner thread
       
   626      * for push, or under lock for external/shared push, and accessed
       
   627      * by other threads only after reading (volatile) base.  Both top
       
   628      * and base are allowed to wrap around on overflow, but (top -
       
   629      * base) (or more commonly -(base - top) to force volatile read of
       
   630      * base before top) still estimates size. The lock ("qlock") is
       
   631      * forced to -1 on termination, causing all further lock attempts
       
   632      * to fail. (Note: we don't need CAS for termination state because
       
   633      * upon pool shutdown, all shared-queues will stop being used
       
   634      * anyway.)  Nearly all lock bodies are set up so that exceptions
       
   635      * within lock bodies are "impossible" (modulo JVM errors that
       
   636      * would cause failure anyway.)
       
   637      *
       
   638      * The array slots are read and written using the emulation of
       
   639      * volatiles/atomics provided by Unsafe. Insertions must in
       
   640      * general use putOrderedObject as a form of releasing store to
       
   641      * ensure that all writes to the task object are ordered before
       
   642      * its publication in the queue.  All removals entail a CAS to
       
   643      * null.  The array is always a power of two. To ensure safety of
       
   644      * Unsafe array operations, all accesses perform explicit null
       
   645      * checks and implicit bounds checks via power-of-two masking.
       
   646      *
       
   647      * In addition to basic queuing support, this class contains
       
   648      * fields described elsewhere to control execution. It turns out
       
   649      * to work better memory-layout-wise to include them in this class
       
   650      * rather than a separate class.
       
   651      *
       
   652      * Performance on most platforms is very sensitive to placement of
   774      * Performance on most platforms is very sensitive to placement of
   653      * instances of both WorkQueues and their arrays -- we absolutely
   775      * instances of both WorkQueues and their arrays -- we absolutely
   654      * do not want multiple WorkQueue instances or multiple queue
   776      * do not want multiple WorkQueue instances or multiple queue
   655      * arrays sharing cache lines. (It would be best for queue objects
   777      * arrays sharing cache lines. The @Contended annotation alerts
   656      * and their arrays to share, but there is nothing available to
   778      * JVMs to try to keep instances apart.
   657      * help arrange that). The @Contended annotation alerts JVMs to
       
   658      * try to keep instances apart.
       
   659      */
   779      */
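The base/top discipline described above can be sketched with plain volatile and CAS operations (an illustrative toy, not the real WorkQueue; it deliberately elides the Unsafe ordered writes, array growth, locking, and signalling that the real code depends on): the owner pushes and pops at top, thieves poll at base, and base is advanced only after a successful slot CAS.

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Toy deque illustrating the base/top roles; not the real WorkQueue.
    final class TinyDeque<T> {
        private final AtomicReferenceArray<T> a = new AtomicReferenceArray<>(1 << 13);
        private volatile int base;   // next slot to steal (poll) from
        private volatile int top;    // next slot to push to; written by owner

        void push(T task) {                          // owner only
            a.set(top & (a.length() - 1), task);     // publish task, then advance
            top = top + 1;
        }

        T pop() {                                    // owner only (LIFO end)
            int s = top - 1, i = s & (a.length() - 1);
            T t = a.get(i);
            if (t != null && a.compareAndSet(i, t, null)) {
                top = s;
                return t;
            }
            return null;
        }

        T poll() {                                   // thieves (FIFO end)
            int b = base;
            if (b - top < 0) {
                int i = b & (a.length() - 1);
                T t = a.get(i);
                if (t != null && base == b && a.compareAndSet(i, t, null)) {
                    base = b + 1;                    // only after the slot CAS
                    return t;
                }
            }
            return null;
        }
    }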
   660     @sun.misc.Contended
   780     @sun.misc.Contended
   661     static final class WorkQueue {
   781     static final class WorkQueue {
       
   782 
   662         /**
   783         /**
   663          * Capacity of work-stealing queue array upon initialization.
   784          * Capacity of work-stealing queue array upon initialization.
   664          * Must be a power of two; at least 4, but should be larger to
   785          * Must be a power of two; at least 4, but should be larger to
   665          * reduce or eliminate cacheline sharing among queues.
   786          * reduce or eliminate cacheline sharing among queues.
   666          * Currently, it is much larger, as a partial workaround for
   787          * Currently, it is much larger, as a partial workaround for
   677          * value a bit less than this to help users trap runaway
   798          * value a bit less than this to help users trap runaway
   678          * programs before saturating systems.
   799          * programs before saturating systems.
   679          */
   800          */
   680         static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
   801         static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
   681 
   802 
   682         volatile int eventCount;   // encoded inactivation count; < 0 if inactive
   803         // Instance fields
   683         int nextWait;              // encoded record of next event waiter
   804         volatile int scanState;    // versioned, <0: inactive; odd:scanning
       
   805         int stackPred;             // pool stack (ctl) predecessor
   684         int nsteals;               // number of steals
   806         int nsteals;               // number of steals
   685         int hint;                  // steal index hint
   807         int hint;                  // randomization and stealer index hint
   686         short poolIndex;           // index of this queue in pool
   808         int config;                // pool index and mode
   687         final short mode;          // 0: lifo, > 0: fifo, < 0: shared
   809         volatile int qlock;        // 1: locked, < 0: terminate; else 0
   688         volatile int qlock;        // 1: locked, -1: terminate; else 0
       
   689         volatile int base;         // index of next slot for poll
   810         volatile int base;         // index of next slot for poll
   690         int top;                   // index of next slot for push
   811         int top;                   // index of next slot for push
   691         ForkJoinTask<?>[] array;   // the elements (initially unallocated)
   812         ForkJoinTask<?>[] array;   // the elements (initially unallocated)
   692         final ForkJoinPool pool;   // the containing pool (may be null)
   813         final ForkJoinPool pool;   // the containing pool (may be null)
   693         final ForkJoinWorkerThread owner; // owning thread or null if shared
   814         final ForkJoinWorkerThread owner; // owning thread or null if shared
   694         volatile Thread parker;    // == owner during call to park; else null
   815         volatile Thread parker;    // == owner during call to park; else null
   695         volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
   816         volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
   696         ForkJoinTask<?> currentSteal; // current non-local task being executed
   817         volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
   697 
   818 
   698         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
   819         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
   699                   int seed) {
       
   700             this.pool = pool;
   820             this.pool = pool;
   701             this.owner = owner;
   821             this.owner = owner;
   702             this.mode = (short)mode;
       
   703             this.hint = seed; // store initial seed for runWorker
       
   704             // Place indices in the center of array (that is not yet allocated)
   822             // Place indices in the center of array (that is not yet allocated)
   705             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
   823             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
       
   824         }
       
   825 
       
   826         /**
       
   827          * Returns an exportable index (used by ForkJoinWorkerThread).
       
   828          */
       
   829         final int getPoolIndex() {
       
   830             return (config & 0xffff) >>> 1; // ignore odd/even tag bit
   706         }
   831         }
   707 
   832 
   708         /**
   833         /**
   709          * Returns the approximate number of tasks in the queue.
   834          * Returns the approximate number of tasks in the queue.
   710          */
   835          */
   717          * Provides a more accurate estimate of whether this queue has
   842          * Provides a more accurate estimate of whether this queue has
   718          * any tasks than does queueSize, by checking whether a
   843          * any tasks than does queueSize, by checking whether a
   719          * near-empty queue has at least one unclaimed task.
   844          * near-empty queue has at least one unclaimed task.
   720          */
   845          */
   721         final boolean isEmpty() {
   846         final boolean isEmpty() {
   722             ForkJoinTask<?>[] a; int m, s;
   847             ForkJoinTask<?>[] a; int n, m, s;
   723             int n = base - (s = top);
   848             return ((n = base - (s = top)) >= 0 ||
   724             return (n >= 0 ||
   849                     (n == -1 &&           // possibly one task
   725                     (n == -1 &&
   850                      ((a = array) == null || (m = a.length - 1) < 0 ||
   726                      ((a = array) == null ||
       
   727                       (m = a.length - 1) < 0 ||
       
   728                       U.getObject
   851                       U.getObject
   729                       (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
   852                       (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
   730         }
   853         }
   731 
   854 
   732         /**
   855         /**
   736          * @param task the task. Caller must ensure non-null.
   859          * @param task the task. Caller must ensure non-null.
   737          * @throws RejectedExecutionException if array cannot be resized
   860          * @throws RejectedExecutionException if array cannot be resized
   738          */
   861          */
   739         final void push(ForkJoinTask<?> task) {
   862         final void push(ForkJoinTask<?> task) {
   740             ForkJoinTask<?>[] a; ForkJoinPool p;
   863             ForkJoinTask<?>[] a; ForkJoinPool p;
   741             int s = top, n;
   864             int b = base, s = top, n;
   742             if ((a = array) != null) {    // ignore if queue removed
   865             if ((a = array) != null) {    // ignore if queue removed
   743                 int m = a.length - 1;
   866                 int m = a.length - 1;     // fenced write for task visibility
   744                 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
   867                 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
   745                 if ((n = (top = s + 1) - base) <= 2)
   868                 U.putOrderedInt(this, QTOP, s + 1);
   746                     (p = pool).signalWork(p.workQueues, this);
   869                 if ((n = s - b) <= 1) {
       
   870                     if ((p = pool) != null)
       
   871                         p.signalWork(p.workQueues, this);
       
   872                 }
   747                 else if (n >= m)
   873                 else if (n >= m)
   748                     growArray();
   874                     growArray();
   749             }
   875             }
   750         }
   876         }
   751 
   877 
   762             int oldMask, t, b;
   888             int oldMask, t, b;
   763             ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
   889             ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
   764             if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
   890             if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
   765                 (t = top) - (b = base) > 0) {
   891                 (t = top) - (b = base) > 0) {
   766                 int mask = size - 1;
   892                 int mask = size - 1;
   767                 do {
   893                 do { // emulate poll from old array, push to new array
   768                     ForkJoinTask<?> x;
   894                     ForkJoinTask<?> x;
   769                     int oldj = ((b & oldMask) << ASHIFT) + ABASE;
   895                     int oldj = ((b & oldMask) << ASHIFT) + ABASE;
   770                     int j    = ((b &    mask) << ASHIFT) + ABASE;
   896                     int j    = ((b &    mask) << ASHIFT) + ABASE;
   771                     x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
   897                     x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
   772                     if (x != null &&
   898                     if (x != null &&
   787                 for (int s; (s = top - 1) - base >= 0;) {
   913                 for (int s; (s = top - 1) - base >= 0;) {
   788                     long j = ((m & s) << ASHIFT) + ABASE;
   914                     long j = ((m & s) << ASHIFT) + ABASE;
   789                     if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
   915                     if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
   790                         break;
   916                         break;
   791                     if (U.compareAndSwapObject(a, j, t, null)) {
   917                     if (U.compareAndSwapObject(a, j, t, null)) {
   792                         top = s;
   918                         U.putOrderedInt(this, QTOP, s);
   793                         return t;
   919                         return t;
   794                     }
   920                     }
   795                 }
   921                 }
   796             }
   922             }
   797             return null;
   923             return null;
   798         }
   924         }
   799 
   925 
   800         /**
   926         /**
   801          * Takes a task in FIFO order if b is base of queue and a task
   927          * Takes a task in FIFO order if b is base of queue and a task
   802          * can be claimed without contention. Specialized versions
   928          * can be claimed without contention. Specialized versions
   803          * appear in ForkJoinPool methods scan and tryHelpStealer.
   929          * appear in ForkJoinPool methods scan and helpStealer.
   804          */
   930          */
   805         final ForkJoinTask<?> pollAt(int b) {
   931         final ForkJoinTask<?> pollAt(int b) {
   806             ForkJoinTask<?> t; ForkJoinTask<?>[] a;
   932             ForkJoinTask<?> t; ForkJoinTask<?>[] a;
   807             if ((a = array) != null) {
   933             if ((a = array) != null) {
   808                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
   934                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
   809                 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
   935                 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
   810                     base == b && U.compareAndSwapObject(a, j, t, null)) {
   936                     base == b && U.compareAndSwapObject(a, j, t, null)) {
   811                     U.putOrderedInt(this, QBASE, b + 1);
   937                     base = b + 1;
   812                     return t;
   938                     return t;
   813                 }
   939                 }
   814             }
   940             }
   815             return null;
   941             return null;
   816         }
   942         }
   821         final ForkJoinTask<?> poll() {
   947         final ForkJoinTask<?> poll() {
   822             ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
   948             ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
   823             while ((b = base) - top < 0 && (a = array) != null) {
   949             while ((b = base) - top < 0 && (a = array) != null) {
   824                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
   950                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
   825                 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
   951                 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
   826                 if (t != null) {
   952                 if (base == b) {
   827                     if (U.compareAndSwapObject(a, j, t, null)) {
   953                     if (t != null) {
   828                         U.putOrderedInt(this, QBASE, b + 1);
   954                         if (U.compareAndSwapObject(a, j, t, null)) {
   829                         return t;
   955                             base = b + 1;
       
   956                             return t;
       
   957                         }
   830                     }
   958                     }
   831                 }
   959                     else if (b + 1 == top) // now empty
   832                 else if (base == b) {
       
   833                     if (b + 1 == top)
       
   834                         break;
   960                         break;
   835                     Thread.yield(); // wait for lagging update (very rare)
       
   836                 }
   961                 }
   837             }
   962             }
   838             return null;
   963             return null;
   839         }
   964         }
   840 
   965 
   841         /**
   966         /**
   842          * Takes next task, if one exists, in order specified by mode.
   967          * Takes next task, if one exists, in order specified by mode.
   843          */
   968          */
   844         final ForkJoinTask<?> nextLocalTask() {
   969         final ForkJoinTask<?> nextLocalTask() {
   845             return mode == 0 ? pop() : poll();
   970             return (config & FIFO_QUEUE) == 0 ? pop() : poll();
   846         }
   971         }
   847 
   972 
   848         /**
   973         /**
   849          * Returns next task, if one exists, in order specified by mode.
   974          * Returns next task, if one exists, in order specified by mode.
   850          */
   975          */
   851         final ForkJoinTask<?> peek() {
   976         final ForkJoinTask<?> peek() {
   852             ForkJoinTask<?>[] a = array; int m;
   977             ForkJoinTask<?>[] a = array; int m;
   853             if (a == null || (m = a.length - 1) < 0)
   978             if (a == null || (m = a.length - 1) < 0)
   854                 return null;
   979                 return null;
   855             int i = mode == 0 ? top - 1 : base;
   980             int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
   856             int j = ((i & m) << ASHIFT) + ABASE;
   981             int j = ((i & m) << ASHIFT) + ABASE;
   857             return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
   982             return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
   858         }
   983         }
   859 
   984 
   860         /**
   985         /**
   861          * Pops the given task only if it is at the current top.
   986          * Pops the given task only if it is at the current top.
   862          * (A shared version is available only via FJP.tryExternalUnpush)
   987          * (A shared version is available only via FJP.tryExternalUnpush)
   863          */
    988          */
   864         final boolean tryUnpush(ForkJoinTask<?> t) {
   989         final boolean tryUnpush(ForkJoinTask<?> t) {
   865             ForkJoinTask<?>[] a; int s;
   990             ForkJoinTask<?>[] a; int s;
   866             if ((a = array) != null && (s = top) != base &&
   991             if ((a = array) != null && (s = top) != base &&
   867                 U.compareAndSwapObject
   992                 U.compareAndSwapObject
   868                 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
   993                 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
   869                 top = s;
   994                 U.putOrderedInt(this, QTOP, s);
   870                 return true;
   995                 return true;
   871             }
   996             }
   872             return false;
   997             return false;
   873         }
   998         }
   874 
   999 
   875         /**
  1000         /**
   876          * Removes and cancels all known tasks, ignoring any exceptions.
  1001          * Removes and cancels all known tasks, ignoring any exceptions.
   877          */
  1002          */
   878         final void cancelAll() {
  1003         final void cancelAll() {
   879             ForkJoinTask.cancelIgnoringExceptions(currentJoin);
  1004             ForkJoinTask<?> t;
   880             ForkJoinTask.cancelIgnoringExceptions(currentSteal);
  1005             if ((t = currentJoin) != null) {
   881             for (ForkJoinTask<?> t; (t = poll()) != null; )
  1006                 currentJoin = null;
       
  1007                 ForkJoinTask.cancelIgnoringExceptions(t);
       
  1008             }
       
  1009             if ((t = currentSteal) != null) {
       
  1010                 currentSteal = null;
       
  1011                 ForkJoinTask.cancelIgnoringExceptions(t);
       
  1012             }
       
  1013             while ((t = poll()) != null)
   882                 ForkJoinTask.cancelIgnoringExceptions(t);
  1014                 ForkJoinTask.cancelIgnoringExceptions(t);
   883         }
  1015         }
   884 
  1016 
   885         // Specialized execution methods
  1017         // Specialized execution methods
   886 
  1018 
   891             for (ForkJoinTask<?> t; (t = poll()) != null;)
  1023             for (ForkJoinTask<?> t; (t = poll()) != null;)
   892                 t.doExec();
  1024                 t.doExec();
   893         }
  1025         }
   894 
  1026 
   895         /**
  1027         /**
   896          * Executes a top-level task and any local tasks remaining
   1028          * Removes and executes all local tasks. If FIFO, invokes
   897          * after execution.
  1029          * pollAndExecAll. Otherwise implements a specialized pop loop
       
  1030          * to exec until empty.
   898          */
  1031          */
   899         final void runTask(ForkJoinTask<?> task) {
  1032         final void execLocalTasks() {
   900             if ((currentSteal = task) != null) {
  1033             int b = base, m, s;
   901                 ForkJoinWorkerThread thread;
  1034             ForkJoinTask<?>[] a = array;
   902                 task.doExec();
  1035             if (b - (s = top - 1) <= 0 && a != null &&
   903                 ForkJoinTask<?>[] a = array;
  1036                 (m = a.length - 1) >= 0) {
   904                 int md = mode;
  1037                 if ((config & FIFO_QUEUE) == 0) {
   905                 ++nsteals;
  1038                     for (ForkJoinTask<?> t;;) {
   906                 currentSteal = null;
  1039                         if ((t = (ForkJoinTask<?>)U.getAndSetObject
   907                 if (md != 0)
  1040                              (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
   908                     pollAndExecAll();
  1041                             break;
   909                 else if (a != null) {
  1042                         U.putOrderedInt(this, QTOP, s);
   910                     int s, m = a.length - 1;
       
   911                     ForkJoinTask<?> t;
       
   912                     while ((s = top - 1) - base >= 0 &&
       
   913                            (t = (ForkJoinTask<?>)U.getAndSetObject
       
   914                             (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
       
   915                         top = s;
       
   916                         t.doExec();
  1043                         t.doExec();
   917                     }
  1044                         if (base - (s = top - 1) > 0)
   918                 }
       
   919                 if ((thread = owner) != null) // no need to do in finally clause
       
   920                     thread.afterTopLevelExec();
       
   921             }
       
   922         }
       
   923 
       
   924         /**
       
   925          * If present, removes from queue and executes the given task,
       
   926          * or any other cancelled task. Returns (true) on any CAS
       
   927          * or consistency check failure so caller can retry.
       
   928          *
       
   929          * @return false if no progress can be made, else true
       
   930          */
       
   931         final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
       
   932             boolean stat;
       
   933             ForkJoinTask<?>[] a; int m, s, b, n;
       
   934             if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
       
   935                 (n = (s = top) - (b = base)) > 0) {
       
   936                 boolean removed = false, empty = true;
       
   937                 stat = true;
       
   938                 for (ForkJoinTask<?> t;;) {           // traverse from s to b
       
   939                     long j = ((--s & m) << ASHIFT) + ABASE;
       
   940                     t = (ForkJoinTask<?>)U.getObject(a, j);
       
   941                     if (t == null)                    // inconsistent length
       
   942                         break;
       
   943                     else if (t == task) {
       
   944                         if (s + 1 == top) {           // pop
       
   945                             if (!U.compareAndSwapObject(a, j, task, null))
       
   946                                 break;
       
   947                             top = s;
       
   948                             removed = true;
       
   949                         }
       
   950                         else if (base == b)           // replace with proxy
       
   951                             removed = U.compareAndSwapObject(a, j, task,
       
   952                                                              new EmptyTask());
       
   953                         break;
       
   954                     }
       
   955                     else if (t.status >= 0)
       
   956                         empty = false;
       
   957                     else if (s + 1 == top) {          // pop and throw away
       
   958                         if (U.compareAndSwapObject(a, j, t, null))
       
   959                             top = s;
       
   960                         break;
       
   961                     }
       
   962                     if (--n == 0) {
       
   963                         if (!empty && base == b)
       
   964                             stat = false;
       
   965                         break;
       
   966                     }
       
   967                 }
       
   968                 if (removed)
       
   969                     task.doExec();
       
   970             }
       
   971             else
       
   972                 stat = false;
       
   973             return stat;
       
   974         }
       
   975 
       
   976         /**
       
   977          * Tries to poll for and execute the given task or any other
       
   978          * task in its CountedCompleter computation.
       
   979          */
       
   980         final boolean pollAndExecCC(CountedCompleter<?> root) {
       
   981             ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
       
   982             if ((b = base) - top < 0 && (a = array) != null) {
       
   983                 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
       
   984                 if ((o = U.getObjectVolatile(a, j)) == null)
       
   985                     return true; // retry
       
   986                 if (o instanceof CountedCompleter) {
       
   987                     for (t = (CountedCompleter<?>)o, r = t;;) {
       
   988                         if (r == root) {
       
   989                             if (base == b &&
       
   990                                 U.compareAndSwapObject(a, j, t, null)) {
       
   991                                 U.putOrderedInt(this, QBASE, b + 1);
       
   992                                 t.doExec();
       
   993                             }
       
   994                             return true;
       
   995                         }
       
   996                         else if ((r = r.completer) == null)
       
   997                             break; // not part of root computation
       
   998                     }
       
   999                 }
       
  1000             }
       
  1001             return false;
       
  1002         }
       
  1003 
       
  1004         /**
       
  1005          * Tries to pop and execute the given task or any other task
       
  1006          * in its CountedCompleter computation.
       
  1007          */
       
  1008         final boolean externalPopAndExecCC(CountedCompleter<?> root) {
       
  1009             ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
       
  1010             if (base - (s = top) < 0 && (a = array) != null) {
       
  1011                 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
       
  1012                 if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
       
  1013                     for (t = (CountedCompleter<?>)o, r = t;;) {
       
  1014                         if (r == root) {
       
  1015                             if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
       
  1016                                 if (top == s && array == a &&
       
  1017                                     U.compareAndSwapObject(a, j, t, null)) {
       
  1018                                     top = s - 1;
       
  1019                                     qlock = 0;
       
  1020                                     t.doExec();
       
  1021                                 }
       
  1022                                 else
       
  1023                                     qlock = 0;
       
  1024                             }
       
  1025                             return true;
       
  1026                         }
       
  1027                         else if ((r = r.completer) == null)
       
  1028                             break;
  1045                             break;
  1029                     }
  1046                     }
  1030                 }
  1047                 }
  1031             }
  1048                 else
  1032             return false;
  1049                     pollAndExecAll();
       
  1050             }
  1033         }
  1051         }
  1034 
  1052 
  1035         /**
  1053         /**
  1036          * Internal version
  1054          * Executes the given task and any remaining local tasks.
  1037          */
  1055          */
  1038         final boolean internalPopAndExecCC(CountedCompleter<?> root) {
  1056         final void runTask(ForkJoinTask<?> task) {
  1039             ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
  1057             if (task != null) {
       
  1058                 scanState &= ~SCANNING; // mark as busy
       
  1059                 (currentSteal = task).doExec();
       
  1060                 U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
       
  1061                 execLocalTasks();
       
  1062                 ForkJoinWorkerThread thread = owner;
       
  1063                 if (++nsteals < 0)      // collect on overflow
       
  1064                     transferStealCount(pool);
       
  1065                 scanState |= SCANNING;
       
  1066                 if (thread != null)
       
  1067                     thread.afterTopLevelExec();
       
  1068             }
       
  1069         }
       
  1070 
       
  1071         /**
       
  1072          * Adds steal count to pool stealCounter if it exists, and resets.
       
  1073          */
       
  1074         final void transferStealCount(ForkJoinPool p) {
       
  1075             AtomicLong sc;
       
  1076             if (p != null && (sc = p.stealCounter) != null) {
       
  1077                 int s = nsteals;
       
  1078                 nsteals = 0;            // if negative, correct for overflow
       
  1079                 sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
       
  1080             }
       
  1081         }
       
  1082 
       
  1083         /**
       
  1084          * If present, removes from queue and executes the given task,
       
  1085          * or any other cancelled task. Used only by awaitJoin.
       
  1086          *
       
  1087          * @return true if queue empty and task not known to be done
       
  1088          */
       
  1089         final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
       
  1090             ForkJoinTask<?>[] a; int m, s, b, n;
       
  1091             if ((a = array) != null && (m = a.length - 1) >= 0 &&
       
  1092                 task != null) {
       
  1093                 while ((n = (s = top) - (b = base)) > 0) {
       
  1094                     for (ForkJoinTask<?> t;;) {      // traverse from s to b
       
  1095                         long j = ((--s & m) << ASHIFT) + ABASE;
       
  1096                         if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
       
  1097                             return s + 1 == top;     // shorter than expected
       
  1098                         else if (t == task) {
       
  1099                             boolean removed = false;
       
  1100                             if (s + 1 == top) {      // pop
       
  1101                                 if (U.compareAndSwapObject(a, j, task, null)) {
       
  1102                                     U.putOrderedInt(this, QTOP, s);
       
  1103                                     removed = true;
       
  1104                                 }
       
  1105                             }
       
  1106                             else if (base == b)      // replace with proxy
       
  1107                                 removed = U.compareAndSwapObject(
       
  1108                                     a, j, task, new EmptyTask());
       
  1109                             if (removed)
       
  1110                                 task.doExec();
       
  1111                             break;
       
  1112                         }
       
  1113                         else if (t.status < 0 && s + 1 == top) {
       
  1114                             if (U.compareAndSwapObject(a, j, t, null))
       
  1115                                 U.putOrderedInt(this, QTOP, s);
       
  1116                             break;                  // was cancelled
       
  1117                         }
       
  1118                         if (--n == 0)
       
  1119                             return false;
       
  1120                     }
       
  1121                     if (task.status < 0)
       
  1122                         return false;
       
  1123                 }
       
  1124             }
       
  1125             return true;
       
  1126         }
       
  1127 
       
  1128         /**
       
  1129          * Pops task if in the same CC computation as the given task,
       
  1130          * in either shared or owned mode. Used only by helpComplete.
       
  1131          */
       
  1132         final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
       
  1133             int s; ForkJoinTask<?>[] a; Object o;
  1040             if (base - (s = top) < 0 && (a = array) != null) {
  1134             if (base - (s = top) < 0 && (a = array) != null) {
  1041                 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
  1135                 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
  1042                 if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
  1136                 if ((o = U.getObjectVolatile(a, j)) != null &&
  1043                     for (t = (CountedCompleter<?>)o, r = t;;) {
  1137                     (o instanceof CountedCompleter)) {
  1044                         if (r == root) {
  1138                     CountedCompleter<?> t = (CountedCompleter<?>)o;
  1045                             if (U.compareAndSwapObject(a, j, t, null)) {
  1139                     for (CountedCompleter<?> r = t;;) {
  1046                                 top = s - 1;
  1140                         if (r == task) {
  1047                                 t.doExec();
  1141                             if (mode < 0) { // must lock
       
  1142                                 if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
       
  1143                                     if (top == s && array == a &&
       
  1144                                         U.compareAndSwapObject(a, j, t, null)) {
       
  1145                                         U.putOrderedInt(this, QTOP, s - 1);
       
  1146                                         U.putOrderedInt(this, QLOCK, 0);
       
  1147                                         return t;
       
  1148                                     }
       
  1149                                     U.compareAndSwapInt(this, QLOCK, 1, 0);
       
  1150                                 }
  1048                             }
  1151                             }
  1049                             return true;
  1152                             else if (U.compareAndSwapObject(a, j, t, null)) {
       
  1153                                 U.putOrderedInt(this, QTOP, s - 1);
       
  1154                                 return t;
       
  1155                             }
       
  1156                             break;
  1050                         }
  1157                         }
  1051                         else if ((r = r.completer) == null)
  1158                         else if ((r = r.completer) == null) // try parent
  1052                             break;
  1159                             break;
  1053                     }
  1160                     }
  1054                 }
  1161                 }
  1055             }
  1162             }
  1056             return false;
  1163             return null;
       
  1164         }
       
  1165 
       
  1166         /**
       
  1167          * Steals and runs a task in the same CC computation as the
       
  1168          * given task if one exists and can be taken without
       
  1169          * contention. Otherwise returns a checksum/control value for
       
  1170          * use by method helpComplete.
       
  1171          *
       
  1172          * @return 1 if successful, 2 if retryable (lost to another
       
  1173          * stealer), -1 if non-empty but no matching task found, else
       
  1174          * the base index, forced negative.
       
  1175          */
       
  1176         final int pollAndExecCC(CountedCompleter<?> task) {
       
  1177             int b, h; ForkJoinTask<?>[] a; Object o;
       
  1178             if ((b = base) - top >= 0 || (a = array) == null)
       
  1179                 h = b | Integer.MIN_VALUE;  // to sense movement on re-poll
       
  1180             else {
       
  1181                 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
       
  1182                 if ((o = U.getObjectVolatile(a, j)) == null)
       
  1183                     h = 2;                  // retryable
       
  1184                 else if (!(o instanceof CountedCompleter))
       
  1185                     h = -1;                 // unmatchable
       
  1186                 else {
       
  1187                     CountedCompleter<?> t = (CountedCompleter<?>)o;
       
  1188                     for (CountedCompleter<?> r = t;;) {
       
  1189                         if (r == task) {
       
  1190                             if (base == b &&
       
  1191                                 U.compareAndSwapObject(a, j, t, null)) {
       
  1192                                 base = b + 1;
       
  1193                                 t.doExec();
       
  1194                                 h = 1;      // success
       
  1195                             }
       
  1196                             else
       
  1197                                 h = 2;      // lost CAS
       
  1198                             break;
       
  1199                         }
       
  1200                         else if ((r = r.completer) == null) {
       
  1201                             h = -1;         // unmatched
       
  1202                             break;
       
  1203                         }
       
  1204                     }
       
  1205                 }
       
  1206             }
       
  1207             return h;
  1057         }
  1208         }
  1058 
  1209 
  1059         /**
  1210         /**
  1060          * Returns true if owned and not known to be blocked.
  1211          * Returns true if owned and not known to be blocked.
  1061          */
  1212          */
  1062         final boolean isApparentlyUnblocked() {
  1213         final boolean isApparentlyUnblocked() {
  1063             Thread wt; Thread.State s;
  1214             Thread wt; Thread.State s;
  1064             return (eventCount >= 0 &&
  1215             return (scanState >= 0 &&
  1065                     (wt = owner) != null &&
  1216                     (wt = owner) != null &&
  1066                     (s = wt.getState()) != Thread.State.BLOCKED &&
  1217                     (s = wt.getState()) != Thread.State.BLOCKED &&
  1067                     s != Thread.State.WAITING &&
  1218                     s != Thread.State.WAITING &&
  1068                     s != Thread.State.TIMED_WAITING);
  1219                     s != Thread.State.TIMED_WAITING);
  1069         }
  1220         }
  1070 
  1221 
  1071         // Unsafe mechanics
  1222         // Unsafe mechanics. Note that some are (and must be) the same as in FJP
  1072         private static final sun.misc.Unsafe U;
  1223         private static final sun.misc.Unsafe U;
  1073         private static final long QBASE;
  1224         private static final int  ABASE;
       
  1225         private static final int  ASHIFT;
       
  1226         private static final long QTOP;
  1074         private static final long QLOCK;
  1227         private static final long QLOCK;
  1075         private static final int ABASE;
  1228         private static final long QCURRENTSTEAL;
  1076         private static final int ASHIFT;
       
  1077         static {
  1229         static {
  1078             try {
  1230             try {
  1079                 U = sun.misc.Unsafe.getUnsafe();
  1231                 U = sun.misc.Unsafe.getUnsafe();
  1080                 Class<?> k = WorkQueue.class;
  1232                 Class<?> wk = WorkQueue.class;
  1081                 Class<?> ak = ForkJoinTask[].class;
  1233                 Class<?> ak = ForkJoinTask[].class;
  1082                 QBASE = U.objectFieldOffset
  1234                 QTOP = U.objectFieldOffset
  1083                     (k.getDeclaredField("base"));
  1235                     (wk.getDeclaredField("top"));
  1084                 QLOCK = U.objectFieldOffset
  1236                 QLOCK = U.objectFieldOffset
  1085                     (k.getDeclaredField("qlock"));
  1237                     (wk.getDeclaredField("qlock"));
       
  1238                 QCURRENTSTEAL = U.objectFieldOffset
       
  1239                     (wk.getDeclaredField("currentSteal"));
  1086                 ABASE = U.arrayBaseOffset(ak);
  1240                 ABASE = U.arrayBaseOffset(ak);
  1087                 int scale = U.arrayIndexScale(ak);
  1241                 int scale = U.arrayIndexScale(ak);
  1088                 if ((scale & (scale - 1)) != 0)
  1242                 if ((scale & (scale - 1)) != 0)
  1089                     throw new Error("data type scale not a power of two");
  1243                     throw new Error("data type scale not a power of two");
  1090                 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
  1244                 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
  1124      * parallelism as 1 to reflect resulting caller-runs mechanics.
  1278      * parallelism as 1 to reflect resulting caller-runs mechanics.
  1125      */
  1279      */
  1126     static final int commonParallelism;
  1280     static final int commonParallelism;
  1127 
  1281 
  1128     /**
  1282     /**
       
  1283      * Limit on spare thread construction in tryCompensate.
       
  1284      */
       
  1285     private static int commonMaxSpares;
       
  1286 
       
  1287     /**
  1129      * Sequence number for creating workerNamePrefix.
  1288      * Sequence number for creating workerNamePrefix.
  1130      */
  1289      */
  1131     private static int poolNumberSequence;
  1290     private static int poolNumberSequence;
  1132 
  1291 
  1133     /**
  1292     /**
  1136      */
  1295      */
  1137     private static final synchronized int nextPoolId() {
  1296     private static final synchronized int nextPoolId() {
  1138         return ++poolNumberSequence;
  1297         return ++poolNumberSequence;
  1139     }
  1298     }
  1140 
  1299 
  1141     // static constants
  1300     // static configuration constants
  1142 
  1301 
  1143     /**
  1302     /**
  1144      * Initial timeout value (in nanoseconds) for the thread
  1303      * Initial timeout value (in nanoseconds) for the thread
  1145      * triggering quiescence to park waiting for new work. On timeout,
  1304      * triggering quiescence to park waiting for new work. On timeout,
  1146      * the thread will instead try to shrink the number of
  1305      * the thread will instead try to shrink the number of
  1147      * workers. The value should be large enough to avoid overly
  1306      * workers. The value should be large enough to avoid overly
  1148      * aggressive shrinkage during most transient stalls (long GCs
  1307      * aggressive shrinkage during most transient stalls (long GCs
  1149      * etc).
  1308      * etc).
  1150      */
  1309      */
  1151     private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec
  1310     private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
  1152 
       
  1153     /**
       
  1154      * Timeout value when there are more threads than parallelism level
       
  1155      */
       
  1156     private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;
       
  1157 
  1311 
  1158     /**
  1312     /**
  1159      * Tolerance for idle timeouts, to cope with timer undershoots
  1313      * Tolerance for idle timeouts, to cope with timer undershoots
  1160      */
  1314      */
  1161     private static final long TIMEOUT_SLOP = 2000000L;
  1315     private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms
  1162 
  1316 
  1163     /**
  1317     /**
  1164      * The maximum stolen->joining link depth allowed in method
  1318      * The initial value for commonMaxSpares during static
  1165      * tryHelpStealer.  Must be a power of two.  Depths for legitimate
  1319      * initialization unless overridden using System property
  1166      * chains are unbounded, but we use a fixed constant to avoid
  1320      * "java.util.concurrent.ForkJoinPool.common.maximumSpares".  The
  1167      * (otherwise unchecked) cycles and to bound staleness of
  1321      * default value is far in excess of normal requirements, but also
  1168      * traversal parameters at the expense of sometimes blocking when
  1322      * far short of MAX_CAP and typical OS thread limits, so allows
  1169      * we could be helping.
  1323      * JVMs to catch misuse/abuse before running out of resources
  1170      */
  1324      * needed to do so.
  1171     private static final int MAX_HELP = 64;
  1325      */
       
  1326     private static final int DEFAULT_COMMON_MAX_SPARES = 256;
       
  1327 
       
  1328     /**
       
  1329      * Number of times to spin-wait before blocking. The spins (in
       
  1330      * awaitRunStateLock and awaitWork) currently use randomized
       
  1331      * spins. If/when MWAIT-like intrinsics becomes available, they
       
  1332      * may allow quieter spinning. The value of SPINS must be a power
       
  1333      * of two, at least 4. The current value causes spinning for a
       
  1334      * small fraction of typical context-switch times, well worthwhile
       
  1335      * given the typical likelihoods that blocking is not necessary.
       
  1336      */
       
  1337     private static final int SPINS  = 1 << 11;
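A minimal sketch of the randomized bounded-spin-then-block pattern this comment describes (the helper below and its parkNanos fallback are illustrative assumptions, not the actual awaitWork logic):

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.locks.LockSupport;
    import java.util.function.BooleanSupplier;

    // Spin a randomized, bounded number of times, then fall back to parking.
    class SpinThenParkSketch {
        static final int SPINS = 1 << 11;             // power of two, as above

        static void awaitCondition(BooleanSupplier done) {
            int spins = SPINS;
            ThreadLocalRandom r = ThreadLocalRandom.current();
            while (!done.getAsBoolean()) {
                if (spins > 0) {
                    // decrement only on a random subset of iterations so
                    // spinners do not give up in lockstep
                    if (r.nextInt(SPINS) <= spins)
                        --spins;
                }
                else
                    LockSupport.parkNanos(1_000L);    // timed blocking fallback
            }
        }
    }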
  1172 
  1338 
  1173     /**
  1339     /**
  1174      * Increment for seed generators. See class ThreadLocal for
  1340      * Increment for seed generators. See class ThreadLocal for
  1175      * explanation.
  1341      * explanation.
  1176      */
  1342      */
  1177     private static final int SEED_INCREMENT = 0x9e3779b9;
  1343     private static final int SEED_INCREMENT = 0x9e3779b9;
  1178 
  1344 
  1179     /*
  1345     /*
  1180      * Bits and masks for control variables
  1346      * Bits and masks for field ctl, packed with 4 16 bit subfields:
  1181      *
  1347      * AC: Number of active running workers minus target parallelism
  1182      * Field ctl is a long packed with:
  1348      * TC: Number of total workers minus target parallelism
  1183      * AC: Number of active running workers minus target parallelism (16 bits)
  1349      * SS: version count and status of top waiting thread
  1184      * TC: Number of total workers minus target parallelism (16 bits)
  1350      * ID: poolIndex of top of Treiber stack of waiters
  1185      * ST: true if pool is terminating (1 bit)
  1351      *
  1186      * EC: the wait count of top waiting thread (15 bits)
  1352      * When convenient, we can extract the lower 32 stack top bits
  1187      * ID: poolIndex of top of Treiber stack of waiters (16 bits)
  1353      * (including version bits) as sp=(int)ctl.  The offsets of counts
  1188      *
  1354      * by the target parallelism and the positionings of fields makes
  1189      * When convenient, we can extract the upper 32 bits of counts and
  1355      * it possible to perform the most common checks via sign tests of
  1190      * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
  1356      * fields: When ac is negative, there are not enough active
  1191      * (int)ctl.  The ec field is never accessed alone, but always
  1357      * workers, when tc is negative, there are not enough total
  1192      * together with id and st. The offsets of counts by the target
  1358      * workers.  When sp is non-zero, there are waiting workers.  To
  1193      * parallelism and the positionings of fields makes it possible to
  1359      * deal with possibly negative fields, we use casts in and out of
  1194      * perform the most common checks via sign tests of fields: When
  1360      * "short" and/or signed shifts to maintain signedness.
  1195      * ac is negative, there are not enough active workers, when tc is
  1361      *
  1196      * negative, there are not enough total workers, and when e is
  1362      * Because it occupies uppermost bits, we can add one active count
  1197      * negative, the pool is terminating.  To deal with these possibly
  1363      * using getAndAddLong of AC_UNIT, rather than CAS, when returning
  1198      * negative fields, we use casts in and out of "short" and/or
  1364      * from a blocked join.  Other updates entail multiple subfields
  1199      * signed shifts to maintain signedness.
  1365      * and masking, requiring CAS.
  1200      *
  1366      */
  1201      * When a thread is queued (inactivated), its eventCount field is
  1367 
  1202      * set negative, which is the only way to tell if a worker is
  1368     // Lower and upper word masks
  1203      * prevented from executing tasks, even though it must continue to
  1369     private static final long SP_MASK    = 0xffffffffL;
  1204      * scan for them to avoid queuing races. Note however that
  1370     private static final long UC_MASK    = ~SP_MASK;
  1205      * eventCount updates lag releases so usage requires care.
  1371 
  1206      *
  1372     // Active counts
  1207      * Field plock is an int packed with:
       
  1208      * SHUTDOWN: true if shutdown is enabled (1 bit)
       
  1209      * SEQ:  a sequence lock, with PL_LOCK bit set if locked (30 bits)
       
  1210      * SIGNAL: set when threads may be waiting on the lock (1 bit)
       
  1211      *
       
  1212      * The sequence number enables simple consistency checks:
       
  1213      * Staleness of read-only operations on the workQueues array can
       
  1214      * be checked by comparing plock before vs after the reads.
       
  1215      */
       
  1216 
       
  1217     // bit positions/shifts for fields
       
  1218     private static final int  AC_SHIFT   = 48;
  1373     private static final int  AC_SHIFT   = 48;
       
  1374     private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
       
  1375     private static final long AC_MASK    = 0xffffL << AC_SHIFT;
       
  1376 
       
  1377     // Total counts
  1219     private static final int  TC_SHIFT   = 32;
  1378     private static final int  TC_SHIFT   = 32;
  1220     private static final int  ST_SHIFT   = 31;
  1379     private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
  1221     private static final int  EC_SHIFT   = 16;
  1380     private static final long TC_MASK    = 0xffffL << TC_SHIFT;
  1222 
  1381     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
  1223     // bounds
  1382 
  1224     private static final int  SMASK      = 0xffff;  // short bits
  1383     // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
  1225     private static final int  MAX_CAP    = 0x7fff;  // max #workers - 1
  1384     private static final int  RSLOCK     = 1;
  1226     private static final int  EVENMASK   = 0xfffe;  // even short bits
  1385     private static final int  RSIGNAL    = 1 << 1;
  1227     private static final int  SQMASK     = 0x007e;  // max 64 (even) slots
  1386     private static final int  STARTED    = 1 << 2;
  1228     private static final int  SHORT_SIGN = 1 << 15;
  1387     private static final int  STOP       = 1 << 29;
  1229     private static final int  INT_SIGN   = 1 << 31;
  1388     private static final int  TERMINATED = 1 << 30;
  1230 
  1389     private static final int  SHUTDOWN   = 1 << 31;
  1231     // masks
       
  1232     private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
       
  1233     private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
       
  1234     private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
       
  1235 
       
  1236     // units for incrementing and decrementing
       
  1237     private static final long TC_UNIT    = 1L << TC_SHIFT;
       
  1238     private static final long AC_UNIT    = 1L << AC_SHIFT;
       
  1239 
       
  1240     // masks and units for dealing with u = (int)(ctl >>> 32)
       
  1241     private static final int  UAC_SHIFT  = AC_SHIFT - 32;
       
  1242     private static final int  UTC_SHIFT  = TC_SHIFT - 32;
       
  1243     private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
       
  1244     private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
       
  1245     private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
       
  1246     private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
       
  1247 
       
  1248     // masks and units for dealing with e = (int)ctl
       
  1249     private static final int E_MASK      = 0x7fffffff; // no STOP_BIT
       
  1250     private static final int E_SEQ       = 1 << EC_SHIFT;
       
  1251 
       
  1252     // plock bits
       
  1253     private static final int SHUTDOWN    = 1 << 31;
       
  1254     private static final int PL_LOCK     = 2;
       
  1255     private static final int PL_SIGNAL   = 1;
       
  1256     private static final int PL_SPINS    = 1 << 8;
       
  1257 
       
  1258     // access mode for WorkQueue
       
  1259     static final int LIFO_QUEUE          =  0;
       
  1260     static final int FIFO_QUEUE          =  1;
       
  1261     static final int SHARED_QUEUE        = -1;
       
  1262 
  1390 
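    // A minimal sketch of decoding the ctl subfields described above
    // (hypothetical helpers; the real code inlines these expressions):
    //
    //   static int acOf(long c) { return (short)(c >>> AC_SHIFT); } // < 0: too few active
    //   static int tcOf(long c) { return (short)(c >>> TC_SHIFT); } // < 0: too few total
    //   static int spOf(long c) { return (int)c; }                  // != 0: idle workers waiting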
  1263     // Instance fields
  1391     // Instance fields
  1264     volatile long stealCount;                  // collects worker counts
  1392     volatile long ctl;                   // main pool control
  1265     volatile long ctl;                         // main pool control
  1393     volatile int runState;               // lockable status
  1266     volatile int plock;                        // shutdown status and seqLock
  1394     final int config;                    // parallelism, mode
  1267     volatile int indexSeed;                    // worker/submitter index seed
  1395     int indexSeed;                       // to generate worker index
  1268     final short parallelism;                   // parallelism level
  1396     volatile WorkQueue[] workQueues;     // main registry
  1269     final short mode;                          // LIFO/FIFO
       
  1270     WorkQueue[] workQueues;                    // main registry
       
  1271     final ForkJoinWorkerThreadFactory factory;
  1397     final ForkJoinWorkerThreadFactory factory;
  1272     final UncaughtExceptionHandler ueh;        // per-worker UEH
  1398     final UncaughtExceptionHandler ueh;  // per-worker UEH
  1273     final String workerNamePrefix;             // to create worker name string
  1399     final String workerNamePrefix;       // to create worker name string
  1274 
  1400     volatile AtomicLong stealCounter;    // also used as sync monitor
  1275     /**
  1401 
  1276      * Acquires the plock lock to protect worker array and related
  1402     /**
  1277      * updates. This method is called only if an initial CAS on plock
  1403      * Acquires the runState lock; returns current (locked) runState.
  1278      * fails. This acts as a spinlock for normal cases, but falls back
  1404      */
  1279      * to builtin monitor to block when (rarely) needed. This would be
  1405     private int lockRunState() {
  1280      * a terrible idea for a highly contended lock, but works fine as
  1406         int rs;
  1281      * a more conservative alternative to a pure spinlock.
  1407         return ((((rs = runState) & RSLOCK) != 0 ||
  1282      */
  1408                  !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
  1283     private int acquirePlock() {
  1409                 awaitRunStateLock() : rs);
  1284         int spins = PL_SPINS, ps, nps;
  1410     }
  1285         for (;;) {
  1411 
  1286             if (((ps = plock) & PL_LOCK) == 0 &&
  1412     /**
  1287                 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
  1413      * Spins and/or blocks until runstate lock is available.  See
  1288                 return nps;
  1414      * above for explanation.
  1289             else if (spins >= 0) {
  1415      */
  1290                 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
  1416     private int awaitRunStateLock() {
       
  1417         Object lock;
       
  1418         boolean wasInterrupted = false;
       
  1419         for (int spins = SPINS, r = 0, rs, ns;;) {
       
  1420             if (((rs = runState) & RSLOCK) == 0) {
       
  1421                 if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
       
  1422                     if (wasInterrupted) {
       
  1423                         try {
       
  1424                             Thread.currentThread().interrupt();
       
  1425                         } catch (SecurityException ignore) {
       
  1426                         }
       
  1427                     }
       
  1428                     return ns;
       
  1429                 }
       
  1430             }
       
  1431             else if (r == 0)
       
  1432                 r = ThreadLocalRandom.nextSecondarySeed();
       
  1433             else if (spins > 0) {
       
  1434                 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
       
  1435                 if (r >= 0)
  1291                     --spins;
  1436                     --spins;
  1292             }
  1437             }
  1293             else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
  1438             else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
  1294                 synchronized (this) {
  1439                 Thread.yield();   // initialization race
  1295                     if ((plock & PL_SIGNAL) != 0) {
  1440             else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
       
  1441                 synchronized (lock) {
       
  1442                     if ((runState & RSIGNAL) != 0) {
  1296                         try {
  1443                         try {
  1297                             wait();
  1444                             lock.wait();
  1298                         } catch (InterruptedException ie) {
  1445                         } catch (InterruptedException ie) {
  1299                             try {
  1446                             if (!(Thread.currentThread() instanceof
  1300                                 Thread.currentThread().interrupt();
  1447                                   ForkJoinWorkerThread))
  1301                             } catch (SecurityException ignore) {
  1448                                 wasInterrupted = true;
  1302                             }
       
  1303                         }
  1449                         }
  1304                     }
  1450                     }
  1305                     else
  1451                     else
  1306                         notifyAll();
  1452                         lock.notifyAll();
  1307                 }
  1453                 }
  1308             }
  1454             }
  1309         }
  1455         }
  1310     }
  1456     }
  1311 
  1457 
  1312     /**
  1458     /**
  1313      * Unlocks and signals any thread waiting for plock. Called only
  1459      * Unlocks and sets runState to newRunState.
  1314      * when CAS of seq value for unlock fails.
  1460      *
  1315      */
  1461      * @param oldRunState a value returned from lockRunState
  1316     private void releasePlock(int ps) {
  1462      * @param newRunState the next value (must have lock bit clear).
  1317         plock = ps;
  1463      */
  1318         synchronized (this) { notifyAll(); }
  1464     private void unlockRunState(int oldRunState, int newRunState) {
  1319     }
  1465         if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
  1320 
  1466             Object lock = stealCounter;
  1321     /**
  1467             runState = newRunState;              // clears RSIGNAL bit
  1322      * Tries to create and start one worker if fewer than target
  1468             if (lock != null)
  1323      * parallelism level exist. Adjusts counts etc on failure.
  1469                 synchronized (lock) { lock.notifyAll(); }
  1324      */
  1470         }
  1325     private void tryAddWorker() {
  1471     }
  1326         long c; int u, e;
  1472 
  1327         while ((u = (int)((c = ctl) >>> 32)) < 0 &&
  1473     // Creating, registering and deregistering workers
  1328                (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
  1474 
  1329             long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
  1475     /**
  1330                               ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
  1476      * Tries to construct and start one worker. Assumes that total
  1331             if (U.compareAndSwapLong(this, CTL, c, nc)) {
  1477      * count has already been incremented as a reservation.  Invokes
  1332                 ForkJoinWorkerThreadFactory fac;
  1478      * deregisterWorker on any failure.
  1333                 Throwable ex = null;
  1479      *
  1334                 ForkJoinWorkerThread wt = null;
  1480      * @return true if successful
  1335                 try {
  1481      */
  1336                     if ((fac = factory) != null &&
  1482     private boolean createWorker() {
  1337                         (wt = fac.newThread(this)) != null) {
  1483         ForkJoinWorkerThreadFactory fac = factory;
  1338                         wt.start();
  1484         Throwable ex = null;
  1339                         break;
  1485         ForkJoinWorkerThread wt = null;
  1340                     }
  1486         try {
  1341                 } catch (Throwable rex) {
  1487             if (fac != null && (wt = fac.newThread(this)) != null) {
  1342                     ex = rex;
  1488                 wt.start();
       
  1489                 return true;
       
  1490             }
       
  1491         } catch (Throwable rex) {
       
  1492             ex = rex;
       
  1493         }
       
  1494         deregisterWorker(wt, ex);
       
  1495         return false;
       
  1496     }
       
  1497 
       
  1498     /**
       
  1499      * Tries to add one worker, incrementing ctl counts before doing
       
  1500      * so, relying on createWorker to back out on failure.
       
  1501      *
       
  1502      * @param c incoming ctl value, with total count negative and no
       
  1503      * idle workers.  On CAS failure, c is refreshed and retried if
       
  1504      * this holds (otherwise, a new worker is not needed).
       
  1505      */
       
  1506     private void tryAddWorker(long c) {
       
  1507         boolean add = false;
       
  1508         do {
       
  1509             long nc = ((AC_MASK & (c + AC_UNIT)) |
       
  1510                        (TC_MASK & (c + TC_UNIT)));
       
  1511             if (ctl == c) {
       
  1512                 int rs, stop;                 // check if terminating
       
  1513                 if ((stop = (rs = lockRunState()) & STOP) == 0)
       
  1514                     add = U.compareAndSwapLong(this, CTL, c, nc);
       
  1515                 unlockRunState(rs, rs & ~RSLOCK);
       
  1516                 if (stop != 0)
       
  1517                     break;
       
  1518                 if (add) {
       
  1519                     createWorker();
       
  1520                     break;
  1343                 }
  1521                 }
  1344                 deregisterWorker(wt, ex);
  1522             }
  1345                 break;
  1523         } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
  1346             }
  1524     }
  1347         }
  1525 
  1348     }
  1526     /**
  1349 
  1527      * Callback from ForkJoinWorkerThread constructor to establish and
  1350     //  Registering and deregistering workers
  1528      * record its WorkQueue.
  1351 
       
  1352     /**
       
  1353      * Callback from ForkJoinWorkerThread to establish and record its
       
  1354      * WorkQueue. To avoid scanning bias due to packing entries in
       
  1355      * front of the workQueues array, we treat the array as a simple
       
  1356      * power-of-two hash table using per-thread seed as hash,
       
  1357      * expanding as needed.
       
  1358      *
  1529      *
  1359      * @param wt the worker thread
  1530      * @param wt the worker thread
  1360      * @return the worker's queue
  1531      * @return the worker's queue
  1361      */
  1532      */
  1362     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
  1533     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
  1363         UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
  1534         UncaughtExceptionHandler handler;
  1364         wt.setDaemon(true);
  1535         wt.setDaemon(true);                           // configure thread
  1365         if ((handler = ueh) != null)
  1536         if ((handler = ueh) != null)
  1366             wt.setUncaughtExceptionHandler(handler);
  1537             wt.setUncaughtExceptionHandler(handler);
  1367         do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
  1538         WorkQueue w = new WorkQueue(this, wt);
  1368                                           s += SEED_INCREMENT) ||
  1539         int i = 0;                                    // assign a pool index
  1369                      s == 0); // skip 0
  1540         int mode = config & MODE_MASK;
  1370         WorkQueue w = new WorkQueue(this, wt, mode, s);
  1541         int rs = lockRunState();
  1371         if (((ps = plock) & PL_LOCK) != 0 ||
       
  1372             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
       
  1373             ps = acquirePlock();
       
  1374         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
       
  1375         try {
  1542         try {
  1376             if ((ws = workQueues) != null) {    // skip if shutting down
  1543             WorkQueue[] ws; int n;                    // skip if no array
  1377                 int n = ws.length, m = n - 1;
  1544             if ((ws = workQueues) != null && (n = ws.length) > 0) {
  1378                 int r = (s << 1) | 1;           // use odd-numbered indices
  1545                 int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
  1379                 if (ws[r &= m] != null) {       // collision
  1546                 int m = n - 1;
  1380                     int probes = 0;             // step by approx half size
  1547                 i = ((s << 1) | 1) & m;               // odd-numbered indices
       
  1548                 if (ws[i] != null) {                  // collision
       
  1549                     int probes = 0;                   // step by approx half n
  1381                     int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
  1550                     int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
  1382                     while (ws[r = (r + step) & m] != null) {
  1551                     while (ws[i = (i + step) & m] != null) {
  1383                         if (++probes >= n) {
  1552                         if (++probes >= n) {
  1384                             workQueues = ws = Arrays.copyOf(ws, n <<= 1);
  1553                             workQueues = ws = Arrays.copyOf(ws, n <<= 1);
  1385                             m = n - 1;
  1554                             m = n - 1;
  1386                             probes = 0;
  1555                             probes = 0;
  1387                         }
  1556                         }
  1388                     }
  1557                     }
  1389                 }
  1558                 }
  1390                 w.poolIndex = (short)r;
  1559                 w.hint = s;                           // use as random seed
  1391                 w.eventCount = r; // volatile write orders
  1560                 w.config = i | mode;
  1392                 ws[r] = w;
  1561                 w.scanState = i;                      // publication fence
       
  1562                 ws[i] = w;
  1393             }
  1563             }
  1394         } finally {
  1564         } finally {
  1395             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
  1565             unlockRunState(rs, rs & ~RSLOCK);
  1396                 releasePlock(nps);
  1566         }
  1397         }
  1567         wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
  1398         wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
       
  1399         return w;
  1568         return w;
  1400     }
  1569     }
  1401 
  1570 
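    // Note on the index assignment above: workers occupy odd slots of the
    // power-of-two table (external submission queues use the even slots), so
    // probing starts at the odd value ((s << 1) | 1) & m and advances by an
    // even step of roughly n/2, which keeps every probe on an odd slot; after
    // a full cycle of collisions the array is doubled and probing restarts.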
  1402     /**
  1571     /**
  1403      * Final callback from terminating worker, as well as upon failure
  1572      * Final callback from terminating worker, as well as upon failure
  1409      * @param ex the exception causing failure, or null if none
  1578      * @param ex the exception causing failure, or null if none
  1410      */
  1579      */
  1411     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
  1580     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
  1412         WorkQueue w = null;
  1581         WorkQueue w = null;
  1413         if (wt != null && (w = wt.workQueue) != null) {
  1582         if (wt != null && (w = wt.workQueue) != null) {
  1414             int ps;
  1583             WorkQueue[] ws;                           // remove index from array
  1415             w.qlock = -1;                // ensure set
  1584             int idx = w.config & SMASK;
  1416             U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
  1585             int rs = lockRunState();
  1417             if (((ps = plock) & PL_LOCK) != 0 ||
  1586             if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
  1418                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
  1587                 ws[idx] = null;
  1419                 ps = acquirePlock();
  1588             unlockRunState(rs, rs & ~RSLOCK);
  1420             int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
  1589         }
  1421             try {
  1590         long c;                                       // decrement counts
  1422                 int idx = w.poolIndex;
       
  1423                 WorkQueue[] ws = workQueues;
       
  1424                 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
       
  1425                     ws[idx] = null;
       
  1426             } finally {
       
  1427                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
       
  1428                     releasePlock(nps);
       
  1429             }
       
  1430         }
       
  1431 
       
  1432         long c;                          // adjust ctl counts
       
  1433         do {} while (!U.compareAndSwapLong
  1591         do {} while (!U.compareAndSwapLong
  1434                      (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
  1592                      (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
  1435                                            ((c - TC_UNIT) & TC_MASK) |
  1593                                            (TC_MASK & (c - TC_UNIT)) |
  1436                                            (c & ~(AC_MASK|TC_MASK)))));
  1594                                            (SP_MASK & c))));
  1437 
  1595         if (w != null) {
  1438         if (!tryTerminate(false, false) && w != null && w.array != null) {
  1596             w.qlock = -1;                             // ensure set
  1439             w.cancelAll();               // cancel remaining tasks
  1597             w.transferStealCount(this);
  1440             WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
  1598             w.cancelAll();                            // cancel remaining tasks
  1441             while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
  1599         }
  1442                 if (e > 0) {             // activate or create replacement
  1600         for (;;) {                                    // possibly replace
  1443                     if ((ws = workQueues) == null ||
  1601             WorkQueue[] ws; int m, sp;
  1444                         (i = e & SMASK) >= ws.length ||
  1602             if (tryTerminate(false, false) || w == null || w.array == null ||
  1445                         (v = ws[i]) == null)
  1603                 (runState & STOP) != 0 || (ws = workQueues) == null ||
  1446                         break;
  1604                 (m = ws.length - 1) < 0)              // already terminating
  1447                     long nc = (((long)(v.nextWait & E_MASK)) |
  1605                 break;
  1448                                ((long)(u + UAC_UNIT) << 32));
  1606             if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
  1449                     if (v.eventCount != (e | INT_SIGN))
  1607                 if (tryRelease(c, ws[sp & m], AC_UNIT))
  1450                         break;
       
  1451                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
       
  1452                         v.eventCount = (e + E_SEQ) & E_MASK;
       
  1453                         if ((p = v.parker) != null)
       
  1454                             U.unpark(p);
       
  1455                         break;
       
  1456                     }
       
  1457                 }
       
  1458                 else {
       
  1459                     if ((short)u < 0)
       
  1460                         tryAddWorker();
       
  1461                     break;
  1608                     break;
  1462                 }
  1609             }
  1463             }
  1610             else if (ex != null && (c & ADD_WORKER) != 0L) {
  1464         }
  1611                 tryAddWorker(c);                      // create replacement
  1465         if (ex == null)                     // help clean refs on way out
  1612                 break;
       
  1613             }
       
  1614             else                                      // don't need replacement
       
  1615                 break;
       
  1616         }
       
  1617         if (ex == null)                               // help clean on way out
  1466             ForkJoinTask.helpExpungeStaleExceptions();
  1618             ForkJoinTask.helpExpungeStaleExceptions();
  1467         else                                // rethrow
  1619         else                                          // rethrow
  1468             ForkJoinTask.rethrow(ex);
  1620             ForkJoinTask.rethrow(ex);
  1469     }
  1621     }
  1470 
  1622 
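    // Replacement policy sketch for the loop above: if the pool is already
    // terminating (or the exiting worker never started working), no
    // replacement is made; otherwise an idle worker stacked in ctl (sp != 0)
    // is released via tryRelease; otherwise, if the worker died with an
    // exception while the total count is still below target (ADD_WORKER bit
    // set), a fresh thread is created via tryAddWorker; otherwise no
    // replacement is needed.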
  1471     // Submissions
  1623     // Signalling
  1472 
       
  1473     /**
       
  1474      * Unless shutting down, adds the given task to a submission queue
       
  1475      * at submitter's current queue index (modulo submission
       
  1476      * range). Only the most common path is directly handled in this
       
  1477      * method. All others are relayed to fullExternalPush.
       
  1478      *
       
  1479      * @param task the task. Caller must ensure non-null.
       
  1480      */
       
  1481     final void externalPush(ForkJoinTask<?> task) {
       
  1482         WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
       
  1483         int r = ThreadLocalRandom.getProbe();
       
  1484         int ps = plock;
       
  1485         WorkQueue[] ws = workQueues;
       
  1486         if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
       
  1487             (q = ws[m & r & SQMASK]) != null && r != 0 &&
       
  1488             U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
       
  1489             if ((a = q.array) != null &&
       
  1490                 (am = a.length - 1) > (n = (s = q.top) - q.base)) {
       
  1491                 int j = ((am & s) << ASHIFT) + ABASE;
       
  1492                 U.putOrderedObject(a, j, task);
       
  1493                 q.top = s + 1;                     // push on to deque
       
  1494                 q.qlock = 0;
       
  1495                 if (n <= 1)
       
  1496                     signalWork(ws, q);
       
  1497                 return;
       
  1498             }
       
  1499             q.qlock = 0;
       
  1500         }
       
  1501         fullExternalPush(task);
       
  1502     }
       
  1503 
       
  1504     /**
       
  1505      * Full version of externalPush. This method is called, among
       
  1506      * other times, upon the first submission of the first task to the
       
  1507      * pool, so must perform secondary initialization.  It also
       
  1508      * detects first submission by an external thread by looking up
       
  1509      * its ThreadLocal, and creates a new shared queue if the one at
       
   1510      * index is empty or contended. The plock lock body must be
       
  1511      * exception-free (so no try/finally) so we optimistically
       
  1512      * allocate new queues outside the lock and throw them away if
       
  1513      * (very rarely) not needed.
       
  1514      *
       
  1515      * Secondary initialization occurs when plock is zero, to create
       
  1516      * workQueue array and set plock to a valid value.  This lock body
       
  1517      * must also be exception-free. Because the plock seq value can
       
  1518      * eventually wrap around zero, this method harmlessly fails to
       
  1519      * reinitialize if workQueues exists, while still advancing plock.
       
  1520      */
       
  1521     private void fullExternalPush(ForkJoinTask<?> task) {
       
  1522         int r;
       
  1523         if ((r = ThreadLocalRandom.getProbe()) == 0) {
       
  1524             ThreadLocalRandom.localInit();
       
  1525             r = ThreadLocalRandom.getProbe();
       
  1526         }
       
  1527         for (;;) {
       
  1528             WorkQueue[] ws; WorkQueue q; int ps, m, k;
       
  1529             boolean move = false;
       
  1530             if ((ps = plock) < 0)
       
  1531                 throw new RejectedExecutionException();
       
  1532             else if (ps == 0 || (ws = workQueues) == null ||
       
  1533                      (m = ws.length - 1) < 0) { // initialize workQueues
       
  1534                 int p = parallelism;            // find power of two table size
       
  1535                 int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
       
  1536                 n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
       
  1537                 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
       
  1538                 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
       
  1539                                    new WorkQueue[n] : null);
       
  1540                 if (((ps = plock) & PL_LOCK) != 0 ||
       
  1541                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
       
  1542                     ps = acquirePlock();
       
  1543                 if (((ws = workQueues) == null || ws.length == 0) && nws != null)
       
  1544                     workQueues = nws;
       
  1545                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
       
  1546                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
       
  1547                     releasePlock(nps);
       
  1548             }
       
  1549             else if ((q = ws[k = r & m & SQMASK]) != null) {
       
  1550                 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
       
  1551                     ForkJoinTask<?>[] a = q.array;
       
  1552                     int s = q.top;
       
  1553                     boolean submitted = false;
       
  1554                     try {                      // locked version of push
       
  1555                         if ((a != null && a.length > s + 1 - q.base) ||
       
  1556                             (a = q.growArray()) != null) {   // must presize
       
  1557                             int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
       
  1558                             U.putOrderedObject(a, j, task);
       
  1559                             q.top = s + 1;
       
  1560                             submitted = true;
       
  1561                         }
       
  1562                     } finally {
       
  1563                         q.qlock = 0;  // unlock
       
  1564                     }
       
  1565                     if (submitted) {
       
  1566                         signalWork(ws, q);
       
  1567                         return;
       
  1568                     }
       
  1569                 }
       
  1570                 move = true; // move on failure
       
  1571             }
       
  1572             else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
       
  1573                 q = new WorkQueue(this, null, SHARED_QUEUE, r);
       
  1574                 q.poolIndex = (short)k;
       
  1575                 if (((ps = plock) & PL_LOCK) != 0 ||
       
  1576                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
       
  1577                     ps = acquirePlock();
       
  1578                 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
       
  1579                     ws[k] = q;
       
  1580                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
       
  1581                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
       
  1582                     releasePlock(nps);
       
  1583             }
       
  1584             else
       
  1585                 move = true; // move if busy
       
  1586             if (move)
       
  1587                 r = ThreadLocalRandom.advanceProbe(r);
       
  1588         }
       
  1589     }
       
  1590 
       
  1591     // Maintaining ctl counts
       
  1592 
       
  1593     /**
       
  1594      * Increments active count; mainly called upon return from blocking.
       
  1595      */
       
  1596     final void incrementActiveCount() {
       
  1597         long c;
       
  1598         do {} while (!U.compareAndSwapLong
       
  1599                      (this, CTL, c = ctl, ((c & ~AC_MASK) |
       
  1600                                            ((c & AC_MASK) + AC_UNIT))));
       
  1601     }
       
  1602 
  1624 
  1603     /**
  1625     /**
  1604      * Tries to create or activate a worker if too few are active.
  1626      * Tries to create or activate a worker if too few are active.
  1605      *
  1627      *
  1606      * @param ws the worker array to use to find signallees
  1628      * @param ws the worker array to use to find signallees
  1607      * @param q if non-null, the queue holding tasks to be processed
  1629      * @param q a WorkQueue --if non-null, don't retry if now empty
  1608      */
  1630      */
  1609     final void signalWork(WorkQueue[] ws, WorkQueue q) {
  1631     final void signalWork(WorkQueue[] ws, WorkQueue q) {
  1610         for (;;) {
  1632         long c; int sp, i; WorkQueue v; Thread p;
  1611             long c; int e, u, i; WorkQueue w; Thread p;
  1633         while ((c = ctl) < 0L) {                       // too few active
  1612             if ((u = (int)((c = ctl) >>> 32)) >= 0)
  1634             if ((sp = (int)c) == 0) {                  // no idle workers
       
  1635                 if ((c & ADD_WORKER) != 0L)            // too few workers
       
  1636                     tryAddWorker(c);
  1613                 break;
  1637                 break;
  1614             if ((e = (int)c) <= 0) {
  1638             }
  1615                 if ((short)u < 0)
  1639             if (ws == null)                            // unstarted/terminated
  1616                     tryAddWorker();
       
  1617                 break;
  1640                 break;
  1618             }
  1641             if (ws.length <= (i = sp & SMASK))         // terminated
  1619             if (ws == null || ws.length <= (i = e & SMASK) ||
       
  1620                 (w = ws[i]) == null)
       
  1621                 break;
  1642                 break;
  1622             long nc = (((long)(w.nextWait & E_MASK)) |
  1643             if ((v = ws[i]) == null)                   // terminating
  1623                        ((long)(u + UAC_UNIT)) << 32);
  1644                 break;
  1624             int ne = (e + E_SEQ) & E_MASK;
  1645             int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
  1625             if (w.eventCount == (e | INT_SIGN) &&
  1646             int d = sp - v.scanState;                  // screen CAS
  1626                 U.compareAndSwapLong(this, CTL, c, nc)) {
  1647             long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
  1627                 w.eventCount = ne;
  1648             if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
  1628                 if ((p = w.parker) != null)
  1649                 v.scanState = vs;                      // activate v
       
  1650                 if ((p = v.parker) != null)
  1629                     U.unpark(p);
  1651                     U.unpark(p);
  1630                 break;
  1652                 break;
  1631             }
  1653             }
  1632             if (q != null && q.base >= q.top)
  1654             if (q != null && q.base == q.top)          // no more work
  1633                 break;
  1655                 break;
  1634         }
  1656         }
  1635     }
  1657     }
  1636 
  1658 
       
  1659     /**
       
  1660      * Signals and releases worker v if it is top of idle worker
       
  1661      * stack.  This performs a one-shot version of signalWork only if
       
  1662      * there is (apparently) at least one idle worker.
       
  1663      *
       
  1664      * @param c incoming ctl value
       
  1665      * @param v if non-null, a worker
       
  1666      * @param inc the increment to active count (zero when compensating)
       
  1667      * @return true if successful
       
  1668      */
       
  1669     private boolean tryRelease(long c, WorkQueue v, long inc) {
       
  1670         int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
       
  1671         if (v != null && v.scanState == sp) {          // v is at top of stack
       
  1672             long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
       
  1673             if (U.compareAndSwapLong(this, CTL, c, nc)) {
       
  1674                 v.scanState = vs;
       
  1675                 if ((p = v.parker) != null)
       
  1676                     U.unpark(p);
       
  1677                 return true;
       
  1678             }
       
  1679         }
       
  1680         return false;
       
  1681     }
       
  1682 
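    // A minimal sketch of the idle-worker Treiber stack kept in ctl's low 32
    // bits (illustrative pseudocode; the real push is in scan, the pop here
    // and in signalWork):
    //
    //   push(w): w.stackPred = (int)ctl;                        // link to old top
    //            CAS ctl -> (SP_MASK & (w.scanState | INACTIVE))
    //                       | (UC_MASK & (ctl - AC_UNIT));
    //   pop(v):  CAS ctl -> (SP_MASK & v.stackPred) | (UC_MASK & (ctl + AC_UNIT));
    //            v.scanState = (sp + SS_SEQ) & ~INACTIVE;       // then unpark v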
  1637     // Scanning for tasks
  1683     // Scanning for tasks
  1638 
  1684 
  1639     /**
  1685     /**
  1640      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
  1686      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
  1641      */
  1687      */
  1642     final void runWorker(WorkQueue w) {
  1688     final void runWorker(WorkQueue w) {
  1643         w.growArray(); // allocate queue
  1689         w.growArray();                   // allocate queue
  1644         for (int r = w.hint; scan(w, r) == 0; ) {
  1690         int seed = w.hint;               // initially holds randomization hint
       
  1691         int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
       
  1692         for (ForkJoinTask<?> t;;) {
       
  1693             if ((t = scan(w, r)) != null)
       
  1694                 w.runTask(t);
       
  1695             else if (!awaitWork(w, r))
       
  1696                 break;
  1645             r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
  1697             r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
  1646         }
  1698         }
  1647     }
  1699     }
  1648 
  1700 
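    // The r update above is a Marsaglia xorshift step; as a standalone helper
    // it would look like the sketch below (the seed must be non-zero, which
    // is why runWorker substitutes 1 for a zero hint):
    //
    //   static int nextXorShift(int r) { r ^= r << 13; r ^= r >>> 17; r ^= r << 5; return r; }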
  1649     /**
  1701     /**
  1650      * Scans for and, if found, runs one task, else possibly
  1702      * Scans for and tries to steal a top-level task. Scans start at a
  1651      * inactivates the worker. This method operates on single reads of
  1703      * random location, randomly moving on apparent contention,
  1652      * volatile state and is designed to be re-invoked continuously,
  1704      * otherwise continuing linearly until reaching two consecutive
  1653      * in part because it returns upon detecting inconsistencies,
  1705      * empty passes over all queues with the same checksum (summing
  1654      * contention, or state changes that indicate possible success on
  1706      * each base index of each queue, that moves on each steal), at
  1655      * re-invocation.
  1707      * which point the worker tries to inactivate and then re-scans,
  1656      *
  1708      * attempting to re-activate (itself or some other worker) if
  1657      * The scan searches for tasks across queues starting at a random
  1709      * finding a task; otherwise returning null to await work.  Scans
  1658      * index, checking each at least twice.  The scan terminates upon
  1710      * otherwise touch as little memory as possible, to reduce
  1659      * either finding a non-empty queue, or completing the sweep. If
  1711      * disruption on other scanning threads.
  1660      * the worker is not inactivated, it takes and runs a task from
       
  1661      * this queue. Otherwise, if not activated, it tries to activate
       
  1662      * itself or some other worker by signalling. On failure to find a
       
  1663      * task, returns (for retry) if pool state may have changed during
       
  1664      * an empty scan, or tries to inactivate if active, else possibly
       
  1665      * blocks or terminates via method awaitWork.
       
  1666      *
  1712      *
  1667      * @param w the worker (via its WorkQueue)
  1713      * @param w the worker (via its WorkQueue)
  1668      * @param r a random seed
  1714      * @param r a random seed
  1669      * @return worker qlock status if would have waited, else 0
  1715      * @return a task, or null if none found
  1670      */
  1716      */
  1671     private final int scan(WorkQueue w, int r) {
  1717     private ForkJoinTask<?> scan(WorkQueue w, int r) {
  1672         WorkQueue[] ws; int m;
  1718         WorkQueue[] ws; int m;
  1673         long c = ctl;                            // for consistency check
  1719         if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
  1674         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
  1720             int ss = w.scanState;                     // initially non-negative
  1675             for (int j = m + m + 1, ec = w.eventCount;;) {
  1721             for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
  1676                 WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
  1722                 WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
  1677                 if ((q = ws[(r - j) & m]) != null &&
  1723                 int b, n; long c;
  1678                     (b = q.base) - q.top < 0 && (a = q.array) != null) {
  1724                 if ((q = ws[k]) != null) {
  1679                     long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
  1725                     if ((n = (b = q.base) - q.top) < 0 &&
  1680                     if ((t = ((ForkJoinTask<?>)
  1726                         (a = q.array) != null) {      // non-empty
  1681                               U.getObjectVolatile(a, i))) != null) {
  1727                         long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
  1682                         if (ec < 0)
  1728                         if ((t = ((ForkJoinTask<?>)
  1683                             helpRelease(c, ws, w, q, b);
  1729                                   U.getObjectVolatile(a, i))) != null &&
  1684                         else if (q.base == b &&
  1730                             q.base == b) {
  1685                                  U.compareAndSwapObject(a, i, t, null)) {
  1731                             if (ss >= 0) {
  1686                             U.putOrderedInt(q, QBASE, b + 1);
  1732                                 if (U.compareAndSwapObject(a, i, t, null)) {
  1687                             if ((b + 1) - q.top < 0)
  1733                                     q.base = b + 1;
  1688                                 signalWork(ws, q);
  1734                                     if (n < -1)       // signal others
  1689                             w.runTask(t);
  1735                                         signalWork(ws, q);
       
  1736                                     return t;
       
  1737                                 }
       
  1738                             }
       
  1739                             else if (oldSum == 0 &&   // try to activate
       
  1740                                      w.scanState < 0)
       
  1741                                 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
  1690                         }
  1742                         }
       
  1743                         if (ss < 0)                   // refresh
       
  1744                             ss = w.scanState;
       
  1745                         r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
       
  1746                         origin = k = r & m;           // move and rescan
       
  1747                         oldSum = checkSum = 0;
       
  1748                         continue;
  1691                     }
  1749                     }
  1692                     break;
  1750                     checkSum += b;
  1693                 }
  1751                 }
  1694                 else if (--j < 0) {
  1752                 if ((k = (k + 1) & m) == origin) {    // continue until stable
  1695                     if ((ec | (e = (int)c)) < 0) // inactive or terminating
  1753                     if ((ss >= 0 || (ss == (ss = w.scanState))) &&
  1696                         return awaitWork(w, c, ec);
  1754                         oldSum == (oldSum = checkSum)) {
  1697                     else if (ctl == c) {         // try to inactivate and enqueue
  1755                         if (ss < 0 || w.qlock < 0)    // already inactive
  1698                         long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
  1756                             break;
  1699                         w.nextWait = e;
  1757                         int ns = ss | INACTIVE;       // try to inactivate
  1700                         w.eventCount = ec | INT_SIGN;
  1758                         long nc = ((SP_MASK & ns) |
  1701                         if (!U.compareAndSwapLong(this, CTL, c, nc))
  1759                                    (UC_MASK & ((c = ctl) - AC_UNIT)));
  1702                             w.eventCount = ec;   // back out
  1760                         w.stackPred = (int)c;         // hold prev stack top
       
  1761                         U.putInt(w, QSCANSTATE, ns);
       
  1762                         if (U.compareAndSwapLong(this, CTL, c, nc))
       
  1763                             ss = ns;
       
  1764                         else
       
  1765                             w.scanState = ss;         // back out
  1703                     }
  1766                     }
  1704                     break;
  1767                     checkSum = 0;
  1705                 }
  1768                 }
  1706             }
  1769             }
  1707         }
  1770         }
  1708         return 0;
  1771         return null;
  1709     }
  1772     }
  1710 
  1773 
  1711     /**
  1774     /**
  1712      * A continuation of scan(), possibly blocking or terminating
  1775      * Possibly blocks worker w waiting for a task to steal, or
  1713      * worker w. Returns without blocking if pool state has apparently
  1776      * returns false if the worker should terminate.  If inactivating
  1714      * changed since last invocation.  Also, if inactivating w has
  1777      * w has caused the pool to become quiescent, checks for pool
  1715      * caused the pool to become quiescent, checks for pool
       
  1716      * termination, and, so long as this is not the only worker, waits
  1778      * termination, and, so long as this is not the only worker, waits
  1717      * for event for up to a given duration.  On timeout, if ctl has
  1779      * for up to a given duration.  On timeout, if ctl has not
  1718      * not changed, terminates the worker, which will in turn wake up
  1780      * changed, terminates the worker, which will in turn wake up
  1719      * another worker to possibly repeat this process.
  1781      * another worker to possibly repeat this process.
  1720      *
  1782      *
  1721      * @param w the calling worker
  1783      * @param w the calling worker
  1722      * @param c the ctl value on entry to scan
  1784      * @param r a random seed (for spins)
  1723      * @param ec the worker's eventCount on entry to scan
  1785      * @return false if the worker should terminate
  1724      */
  1786      */
  1725     private final int awaitWork(WorkQueue w, long c, int ec) {
  1787     private boolean awaitWork(WorkQueue w, int r) {
  1726         int stat, ns; long parkTime, deadline;
  1788         if (w == null || w.qlock < 0)                 // w is terminating
  1727         if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
  1789             return false;
  1728             !Thread.interrupted()) {
  1790         for (int pred = w.stackPred, spins = SPINS, ss;;) {
  1729             int e = (int)c;
  1791             if ((ss = w.scanState) >= 0)
  1730             int u = (int)(c >>> 32);
  1792                 break;
  1731             int d = (u >> UAC_SHIFT) + parallelism; // active count
  1793             else if (spins > 0) {
  1732 
  1794                 r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
  1733             if (e < 0 || (d <= 0 && tryTerminate(false, false)))
  1795                 if (r >= 0 && --spins == 0) {         // randomize spins
  1734                 stat = w.qlock = -1;          // pool is terminating
  1796                     WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
  1735             else if ((ns = w.nsteals) != 0) { // collect steals and retry
  1797                     if (pred != 0 && (ws = workQueues) != null &&
  1736                 w.nsteals = 0;
  1798                         (j = pred & SMASK) < ws.length &&
  1737                 U.getAndAddLong(this, STEALCOUNT, (long)ns);
  1799                         (v = ws[j]) != null &&        // see if pred parking
  1738             }
  1800                         (v.parker == null || v.scanState >= 0))
  1739             else {
  1801                         spins = SPINS;                // continue spinning
  1740                 long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
  1802                 }
  1741                            ((long)(w.nextWait & E_MASK)) | // ctl to restore
  1803             }
  1742                            ((long)(u + UAC_UNIT)) << 32);
  1804             else if (w.qlock < 0)                     // recheck after spins
  1743                 if (pc != 0L) {               // timed wait if last waiter
  1805                 return false;
  1744                     int dc = -(short)(c >>> TC_SHIFT);
  1806             else if (!Thread.interrupted()) {
  1745                     parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
  1807                 long c, prevctl, parkTime, deadline;
  1746                                 (dc + 1) * IDLE_TIMEOUT);
  1808                 int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
       
  1809                 if ((ac <= 0 && tryTerminate(false, false)) ||
       
  1810                     (runState & STOP) != 0)           // pool terminating
       
  1811                     return false;
       
  1812                 if (ac <= 0 && ss == (int)c) {        // is last waiter
       
  1813                     prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
       
  1814                     int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
       
  1815                     if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
       
  1816                         return false;                 // else use timed wait
       
  1817                     parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
  1747                     deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
  1818                     deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
  1748                 }
  1819                 }
  1749                 else
  1820                 else
  1750                     parkTime = deadline = 0L;
  1821                     prevctl = parkTime = deadline = 0L;
  1751                 if (w.eventCount == ec && ctl == c) {
  1822                 Thread wt = Thread.currentThread();
  1752                     Thread wt = Thread.currentThread();
  1823                 U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
  1753                     U.putObject(wt, PARKBLOCKER, this);
  1824                 w.parker = wt;
  1754                     w.parker = wt;            // emulate LockSupport.park
  1825                 if (w.scanState < 0 && ctl == c)      // recheck before park
  1755                     if (w.eventCount == ec && ctl == c)
  1826                     U.park(false, parkTime);
  1756                         U.park(false, parkTime);  // must recheck before park
  1827                 U.putOrderedObject(w, QPARKER, null);
  1757                     w.parker = null;
  1828                 U.putObject(wt, PARKBLOCKER, null);
  1758                     U.putObject(wt, PARKBLOCKER, null);
  1829                 if (w.scanState >= 0)
  1759                     if (parkTime != 0L && ctl == c &&
  1830                     break;
  1760                         deadline - System.nanoTime() <= 0L &&
  1831                 if (parkTime != 0L && ctl == c &&
  1761                         U.compareAndSwapLong(this, CTL, c, pc))
  1832                     deadline - System.nanoTime() <= 0L &&
  1762                         stat = w.qlock = -1;  // shrink pool
  1833                     U.compareAndSwapLong(this, CTL, c, prevctl))
       
  1834                     return false;                     // shrink pool
       
  1835             }
       
  1836         }
       
  1837         return true;
       
  1838     }
       
  1839 
       
  1840     // Joining tasks
       
  1841 
       
  1842     /**
       
  1843      * Tries to steal and run tasks within the target's computation.
       
  1844      * Uses a variant of the top-level algorithm, restricted to tasks
       
  1845      * with the given task as ancestor: It prefers taking and running
       
  1846      * eligible tasks popped from the worker's own queue (via
       
  1847      * popCC). Otherwise it scans others, randomly moving on
       
  1848      * contention or execution, deciding to give up based on a
       
  1849      * checksum (via return codes frob pollAndExecCC). The maxTasks
       
  1850      * argument supports external usages; internal calls use zero,
       
  1851      * allowing unbounded steps (external calls trap non-positive
       
  1852      * values).
       
  1853      *
       
  1854      * @param w caller
       
  1855      * @param maxTasks if non-zero, the maximum number of other tasks to run
       
  1856      * @return task status on exit
       
  1857      */
       
  1858     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
       
  1859                            int maxTasks) {
       
  1860         WorkQueue[] ws; int s = 0, m;
       
  1861         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
       
  1862             task != null && w != null) {
       
  1863             int mode = w.config;                 // for popCC
       
  1864             int r = w.hint ^ w.top;              // arbitrary seed for origin
       
  1865             int origin = r & m;                  // first queue to scan
       
  1866             int h = 1;                           // 1:ran, >1:contended, <0:hash
       
  1867             for (int k = origin, oldSum = 0, checkSum = 0;;) {
       
  1868                 CountedCompleter<?> p; WorkQueue q;
       
  1869                 if ((s = task.status) < 0)
       
  1870                     break;
       
  1871                 if (h == 1 && (p = w.popCC(task, mode)) != null) {
       
  1872                     p.doExec();                  // run local task
       
  1873                     if (maxTasks != 0 && --maxTasks == 0)
       
  1874                         break;
       
  1875                     origin = k;                  // reset
       
  1876                     oldSum = checkSum = 0;
  1763                 }
  1877                 }
  1764             }
  1878                 else {                           // poll other queues
  1765         }
  1879                     if ((q = ws[k]) == null)
  1766         return stat;
  1880                         h = 0;
  1767     }
  1881                     else if ((h = q.pollAndExecCC(task)) < 0)
  1768 
  1882                         checkSum += h;
  1769     /**
  1883                     if (h > 0) {
  1770      * Possibly releases (signals) a worker. Called only from scan()
  1884                         if (h == 1 && maxTasks != 0 && --maxTasks == 0)
  1771      * when a worker with apparently inactive status finds a non-empty
  1885                             break;
  1772      * queue. This requires revalidating all of the associated state
  1886                         r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
  1773      * from caller.
  1887                         origin = k = r & m;      // move and restart
  1774      */
  1888                         oldSum = checkSum = 0;
  1775     private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
  1889                     }
  1776                                    WorkQueue q, int b) {
  1890                     else if ((k = (k + 1) & m) == origin) {
  1777         WorkQueue v; int e, i; Thread p;
  1891                         if (oldSum == (oldSum = checkSum))
  1778         if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
  1892                             break;
  1779             ws != null && ws.length > (i = e & SMASK) &&
  1893                         checkSum = 0;
  1780             (v = ws[i]) != null && ctl == c) {
  1894                     }
  1781             long nc = (((long)(v.nextWait & E_MASK)) |
  1895                 }
  1782                        ((long)((int)(c >>> 32) + UAC_UNIT)) << 32);
  1896             }
  1783             int ne = (e + E_SEQ) & E_MASK;
  1897         }
  1784             if (q != null && q.base == b && w.eventCount < 0 &&
  1898         return s;
  1785                 v.eventCount == (e | INT_SIGN) &&
       
  1786                 U.compareAndSwapLong(this, CTL, c, nc)) {
       
  1787                 v.eventCount = ne;
       
  1788                 if ((p = v.parker) != null)
       
  1789                     U.unpark(p);
       
  1790             }
       
  1791         }
       
  1792     }
  1899     }
  1793 
  1900 
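// Editor's note: the helpComplete scan above re-seeds its origin with a
// Marsaglia xorshift step whenever it makes progress and moves on. A minimal,
// self-contained sketch of that re-seeding idiom (class and variable names here
// are illustrative, not part of this file):
final class XorShiftOriginSketch {
    // One xorshift step: cheap pseudo-random update; nonzero seeds stay nonzero.
    static int nextSeed(int r) {
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
        return r;
    }
    public static void main(String[] args) {
        int m = 16 - 1;                          // queue array length minus one (power of two)
        int r = (int) System.nanoTime() | 1;     // arbitrary nonzero seed
        for (int i = 0; i < 4; ++i) {
            r = nextSeed(r);
            System.out.println("new scan origin = " + (r & m));
        }
    }
}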
  1794     /**
  1901     /**
  1795      * Tries to locate and execute tasks for a stealer of the given
  1902      * Tries to locate and execute tasks for a stealer of the given
   1796      * task, or in turn one of its stealers. Traces currentSteal ->
   1903      * task, or in turn one of its stealers. Traces currentSteal ->
  1797      * currentJoin links looking for a thread working on a descendant
  1904      * currentJoin links looking for a thread working on a descendant
  1798      * of the given task and with a non-empty queue to steal back and
  1905      * of the given task and with a non-empty queue to steal back and
  1799      * execute tasks from. The first call to this method upon a
  1906      * execute tasks from. The first call to this method upon a
   1800      * waiting join will often entail scanning/search (which is OK
   1907      * waiting join will often entail scanning/search (which is OK
  1801      * because the joiner has nothing better to do), but this method
  1908      * because the joiner has nothing better to do), but this method
  1802      * leaves hints in workers to speed up subsequent calls. The
  1909      * leaves hints in workers to speed up subsequent calls.
  1803      * implementation is very branchy to cope with potential
  1910      *
  1804      * inconsistencies or loops encountering chains that are stale,
  1911      * @param w caller
  1805      * unknown, or so long that they are likely cyclic.
       
  1806      *
       
  1807      * @param joiner the joining worker
       
  1808      * @param task the task to join
  1912      * @param task the task to join
  1809      * @return 0 if no progress can be made, negative if task
  1913      */
  1810      * known complete, else positive
  1914     private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
  1811      */
  1915         WorkQueue[] ws = workQueues;
  1812     private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
  1916         int oldSum = 0, checkSum, m;
  1813         int stat = 0, steps = 0;                    // bound to avoid cycles
  1917         if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
  1814         if (task != null && joiner != null &&
  1918             task != null) {
  1815             joiner.base - joiner.top >= 0) {        // hoist checks
  1919             do {                                       // restart point
  1816             restart: for (;;) {
  1920                 checkSum = 0;                          // for stability check
  1817                 ForkJoinTask<?> subtask = task;     // current target
  1921                 ForkJoinTask<?> subtask;
  1818                 for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
  1922                 WorkQueue j = w, v;                    // v is subtask stealer
  1819                     WorkQueue[] ws; int m, s, h;
  1923                 descent: for (subtask = task; subtask.status >= 0; ) {
  1820                     if ((s = task.status) < 0) {
  1924                     for (int h = j.hint | 1, k = 0, i; ; k += 2) {
  1821                         stat = s;
  1925                         if (k > m)                     // can't find stealer
  1822                         break restart;
  1926                             break descent;
  1823                     }
  1927                         if ((v = ws[i = (h + k) & m]) != null) {
  1824                     if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
  1928                             if (v.currentSteal == subtask) {
  1825                         break restart;              // shutting down
  1929                                 j.hint = i;
  1826                     if ((v = ws[h = (j.hint | 1) & m]) == null ||
       
  1827                         v.currentSteal != subtask) {
       
  1828                         for (int origin = h;;) {    // find stealer
       
  1829                             if (((h = (h + 2) & m) & 15) == 1 &&
       
  1830                                 (subtask.status < 0 || j.currentJoin != subtask))
       
  1831                                 continue restart;   // occasional staleness check
       
  1832                             if ((v = ws[h]) != null &&
       
  1833                                 v.currentSteal == subtask) {
       
  1834                                 j.hint = h;        // save hint
       
  1835                                 break;
  1930                                 break;
  1836                             }
  1931                             }
  1837                             if (h == origin)
  1932                             checkSum += v.base;
  1838                                 break restart;      // cannot find stealer
       
  1839                         }
  1933                         }
  1840                     }
  1934                     }
  1841                     for (;;) { // help stealer or descend to its stealer
  1935                     for (;;) {                         // help v or descend
  1842                         ForkJoinTask<?>[] a; int b;
  1936                         ForkJoinTask<?>[] a; int b;
  1843                         if (subtask.status < 0)     // surround probes with
  1937                         checkSum += (b = v.base);
  1844                             continue restart;       //   consistency checks
  1938                         ForkJoinTask<?> next = v.currentJoin;
  1845                         if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
  1939                         if (subtask.status < 0 || j.currentJoin != subtask ||
  1846                             int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
  1940                             v.currentSteal != subtask) // stale
  1847                             ForkJoinTask<?> t =
  1941                             break descent;
  1848                                 (ForkJoinTask<?>)U.getObjectVolatile(a, i);
  1942                         if (b - v.top >= 0 || (a = v.array) == null) {
  1849                             if (subtask.status < 0 || j.currentJoin != subtask ||
  1943                             if ((subtask = next) == null)
  1850                                 v.currentSteal != subtask)
  1944                                 break descent;
  1851                                 continue restart;   // stale
  1945                             j = v;
  1852                             stat = 1;               // apparent progress
  1946                             break;
  1853                             if (v.base == b) {
       
  1854                                 if (t == null)
       
  1855                                     break restart;
       
  1856                                 if (U.compareAndSwapObject(a, i, t, null)) {
       
  1857                                     U.putOrderedInt(v, QBASE, b + 1);
       
  1858                                     ForkJoinTask<?> ps = joiner.currentSteal;
       
  1859                                     int jt = joiner.top;
       
  1860                                     do {
       
  1861                                         joiner.currentSteal = t;
       
  1862                                         t.doExec(); // clear local tasks too
       
  1863                                     } while (task.status >= 0 &&
       
  1864                                              joiner.top != jt &&
       
  1865                                              (t = joiner.pop()) != null);
       
  1866                                     joiner.currentSteal = ps;
       
  1867                                     break restart;
       
  1868                                 }
       
  1869                             }
       
  1870                         }
  1947                         }
  1871                         else {                      // empty -- try to descend
  1948                         int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
  1872                             ForkJoinTask<?> next = v.currentJoin;
  1949                         ForkJoinTask<?> t = ((ForkJoinTask<?>)
  1873                             if (subtask.status < 0 || j.currentJoin != subtask ||
  1950                                              U.getObjectVolatile(a, i));
  1874                                 v.currentSteal != subtask)
  1951                         if (v.base == b) {
  1875                                 continue restart;   // stale
  1952                             if (t == null)             // stale
  1876                             else if (next == null || ++steps == MAX_HELP)
  1953                                 break descent;
  1877                                 break restart;      // dead-end or maybe cyclic
  1954                             if (U.compareAndSwapObject(a, i, t, null)) {
  1878                             else {
  1955                                 v.base = b + 1;
  1879                                 subtask = next;
  1956                                 ForkJoinTask<?> ps = w.currentSteal;
  1880                                 j = v;
  1957                                 int top = w.top;
  1881                                 break;
  1958                                 do {
       
  1959                                     U.putOrderedObject(w, QCURRENTSTEAL, t);
       
  1960                                     t.doExec();        // clear local tasks too
       
  1961                                 } while (task.status >= 0 &&
       
  1962                                          w.top != top &&
       
  1963                                          (t = w.pop()) != null);
       
  1964                                 U.putOrderedObject(w, QCURRENTSTEAL, ps);
       
  1965                                 if (w.base != w.top)
       
  1966                                     return;            // can't further help
  1882                             }
  1967                             }
  1883                         }
  1968                         }
  1884                     }
  1969                     }
  1885                 }
  1970                 }
  1886             }
  1971             } while (task.status >= 0 && oldSum != (oldSum = checkSum));
  1887         }
  1972         }
  1888         return stat;
  1973     }
  1889     }
  1974 
  1890 
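// Editor's note: both the old and new versions above end their scans with a
// "stable checksum" test: keep rescanning, summing one observation per queue,
// and give up only when two consecutive passes produce the same sum. A hedged,
// self-contained sketch of that idiom (observe() and the int[] stand in for
// reading each WorkQueue's base index; names are illustrative):
final class StableChecksumSketch {
    static int observe(int[] slots, int i) { return slots[i]; }   // placeholder observation

    static void scanUntilStable(int[] slots) {
        for (long oldSum = 0L;;) {
            long checkSum = 0L;
            for (int i = 0; i < slots.length; ++i)
                checkSum += observe(slots, i);
            if (oldSum == (oldSum = checkSum))
                break;               // two identical passes in a row: treat as stable
        }
    }
    public static void main(String[] args) {
        scanUntilStable(new int[] { 3, 1, 4, 1, 5 });
        System.out.println("stable");
    }
}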
  1975     /**
  1891     /**
  1976      * Tries to decrement active count (sometimes implicitly) and
  1892      * Analog of tryHelpStealer for CountedCompleters. Tries to steal
  1977      * possibly release or create a compensating worker in preparation
  1893      * and run tasks within the target's computation.
   1978      * for blocking. Returns false (retryable by caller) on
  1894      *
  1979      * contention, detected staleness, instability, or termination.
  1895      * @param task the task to join
  1980      *
  1896      * @param maxTasks the maximum number of other tasks to run
  1981      * @param w caller
  1897      */
  1982      */
  1898     final int helpComplete(WorkQueue joiner, CountedCompleter<?> task,
  1983     private boolean tryCompensate(WorkQueue w) {
  1899                            int maxTasks) {
  1984         boolean canBlock;
  1900         WorkQueue[] ws; int m;
  1985         WorkQueue[] ws; long c; int m, pc, sp;
       
  1986         if (w == null || w.qlock < 0 ||           // caller terminating
       
  1987             (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
       
  1988             (pc = config & SMASK) == 0)           // parallelism disabled
       
  1989             canBlock = false;
       
  1990         else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
       
  1991             canBlock = tryRelease(c, ws[sp & m], 0L);
       
  1992         else {
       
  1993             int ac = (int)(c >> AC_SHIFT) + pc;
       
  1994             int tc = (short)(c >> TC_SHIFT) + pc;
       
  1995             int nbusy = 0;                        // validate saturation
       
  1996             for (int i = 0; i <= m; ++i) {        // two passes of odd indices
       
  1997                 WorkQueue v;
       
  1998                 if ((v = ws[((i << 1) | 1) & m]) != null) {
       
  1999                     if ((v.scanState & SCANNING) != 0)
       
  2000                         break;
       
  2001                     ++nbusy;
       
  2002                 }
       
  2003             }
       
  2004             if (nbusy != (tc << 1) || ctl != c)
       
  2005                 canBlock = false;                 // unstable or stale
       
  2006             else if (tc >= pc && ac > 1 && w.isEmpty()) {
       
  2007                 long nc = ((AC_MASK & (c - AC_UNIT)) |
       
  2008                            (~AC_MASK & c));       // uncompensated
       
  2009                 canBlock = U.compareAndSwapLong(this, CTL, c, nc);
       
  2010             }
       
  2011             else if (tc >= MAX_CAP ||
       
  2012                      (this == common && tc >= pc + commonMaxSpares))
       
  2013                 throw new RejectedExecutionException(
       
  2014                     "Thread limit exceeded replacing blocked worker");
       
  2015             else {                                // similar to tryAddWorker
       
  2016                 boolean add = false; int rs;      // CAS within lock
       
  2017                 long nc = ((AC_MASK & c) |
       
  2018                            (TC_MASK & (c + TC_UNIT)));
       
  2019                 if (((rs = lockRunState()) & STOP) == 0)
       
  2020                     add = U.compareAndSwapLong(this, CTL, c, nc);
       
  2021                 unlockRunState(rs, rs & ~RSLOCK);
       
  2022                 canBlock = add && createWorker(); // throws on exception
       
  2023             }
       
  2024         }
       
  2025         return canBlock;
       
  2026     }
       
  2027 
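// Editor's note: tryCompensate is the internal hook that lets a worker about to
// block be replaced by a spare thread. The supported public route to this
// machinery is ForkJoinPool.managedBlock with a ManagedBlocker; below is a
// hedged sketch in the shape of the class-level QueueTaker example (names are
// illustrative):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

final class QueueTakerSketch<E> implements ForkJoinPool.ManagedBlocker {
    final BlockingQueue<E> queue;
    volatile E item = null;
    QueueTakerSketch(BlockingQueue<E> queue) { this.queue = queue; }
    public boolean block() throws InterruptedException {
        if (item == null)
            item = queue.take();         // may block; the pool may compensate with a spare
        return true;
    }
    public boolean isReleasable() {
        return item != null || (item = queue.poll()) != null;
    }
    E getItem() { return item; }
}
// usage from code running in a ForkJoinPool:
//   QueueTakerSketch<String> taker = new QueueTakerSketch<>(queue);
//   ForkJoinPool.managedBlock(taker);          // throws InterruptedException
//   String next = taker.getItem();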
       
  2028     /**
       
  2029      * Helps and/or blocks until the given task is done or timeout.
       
  2030      *
       
  2031      * @param w caller
       
  2032      * @param task the task
       
  2033      * @param deadline for timed waits, if nonzero
       
  2034      * @return task status on exit
       
  2035      */
       
  2036     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
  1901         int s = 0;
  2037         int s = 0;
  1902         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
  2038         if (task != null && w != null) {
  1903             joiner != null && task != null) {
  2039             ForkJoinTask<?> prevJoin = w.currentJoin;
  1904             int j = joiner.poolIndex;
  2040             U.putOrderedObject(w, QCURRENTJOIN, task);
  1905             int scans = m + m + 1;
  2041             CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
  1906             long c = 0L;              // for stability check
  2042                 (CountedCompleter<?>)task : null;
  1907             for (int k = scans; ; j += 2) {
  2043             for (;;) {
  1908                 WorkQueue q;
       
  1909                 if ((s = task.status) < 0)
  2044                 if ((s = task.status) < 0)
  1910                     break;
  2045                     break;
  1911                 else if (joiner.internalPopAndExecCC(task)) {
  2046                 if (cc != null)
  1912                     if (--maxTasks <= 0) {
  2047                     helpComplete(w, cc, 0);
  1913                         s = task.status;
  2048                 else if (w.base == w.top || w.tryRemoveAndExec(task))
  1914                         break;
  2049                     helpStealer(w, task);
  1915                     }
  2050                 if ((s = task.status) < 0)
  1916                     k = scans;
  2051                     break;
       
  2052                 long ms, ns;
       
  2053                 if (deadline == 0L)
       
  2054                     ms = 0L;
       
  2055                 else if ((ns = deadline - System.nanoTime()) <= 0L)
       
  2056                     break;
       
  2057                 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
       
  2058                     ms = 1L;
       
  2059                 if (tryCompensate(w)) {
       
  2060                     task.internalWait(ms);
       
  2061                     U.getAndAddLong(this, CTL, AC_UNIT);
  1917                 }
  2062                 }
  1918                 else if ((s = task.status) < 0)
  2063             }
  1919                     break;
  2064             U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
  1920                 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
       
  1921                     if (--maxTasks <= 0) {
       
  1922                         s = task.status;
       
  1923                         break;
       
  1924                     }
       
  1925                     k = scans;
       
  1926                 }
       
  1927                 else if (--k < 0) {
       
  1928                     if (c == (c = ctl))
       
  1929                         break;
       
  1930                     k = scans;
       
  1931                 }
       
  1932             }
       
  1933         }
  2065         }
  1934         return s;
  2066         return s;
  1935     }
  2067     }
  1936 
  2068 
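// Editor's note: the nonzero-deadline path above is what timed joins ride on.
// A hedged caller-side sketch using only public API (the pool, task, and the
// 100 ms bound are illustrative):
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedJoinSketch {
    public static void main(String[] args) throws Exception {
        ForkJoinTask<Long> task = ForkJoinPool.commonPool().submit(() -> {
            long sum = 0L;
            for (long i = 0; i < 1_000_000L; ++i) sum += i;
            return sum;
        });
        try {
            System.out.println(task.get(100, TimeUnit.MILLISECONDS));  // bounded wait
        } catch (TimeoutException e) {
            task.cancel(true);           // give up rather than wait indefinitely
        }
    }
}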
  1937     /**
  2069     // Specialized scanning
  1938      * Tries to decrement active count (sometimes implicitly) and
       
  1939      * possibly release or create a compensating worker in preparation
       
  1940      * for blocking. Fails on contention or termination. Otherwise,
       
  1941      * adds a new thread if no idle workers are available and pool
       
  1942      * may become starved.
       
  1943      *
       
  1944      * @param c the assumed ctl value
       
  1945      */
       
  1946     final boolean tryCompensate(long c) {
       
  1947         WorkQueue[] ws = workQueues;
       
  1948         int pc = parallelism, e = (int)c, m, tc;
       
  1949         if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
       
  1950             WorkQueue w = ws[e & m];
       
  1951             if (e != 0 && w != null) {
       
  1952                 Thread p;
       
  1953                 long nc = ((long)(w.nextWait & E_MASK) |
       
  1954                            (c & (AC_MASK|TC_MASK)));
       
  1955                 int ne = (e + E_SEQ) & E_MASK;
       
  1956                 if (w.eventCount == (e | INT_SIGN) &&
       
  1957                     U.compareAndSwapLong(this, CTL, c, nc)) {
       
  1958                     w.eventCount = ne;
       
  1959                     if ((p = w.parker) != null)
       
  1960                         U.unpark(p);
       
  1961                     return true;   // replace with idle worker
       
  1962                 }
       
  1963             }
       
  1964             else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
       
  1965                      (int)(c >> AC_SHIFT) + pc > 1) {
       
  1966                 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
       
  1967                 if (U.compareAndSwapLong(this, CTL, c, nc))
       
  1968                     return true;   // no compensation
       
  1969             }
       
  1970             else if (tc + pc < MAX_CAP) {
       
  1971                 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
       
  1972                 if (U.compareAndSwapLong(this, CTL, c, nc)) {
       
  1973                     ForkJoinWorkerThreadFactory fac;
       
  1974                     Throwable ex = null;
       
  1975                     ForkJoinWorkerThread wt = null;
       
  1976                     try {
       
  1977                         if ((fac = factory) != null &&
       
  1978                             (wt = fac.newThread(this)) != null) {
       
  1979                             wt.start();
       
  1980                             return true;
       
  1981                         }
       
  1982                     } catch (Throwable rex) {
       
  1983                         ex = rex;
       
  1984                     }
       
  1985                     deregisterWorker(wt, ex); // clean up and return false
       
  1986                 }
       
  1987             }
       
  1988         }
       
  1989         return false;
       
  1990     }
       
  1991 
       
  1992     /**
       
  1993      * Helps and/or blocks until the given task is done.
       
  1994      *
       
  1995      * @param joiner the joining worker
       
  1996      * @param task the task
       
  1997      * @return task status on exit
       
  1998      */
       
  1999     final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
       
  2000         int s = 0;
       
  2001         if (task != null && (s = task.status) >= 0 && joiner != null) {
       
  2002             ForkJoinTask<?> prevJoin = joiner.currentJoin;
       
  2003             joiner.currentJoin = task;
       
  2004             do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
       
  2005                          (s = task.status) >= 0);
       
  2006             if (s >= 0 && (task instanceof CountedCompleter))
       
  2007                 s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
       
  2008             long cc = 0;        // for stability checks
       
  2009             while (s >= 0 && (s = task.status) >= 0) {
       
  2010                 if ((s = tryHelpStealer(joiner, task)) == 0 &&
       
  2011                     (s = task.status) >= 0) {
       
  2012                     if (!tryCompensate(cc))
       
  2013                         cc = ctl;
       
  2014                     else {
       
  2015                         if (task.trySetSignal() && (s = task.status) >= 0) {
       
  2016                             synchronized (task) {
       
  2017                                 if (task.status >= 0) {
       
  2018                                     try {                // see ForkJoinTask
       
  2019                                         task.wait();     //  for explanation
       
  2020                                     } catch (InterruptedException ie) {
       
  2021                                     }
       
  2022                                 }
       
  2023                                 else
       
  2024                                     task.notifyAll();
       
  2025                             }
       
  2026                         }
       
  2027                         long c; // reactivate
       
  2028                         do {} while (!U.compareAndSwapLong
       
  2029                                      (this, CTL, c = ctl,
       
  2030                                       ((c & ~AC_MASK) |
       
  2031                                        ((c & AC_MASK) + AC_UNIT))));
       
  2032                     }
       
  2033                 }
       
  2034             }
       
  2035             joiner.currentJoin = prevJoin;
       
  2036         }
       
  2037         return s;
       
  2038     }
       
  2039 
       
  2040     /**
       
  2041      * Stripped-down variant of awaitJoin used by timed joins. Tries
       
  2042      * to help join only while there is continuous progress. (Caller
       
  2043      * will then enter a timed wait.)
       
  2044      *
       
  2045      * @param joiner the joining worker
       
  2046      * @param task the task
       
  2047      */
       
  2048     final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
       
  2049         int s;
       
  2050         if (joiner != null && task != null && (s = task.status) >= 0) {
       
  2051             ForkJoinTask<?> prevJoin = joiner.currentJoin;
       
  2052             joiner.currentJoin = task;
       
  2053             do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
       
  2054                          (s = task.status) >= 0);
       
  2055             if (s >= 0) {
       
  2056                 if (task instanceof CountedCompleter)
       
  2057                     helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
       
  2058                 do {} while (task.status >= 0 &&
       
  2059                              tryHelpStealer(joiner, task) > 0);
       
  2060             }
       
  2061             joiner.currentJoin = prevJoin;
       
  2062         }
       
  2063     }
       
  2064 
  2070 
  2065     /**
  2071     /**
  2066      * Returns a (probably) non-empty steal queue, if one is found
  2072      * Returns a (probably) non-empty steal queue, if one is found
  2067      * during a scan, else null.  This method must be retried by
  2073      * during a scan, else null.  This method must be retried by
  2068      * caller if, by the time it tries to use the queue, it is empty.
  2074      * caller if, by the time it tries to use the queue, it is empty.
  2069      */
  2075      */
  2070     private WorkQueue findNonEmptyStealQueue() {
  2076     private WorkQueue findNonEmptyStealQueue() {
       
  2077         WorkQueue[] ws; int m;  // one-shot version of scan loop
  2071         int r = ThreadLocalRandom.nextSecondarySeed();
  2078         int r = ThreadLocalRandom.nextSecondarySeed();
  2072         for (;;) {
  2079         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
  2073             int ps = plock, m; WorkQueue[] ws; WorkQueue q;
  2080             for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
  2074             if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
  2081                 WorkQueue q; int b;
  2075                 for (int j = (m + 1) << 2; j >= 0; --j) {
  2082                 if ((q = ws[k]) != null) {
  2076                     if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
  2083                     if ((b = q.base) - q.top < 0)
  2077                         q.base - q.top < 0)
       
  2078                         return q;
  2084                         return q;
       
  2085                     checkSum += b;
  2079                 }
  2086                 }
  2080             }
  2087                 if ((k = (k + 1) & m) == origin) {
  2081             if (plock == ps)
  2088                     if (oldSum == (oldSum = checkSum))
  2082                 return null;
  2089                         break;
  2083         }
  2090                     checkSum = 0;
       
  2091                 }
       
  2092             }
       
  2093         }
       
  2094         return null;
  2084     }
  2095     }
  2085 
  2096 
  2086     /**
  2097     /**
  2087      * Runs tasks until {@code isQuiescent()}. We piggyback on
  2098      * Runs tasks until {@code isQuiescent()}. We piggyback on
  2088      * active count ctl maintenance, but rather than blocking
  2099      * active count ctl maintenance, but rather than blocking
  2089      * when tasks cannot be found, we rescan until all others cannot
  2100      * when tasks cannot be found, we rescan until all others cannot
  2090      * find tasks either.
  2101      * find tasks either.
  2091      */
  2102      */
  2092     final void helpQuiescePool(WorkQueue w) {
  2103     final void helpQuiescePool(WorkQueue w) {
  2093         ForkJoinTask<?> ps = w.currentSteal;
  2104         ForkJoinTask<?> ps = w.currentSteal; // save context
  2094         for (boolean active = true;;) {
  2105         for (boolean active = true;;) {
  2095             long c; WorkQueue q; ForkJoinTask<?> t; int b;
  2106             long c; WorkQueue q; ForkJoinTask<?> t; int b;
  2096             while ((t = w.nextLocalTask()) != null)
  2107             w.execLocalTasks();     // run locals before each scan
  2097                 t.doExec();
       
  2098             if ((q = findNonEmptyStealQueue()) != null) {
  2108             if ((q = findNonEmptyStealQueue()) != null) {
  2099                 if (!active) {      // re-establish active count
  2109                 if (!active) {      // re-establish active count
  2100                     active = true;
  2110                     active = true;
  2101                     do {} while (!U.compareAndSwapLong
  2111                     U.getAndAddLong(this, CTL, AC_UNIT);
  2102                                  (this, CTL, c = ctl,
       
  2103                                   ((c & ~AC_MASK) |
       
  2104                                    ((c & AC_MASK) + AC_UNIT))));
       
  2105                 }
  2112                 }
  2106                 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
  2113                 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
  2107                     w.runTask(t);
  2114                     U.putOrderedObject(w, QCURRENTSTEAL, t);
       
  2115                     t.doExec();
       
  2116                     if (++w.nsteals < 0)
       
  2117                         w.transferStealCount(this);
       
  2118                 }
  2108             }
  2119             }
  2109             else if (active) {      // decrement active count without queuing
  2120             else if (active) {      // decrement active count without queuing
  2110                 long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
  2121                 long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
  2111                 if ((int)(nc >> AC_SHIFT) + parallelism == 0)
  2122                 if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
  2112                     break;          // bypass decrement-then-increment
  2123                     break;          // bypass decrement-then-increment
  2113                 if (U.compareAndSwapLong(this, CTL, c, nc))
  2124                 if (U.compareAndSwapLong(this, CTL, c, nc))
  2114                     active = false;
  2125                     active = false;
  2115             }
  2126             }
  2116             else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
  2127             else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
  2117                      U.compareAndSwapLong
  2128                      U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
  2118                      (this, CTL, c, ((c & ~AC_MASK) |
       
  2119                                      ((c & AC_MASK) + AC_UNIT))))
       
  2120                 break;
  2129                 break;
  2121         }
  2130         }
       
  2131         U.putOrderedObject(w, QCURRENTSTEAL, ps);
  2122     }
  2132     }
  2123 
  2133 
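// Editor's note: helpQuiescePool backs the public ForkJoinTask.helpQuiesce(),
// which lets a task that forks many children run (and steal) work until the
// pool is quiet instead of joining each child. A hedged sketch (class name and
// counts are illustrative):
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class HelpQuiesceSketch extends RecursiveAction {
    protected void compute() {
        for (int i = 0; i < 8; ++i) {
            final int leaf = i;
            new RecursiveAction() {
                protected void compute() { System.out.println("leaf " + leaf); }
            }.fork();
        }
        helpQuiesce();       // process forked (and stolen) tasks until quiescent
    }
    public static void main(String[] args) {
        ForkJoinPool.commonPool().invoke(new HelpQuiesceSketch());
    }
}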
  2124     /**
  2134     /**
  2125      * Gets and removes a local or stolen task for the given worker.
  2135      * Gets and removes a local or stolen task for the given worker.
  2126      *
  2136      *
  2139     }
  2149     }
  2140 
  2150 
  2141     /**
  2151     /**
  2142      * Returns a cheap heuristic guide for task partitioning when
  2152      * Returns a cheap heuristic guide for task partitioning when
  2143      * programmers, frameworks, tools, or languages have little or no
  2153      * programmers, frameworks, tools, or languages have little or no
  2144      * idea about task granularity.  In essence by offering this
  2154      * idea about task granularity.  In essence, by offering this
  2145      * method, we ask users only about tradeoffs in overhead vs
  2155      * method, we ask users only about tradeoffs in overhead vs
  2146      * expected throughput and its variance, rather than how finely to
  2156      * expected throughput and its variance, rather than how finely to
  2147      * partition tasks.
  2157      * partition tasks.
  2148      *
  2158      *
  2149      * In a steady state strict (tree-structured) computation, each
  2159      * In a steady state strict (tree-structured) computation, each
  2177      * leads to serious mis-estimates in some non-steady-state
  2187      * leads to serious mis-estimates in some non-steady-state
  2178      * conditions (ramp-up, ramp-down, other stalls). We can detect
  2188      * conditions (ramp-up, ramp-down, other stalls). We can detect
  2179      * many of these by further considering the number of "idle"
  2189      * many of these by further considering the number of "idle"
  2180      * threads, that are known to have zero queued tasks, so
  2190      * threads, that are known to have zero queued tasks, so
  2181      * compensate by a factor of (#idle/#active) threads.
  2191      * compensate by a factor of (#idle/#active) threads.
  2182      *
       
  2183      * Note: The approximation of #busy workers as #active workers is
       
  2184      * not very good under current signalling scheme, and should be
       
  2185      * improved.
       
  2186      */
  2192      */
  2187     static int getSurplusQueuedTaskCount() {
  2193     static int getSurplusQueuedTaskCount() {
  2188         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
  2194         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
  2189         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
  2195         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
  2190             int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
  2196             int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).
       
  2197                 config & SMASK;
  2191             int n = (q = wt.workQueue).top - q.base;
  2198             int n = (q = wt.workQueue).top - q.base;
  2192             int a = (int)(pool.ctl >> AC_SHIFT) + p;
  2199             int a = (int)(pool.ctl >> AC_SHIFT) + p;
  2193             return n - (a > (p >>>= 1) ? 0 :
  2200             return n - (a > (p >>>= 1) ? 0 :
  2194                         a > (p >>>= 1) ? 1 :
  2201                         a > (p >>>= 1) ? 1 :
  2195                         a > (p >>>= 1) ? 2 :
  2202                         a > (p >>>= 1) ? 2 :
  2200     }
  2207     }
  2201 
  2208 
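// Editor's note: a hedged sketch of the partitioning guide described above:
// keep forking while the estimated surplus is small, then finish the remainder
// sequentially and join what was forked. The threshold 3, the sizes, and the
// names are illustrative, following the usage the comment above suggests:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class SurplusGuidedSumSketch extends RecursiveAction {
    final long[] a; final int lo, hi;
    long result;
    SurplusGuidedSumSketch next;                 // simple list of forked siblings
    SurplusGuidedSumSketch(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }
    protected void compute() {
        int l = lo, h = hi;
        SurplusGuidedSumSketch forked = null;
        while (h - l > 1024 && ForkJoinPool.getSurplusQueuedTaskCount() <= 3) {
            int mid = (l + h) >>> 1;             // fork the right half, keep the left
            SurplusGuidedSumSketch t = new SurplusGuidedSumSketch(a, mid, h);
            t.next = forked; forked = t;
            t.fork();
            h = mid;
        }
        for (int i = l; i < h; ++i) result += a[i];
        for (SurplusGuidedSumSketch t = forked; t != null; t = t.next) {
            t.join();                            // ensure completion, then combine
            result += t.result;
        }
    }
    public static void main(String[] args) {
        long[] data = new long[1 << 16];
        SurplusGuidedSumSketch root = new SurplusGuidedSumSketch(data, 0, data.length);
        ForkJoinPool.commonPool().invoke(root);
        System.out.println(root.result);
    }
}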
  2202     //  Termination
  2209     //  Termination
  2203 
  2210 
  2204     /**
  2211     /**
  2205      * Possibly initiates and/or completes termination.  The caller
  2212      * Possibly initiates and/or completes termination.
  2206      * triggering termination runs three passes through workQueues:
       
  2207      * (0) Setting termination status, followed by wakeups of queued
       
  2208      * workers; (1) cancelling all tasks; (2) interrupting lagging
       
  2209      * threads (likely in external tasks, but possibly also blocked in
       
  2210      * joins).  Each pass repeats previous steps because of potential
       
  2211      * lagging thread creation.
       
  2212      *
  2213      *
  2213      * @param now if true, unconditionally terminate, else only
  2214      * @param now if true, unconditionally terminate, else only
  2214      * if no work and no active workers
  2215      * if no work and no active workers
  2215      * @param enable if true, enable shutdown when next possible
  2216      * @param enable if true, enable shutdown when next possible
  2216      * @return true if now terminating or terminated
  2217      * @return true if now terminating or terminated
  2217      */
  2218      */
  2218     private boolean tryTerminate(boolean now, boolean enable) {
  2219     private boolean tryTerminate(boolean now, boolean enable) {
  2219         int ps;
  2220         int rs;
  2220         if (this == common)                        // cannot shut down
  2221         if (this == common)                       // cannot shut down
  2221             return false;
  2222             return false;
  2222         if ((ps = plock) >= 0) {                   // enable by setting plock
  2223         if ((rs = runState) >= 0) {
  2223             if (!enable)
  2224             if (!enable)
  2224                 return false;
  2225                 return false;
  2225             if ((ps & PL_LOCK) != 0 ||
  2226             rs = lockRunState();                  // enter SHUTDOWN phase
  2226                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
  2227             unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
  2227                 ps = acquirePlock();
  2228         }
  2228             int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
  2229 
  2229             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
  2230         if ((rs & STOP) == 0) {
  2230                 releasePlock(nps);
  2231             if (!now) {                           // check quiescence
  2231         }
  2232                 for (long oldSum = 0L;;) {        // repeat until stable
  2232         for (long c;;) {
  2233                     WorkQueue[] ws; WorkQueue w; int m, b; long c;
  2233             if (((c = ctl) & STOP_BIT) != 0) {     // already terminating
  2234                     long checkSum = ctl;
  2234                 if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
  2235                     if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
  2235                     synchronized (this) {
  2236                         return false;             // still active workers
  2236                         notifyAll();               // signal when 0 workers
  2237                     if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
       
  2238                         break;                    // check queues
       
  2239                     for (int i = 0; i <= m; ++i) {
       
  2240                         if ((w = ws[i]) != null) {
       
  2241                             if ((b = w.base) != w.top || w.scanState >= 0 ||
       
  2242                                 w.currentSteal != null) {
       
  2243                                 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
       
  2244                                 return false;     // arrange for recheck
       
  2245                             }
       
  2246                             checkSum += b;
       
  2247                             if ((i & 1) == 0)
       
  2248                                 w.qlock = -1;     // try to disable external
       
  2249                         }
  2237                     }
  2250                     }
       
  2251                     if (oldSum == (oldSum = checkSum))
       
  2252                         break;
  2238                 }
  2253                 }
  2239                 return true;
  2254             }
  2240             }
  2255             if ((runState & STOP) == 0) {
  2241             if (!now) {                            // check if idle & no tasks
  2256                 rs = lockRunState();              // enter STOP phase
  2242                 WorkQueue[] ws; WorkQueue w;
  2257                 unlockRunState(rs, (rs & ~RSLOCK) | STOP);
  2243                 if ((int)(c >> AC_SHIFT) + parallelism > 0)
  2258             }
  2244                     return false;
  2259         }
  2245                 if ((ws = workQueues) != null) {
  2260 
  2246                     for (int i = 0; i < ws.length; ++i) {
  2261         int pass = 0;                             // 3 passes to help terminate
  2247                         if ((w = ws[i]) != null &&
  2262         for (long oldSum = 0L;;) {                // or until done or stable
  2248                             (!w.isEmpty() ||
  2263             WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
  2249                              ((i & 1) != 0 && w.eventCount >= 0))) {
  2264             long checkSum = ctl;
  2250                             signalWork(ws, w);
  2265             if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
  2251                             return false;
  2266                 (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
       
  2267                 if ((runState & TERMINATED) == 0) {
       
  2268                     rs = lockRunState();          // done
       
  2269                     unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
       
  2270                     synchronized (this) { notifyAll(); } // for awaitTermination
       
  2271                 }
       
  2272                 break;
       
  2273             }
       
  2274             for (int i = 0; i <= m; ++i) {
       
  2275                 if ((w = ws[i]) != null) {
       
  2276                     checkSum += w.base;
       
  2277                     w.qlock = -1;                 // try to disable
       
  2278                     if (pass > 0) {
       
  2279                         w.cancelAll();            // clear queue
       
  2280                         if (pass > 1 && (wt = w.owner) != null) {
       
  2281                             if (!wt.isInterrupted()) {
       
  2282                                 try {             // unblock join
       
  2283                                     wt.interrupt();
       
  2284                                 } catch (Throwable ignore) {
       
  2285                                 }
       
  2286                             }
       
  2287                             if (w.scanState < 0)
       
  2288                                 U.unpark(wt);     // wake up
  2252                         }
  2289                         }
  2253                     }
  2290                     }
  2254                 }
  2291                 }
  2255             }
  2292             }
  2256             if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
  2293             if (checkSum != oldSum) {             // unstable
  2257                 for (int pass = 0; pass < 3; ++pass) {
  2294                 oldSum = checkSum;
  2258                     WorkQueue[] ws; WorkQueue w; Thread wt;
  2295                 pass = 0;
  2259                     if ((ws = workQueues) != null) {
  2296             }
  2260                         int n = ws.length;
  2297             else if (pass > 3 && pass > m)        // can't further help
  2261                         for (int i = 0; i < n; ++i) {
  2298                 break;
  2262                             if ((w = ws[i]) != null) {
  2299             else if (++pass > 1) {                // try to dequeue
  2263                                 w.qlock = -1;
  2300                 long c; int j = 0, sp;            // bound attempts
  2264                                 if (pass > 0) {
  2301                 while (j++ <= m && (sp = (int)(c = ctl)) != 0)
  2265                                     w.cancelAll();
  2302                     tryRelease(c, ws[sp & m], AC_UNIT);
  2266                                     if (pass > 1 && (wt = w.owner) != null) {
  2303             }
  2267                                         if (!wt.isInterrupted()) {
  2304         }
  2268                                             try {
  2305         return true;
  2269                                                 wt.interrupt();
  2306     }
  2270                                             } catch (Throwable ignore) {
  2307 
  2271                                             }
  2308     // External operations
  2272                                         }
  2309 
  2273                                         U.unpark(wt);
  2310     /**
  2274                                     }
  2311      * Full version of externalPush, handling uncommon cases, as well
  2275                                 }
  2312      * as performing secondary initialization upon the first
  2276                             }
  2313      * submission of the first task to the pool.  It also detects
       
  2314      * first submission by an external thread and creates a new shared
       
   2315      * queue if the one at index is empty or contended.
       
  2316      *
       
  2317      * @param task the task. Caller must ensure non-null.
       
  2318      */
       
  2319     private void externalSubmit(ForkJoinTask<?> task) {
       
  2320         int r;                                    // initialize caller's probe
       
  2321         if ((r = ThreadLocalRandom.getProbe()) == 0) {
       
  2322             ThreadLocalRandom.localInit();
       
  2323             r = ThreadLocalRandom.getProbe();
       
  2324         }
       
  2325         for (;;) {
       
  2326             WorkQueue[] ws; WorkQueue q; int rs, m, k;
       
  2327             boolean move = false;
       
  2328             if ((rs = runState) < 0) {
       
  2329                 tryTerminate(false, false);     // help terminate
       
  2330                 throw new RejectedExecutionException();
       
  2331             }
       
  2332             else if ((rs & STARTED) == 0 ||     // initialize
       
  2333                      ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
       
  2334                 int ns = 0;
       
  2335                 rs = lockRunState();
       
  2336                 try {
       
  2337                     if ((rs & STARTED) == 0) {
       
  2338                         U.compareAndSwapObject(this, STEALCOUNTER, null,
       
  2339                                                new AtomicLong());
       
  2340                         // create workQueues array with size a power of two
       
  2341                         int p = config & SMASK; // ensure at least 2 slots
       
  2342                         int n = (p > 1) ? p - 1 : 1;
       
  2343                         n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
       
  2344                         n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
       
  2345                         workQueues = new WorkQueue[n];
       
  2346                         ns = STARTED;
       
  2347                     }
       
  2348                 } finally {
       
  2349                     unlockRunState(rs, (rs & ~RSLOCK) | ns);
       
  2350                 }
       
  2351             }
       
  2352             else if ((q = ws[k = r & m & SQMASK]) != null) {
       
  2353                 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
       
  2354                     ForkJoinTask<?>[] a = q.array;
       
  2355                     int s = q.top;
       
  2356                     boolean submitted = false; // initial submission or resizing
       
  2357                     try {                      // locked version of push
       
  2358                         if ((a != null && a.length > s + 1 - q.base) ||
       
  2359                             (a = q.growArray()) != null) {
       
  2360                             int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
       
  2361                             U.putOrderedObject(a, j, task);
       
  2362                             U.putOrderedInt(q, QTOP, s + 1);
       
  2363                             submitted = true;
  2277                         }
  2364                         }
  2278                         // Wake up workers parked on event queue
  2365                     } finally {
  2279                         int i, e; long cc; Thread p;
  2366                         U.compareAndSwapInt(q, QLOCK, 1, 0);
  2280                         while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
  2367                     }
  2281                                (i = e & SMASK) < n && i >= 0 &&
  2368                     if (submitted) {
  2282                                (w = ws[i]) != null) {
  2369                         signalWork(ws, q);
  2283                             long nc = ((long)(w.nextWait & E_MASK) |
  2370                         return;
  2284                                        ((cc + AC_UNIT) & AC_MASK) |
       
  2285                                        (cc & (TC_MASK|STOP_BIT)));
       
  2286                             if (w.eventCount == (e | INT_SIGN) &&
       
  2287                                 U.compareAndSwapLong(this, CTL, cc, nc)) {
       
  2288                                 w.eventCount = (e + E_SEQ) & E_MASK;
       
  2289                                 w.qlock = -1;
       
  2290                                 if ((p = w.parker) != null)
       
  2291                                     U.unpark(p);
       
  2292                             }
       
  2293                         }
       
  2294                     }
  2371                     }
  2295                 }
  2372                 }
  2296             }
  2373                 move = true;                   // move on failure
  2297         }
  2374             }
  2298     }
  2375             else if (((rs = runState) & RSLOCK) == 0) { // create new queue
  2299 
  2376                 q = new WorkQueue(this, null);
  2300     // external operations on common pool
  2377                 q.hint = r;
  2301 
  2378                 q.config = k | SHARED_QUEUE;
  2302     /**
  2379                 q.scanState = INACTIVE;
  2303      * Returns common pool queue for a thread that has submitted at
  2380                 rs = lockRunState();           // publish index
  2304      * least one task.
  2381                 if (rs > 0 &&  (ws = workQueues) != null &&
       
  2382                     k < ws.length && ws[k] == null)
       
  2383                     ws[k] = q;                 // else terminated
       
  2384                 unlockRunState(rs, rs & ~RSLOCK);
       
  2385             }
       
  2386             else
       
  2387                 move = true;                   // move if busy
       
  2388             if (move)
       
  2389                 r = ThreadLocalRandom.advanceProbe(r);
       
  2390         }
       
  2391     }
       
  2392 
       
  2393     /**
       
   2394      * Tries to add the given task to the submitter's current
        
   2395      * submission queue. Only the (vastly) most common path
       
  2396      * is directly handled in this method, while screening for need
       
  2397      * for externalSubmit.
       
  2398      *
       
  2399      * @param task the task. Caller must ensure non-null.
       
  2400      */
       
  2401     final void externalPush(ForkJoinTask<?> task) {
       
  2402         WorkQueue[] ws; WorkQueue q; int m;
       
  2403         int r = ThreadLocalRandom.getProbe();
       
  2404         int rs = runState;
       
  2405         if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
       
  2406             (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
       
  2407             U.compareAndSwapInt(q, QLOCK, 0, 1)) {
       
  2408             ForkJoinTask<?>[] a; int am, n, s;
       
  2409             if ((a = q.array) != null &&
       
  2410                 (am = a.length - 1) > (n = (s = q.top) - q.base)) {
       
  2411                 int j = ((am & s) << ASHIFT) + ABASE;
       
  2412                 U.putOrderedObject(a, j, task);
       
  2413                 U.putOrderedInt(q, QTOP, s + 1);
       
  2414                 U.putOrderedInt(q, QLOCK, 0);
       
  2415                 if (n <= 1)
       
  2416                     signalWork(ws, q);
       
  2417                 return;
       
  2418             }
       
  2419             U.compareAndSwapInt(q, QLOCK, 1, 0);
       
  2420         }
       
  2421         externalSubmit(task);
       
  2422     }
       
  2423 
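// Editor's note: externalPush/externalSubmit above are reached when a thread
// that is not a ForkJoinWorkerThread hands work to the pool. A hedged
// caller-side sketch (pool size and the computation are illustrative):
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

final class ExternalSubmitSketch {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        // main is an external thread, so this submission takes the externalPush
        // path (and, on first use, externalSubmit's lazy queue-array setup).
        ForkJoinTask<Integer> t = pool.submit(() -> 6 * 7);
        System.out.println(t.join());
        pool.shutdown();
    }
}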
       
  2424     /**
       
  2425      * Returns common pool queue for an external thread.
  2305      */
  2426      */
  2306     static WorkQueue commonSubmitterQueue() {
  2427     static WorkQueue commonSubmitterQueue() {
  2307         ForkJoinPool p; WorkQueue[] ws; int m, z;
  2428         ForkJoinPool p = common;
  2308         return ((z = ThreadLocalRandom.getProbe()) != 0 &&
  2429         int r = ThreadLocalRandom.getProbe();
  2309                 (p = common) != null &&
  2430         WorkQueue[] ws; int m;
  2310                 (ws = p.workQueues) != null &&
  2431         return (p != null && (ws = p.workQueues) != null &&
  2311                 (m = ws.length - 1) >= 0) ?
  2432                 (m = ws.length - 1) >= 0) ?
  2312             ws[m & z & SQMASK] : null;
  2433             ws[m & r & SQMASK] : null;
  2313     }
  2434     }
  2314 
  2435 
  2315     /**
  2436     /**
  2316      * Tries to pop the given task from submitter's queue in common pool.
  2437      * Performs tryUnpush for an external submitter: Finds queue,
       
  2438      * locks if apparently non-empty, validates upon locking, and
       
  2439      * adjusts top. Each check can fail but rarely does.
  2317      */
  2440      */
  2318     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
  2441     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
  2319         WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
  2442         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s;
  2320         WorkQueue[] ws = workQueues;
  2443         int r = ThreadLocalRandom.getProbe();
  2321         int z = ThreadLocalRandom.getProbe();
  2444         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
  2322         boolean popped = false;
  2445             (w = ws[m & r & SQMASK]) != null &&
  2323         if (ws != null && (m = ws.length - 1) >= 0 &&
  2446             (a = w.array) != null && (s = w.top) != w.base) {
  2324             (joiner = ws[z & m & SQMASK]) != null &&
       
  2325             joiner.base != (s = joiner.top) &&
       
  2326             (a = joiner.array) != null) {
       
  2327             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
  2447             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
  2328             if (U.getObject(a, j) == task &&
  2448             if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
  2329                 U.compareAndSwapInt(joiner, QLOCK, 0, 1)) {
  2449                 if (w.top == s && w.array == a &&
  2330                 if (joiner.top == s && joiner.array == a &&
  2450                     U.getObject(a, j) == task &&
  2331                     U.compareAndSwapObject(a, j, task, null)) {
  2451                     U.compareAndSwapObject(a, j, task, null)) {
  2332                     joiner.top = s - 1;
  2452                     U.putOrderedInt(w, QTOP, s - 1);
  2333                     popped = true;
  2453                     U.putOrderedInt(w, QLOCK, 0);
       
  2454                     return true;
  2334                 }
  2455                 }
  2335                 joiner.qlock = 0;
  2456                 U.compareAndSwapInt(w, QLOCK, 1, 0);
  2336             }
  2457             }
  2337         }
  2458         }
  2338         return popped;
  2459         return false;
  2339     }
  2460     }
  2340 
  2461 
       
  2462     /**
       
  2463      * Performs helpComplete for an external submitter.
       
  2464      */
  2341     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
  2465     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
  2342         WorkQueue joiner; int m;
  2466         WorkQueue[] ws; int n;
  2343         WorkQueue[] ws = workQueues;
  2467         int r = ThreadLocalRandom.getProbe();
  2344         int j = ThreadLocalRandom.getProbe();
  2468         return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
  2345         int s = 0;
  2469             helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
  2346         if (ws != null && (m = ws.length - 1) >= 0 &&
       
  2347             (joiner = ws[j & m & SQMASK]) != null && task != null) {
       
  2348             int scans = m + m + 1;
       
  2349             long c = 0L;             // for stability check
       
  2350             j |= 1;                  // poll odd queues
       
  2351             for (int k = scans; ; j += 2) {
       
  2352                 WorkQueue q;
       
  2353                 if ((s = task.status) < 0)
       
  2354                     break;
       
  2355                 else if (joiner.externalPopAndExecCC(task)) {
       
  2356                     if (--maxTasks <= 0) {
       
  2357                         s = task.status;
       
  2358                         break;
       
  2359                     }
       
  2360                     k = scans;
       
  2361                 }
       
  2362                 else if ((s = task.status) < 0)
       
  2363                     break;
       
  2364                 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
       
  2365                     if (--maxTasks <= 0) {
       
  2366                         s = task.status;
       
  2367                         break;
       
  2368                     }
       
  2369                     k = scans;
       
  2370                 }
       
  2371                 else if (--k < 0) {
       
  2372                     if (c == (c = ctl))
       
  2373                         break;
       
  2374                     k = scans;
       
  2375                 }
       
  2376             }
       
  2377         }
       
  2378         return s;
       
  2379     }
  2470     }
  2380 
  2471 
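An illustrative sketch, not part of this changeset: externalHelpComplete is the hook that
lets a caller outside the pool, while waiting on a CountedCompleter, help run pending
completions rather than simply block. A minimal completer in the usual
addToPendingCount/fork/tryComplete shape (class name and depth are arbitrary):

    class Fanout extends CountedCompleter<Void> {
        final int depth;
        Fanout(CountedCompleter<?> parent, int depth) {
            super(parent);
            this.depth = depth;
        }
        public void compute() {
            if (depth > 0) {
                addToPendingCount(2);                 // expect two child completions
                new Fanout(this, depth - 1).fork();
                new Fanout(this, depth - 1).fork();
            }
            tryComplete();                            // propagate once pending count reaches zero
        }
    }
    // e.g. new Fanout(null, 3).invoke() invoked from a non-worker thread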
  2381     // Exported methods
  2472     // Exported methods
  2382 
  2473 
  2383     // Constructors
  2474     // Constructors
  2445                         UncaughtExceptionHandler handler,
  2536                         UncaughtExceptionHandler handler,
  2446                         boolean asyncMode) {
  2537                         boolean asyncMode) {
  2447         this(checkParallelism(parallelism),
  2538         this(checkParallelism(parallelism),
  2448              checkFactory(factory),
  2539              checkFactory(factory),
  2449              handler,
  2540              handler,
  2450              (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
  2541              asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
  2451              "ForkJoinPool-" + nextPoolId() + "-worker-");
  2542              "ForkJoinPool-" + nextPoolId() + "-worker-");
  2452         checkPermission();
  2543         checkPermission();
  2453     }
  2544     }
  2454 
  2545 
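An illustrative sketch, not part of this changeset: the public four-argument constructor
above maps asyncMode to FIFO_QUEUE or LIFO_QUEUE before delegating to the private
constructor. Argument values here are arbitrary:

    ForkJoinPool asyncPool = new ForkJoinPool(
        8,                                                // parallelism
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,  // worker thread factory
        null,                                             // no UncaughtExceptionHandler
        true);                                            // asyncMode: FIFO scheduling of forked tasks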
  2455     private static int checkParallelism(int parallelism) {
  2546     private static int checkParallelism(int parallelism) {
  2476                          int mode,
  2567                          int mode,
  2477                          String workerNamePrefix) {
  2568                          String workerNamePrefix) {
  2478         this.workerNamePrefix = workerNamePrefix;
  2569         this.workerNamePrefix = workerNamePrefix;
  2479         this.factory = factory;
  2570         this.factory = factory;
  2480         this.ueh = handler;
  2571         this.ueh = handler;
  2481         this.mode = (short)mode;
  2572         this.config = (parallelism & SMASK) | mode;
  2482         this.parallelism = (short)parallelism;
       
  2483         long np = (long)(-parallelism); // offset ctl counts
  2573         long np = (long)(-parallelism); // offset ctl counts
  2484         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  2574         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  2485     }
  2575     }
  2486 
  2576 
  2487     /**
  2577     /**
  2622      */
  2712      */
  2623     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
  2713     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
  2624         // In previous versions of this class, this method constructed
  2714         // In previous versions of this class, this method constructed
  2625         // a task to run ForkJoinTask.invokeAll, but now external
  2715         // a task to run ForkJoinTask.invokeAll, but now external
  2626         // invocation of multiple tasks is at least as efficient.
  2716         // invocation of multiple tasks is at least as efficient.
  2627         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  2717         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
  2628 
  2718 
  2629         boolean done = false;
  2719         boolean done = false;
  2630         try {
  2720         try {
  2631             for (Callable<T> t : tasks) {
  2721             for (Callable<T> t : tasks) {
  2632                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
  2722                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
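                // Illustrative usage, not part of this changeset: from the caller's side
                // this is ordinary ExecutorService-style bulk invocation; the task bodies
                // and the choice of the common pool below are arbitrary.
                //
                //   List<Callable<Integer>> jobs = Arrays.asList(() -> 1, () -> 2);
                //   for (Future<Integer> f : ForkJoinPool.commonPool().invokeAll(jobs))
                //       System.out.println(f.get());   // get() may throw ExecutionException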
  2668      *
  2758      *
  2669      * @return the targeted parallelism level of this pool
  2759      * @return the targeted parallelism level of this pool
  2670      */
  2760      */
  2671     public int getParallelism() {
  2761     public int getParallelism() {
  2672         int par;
  2762         int par;
  2673         return ((par = parallelism) > 0) ? par : 1;
  2763         return ((par = config & SMASK) > 0) ? par : 1;
  2674     }
  2764     }
  2675 
  2765 
  2676     /**
  2766     /**
  2677      * Returns the targeted parallelism level of the common pool.
  2767      * Returns the targeted parallelism level of the common pool.
  2678      *
  2768      *
  2690      * maintain parallelism when others are cooperatively blocked.
  2780      * maintain parallelism when others are cooperatively blocked.
  2691      *
  2781      *
  2692      * @return the number of worker threads
  2782      * @return the number of worker threads
  2693      */
  2783      */
  2694     public int getPoolSize() {
  2784     public int getPoolSize() {
  2695         return parallelism + (short)(ctl >>> TC_SHIFT);
  2785         return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
  2696     }
  2786     }
  2697 
  2787 
  2698     /**
  2788     /**
  2699      * Returns {@code true} if this pool uses local first-in-first-out
  2789      * Returns {@code true} if this pool uses local first-in-first-out
  2700      * scheduling mode for forked tasks that are never joined.
  2790      * scheduling mode for forked tasks that are never joined.
  2701      *
  2791      *
  2702      * @return {@code true} if this pool uses async mode
  2792      * @return {@code true} if this pool uses async mode
  2703      */
  2793      */
  2704     public boolean getAsyncMode() {
  2794     public boolean getAsyncMode() {
  2705         return mode == FIFO_QUEUE;
  2795         return (config & FIFO_QUEUE) != 0;
  2706     }
  2796     }
  2707 
  2797 
  2708     /**
  2798     /**
  2709      * Returns an estimate of the number of worker threads that are
  2799      * Returns an estimate of the number of worker threads that are
  2710      * not blocked waiting to join tasks or for other managed
  2800      * not blocked waiting to join tasks or for other managed
  2731      * number of active threads.
  2821      * number of active threads.
  2732      *
  2822      *
  2733      * @return the number of active threads
  2823      * @return the number of active threads
  2734      */
  2824      */
  2735     public int getActiveThreadCount() {
  2825     public int getActiveThreadCount() {
  2736         int r = parallelism + (int)(ctl >> AC_SHIFT);
  2826         int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
  2737         return (r <= 0) ? 0 : r; // suppress momentarily negative values
  2827         return (r <= 0) ? 0 : r; // suppress momentarily negative values
  2738     }
  2828     }
  2739 
  2829 
  2740     /**
  2830     /**
  2741      * Returns {@code true} if all worker threads are currently idle.
  2831      * Returns {@code true} if all worker threads are currently idle.
  2747      * threads remain inactive.
  2837      * threads remain inactive.
  2748      *
  2838      *
  2749      * @return {@code true} if all threads are currently idle
  2839      * @return {@code true} if all threads are currently idle
  2750      */
  2840      */
  2751     public boolean isQuiescent() {
  2841     public boolean isQuiescent() {
  2752         return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
  2842         return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
  2753     }
  2843     }
  2754 
  2844 
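An illustrative sketch, not part of this changeset: the status queries above decode the
packed config and ctl words, so a snapshot costs only a few field reads; all reported
values are estimates. Use of the common pool here is arbitrary:

    ForkJoinPool p = ForkJoinPool.commonPool();
    System.out.println("parallelism = " + p.getParallelism());
    System.out.println("pool size   = " + p.getPoolSize());
    System.out.println("active      = " + p.getActiveThreadCount());
    System.out.println("quiescent   = " + p.isQuiescent());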
  2755     /**
  2845     /**
  2756      * Returns an estimate of the total number of tasks stolen from
  2846      * Returns an estimate of the total number of tasks stolen from
  2757      * one thread's work queue by another. The reported value
  2847      * one thread's work queue by another. The reported value
  2762      * overhead and contention across threads.
  2852      * overhead and contention across threads.
  2763      *
  2853      *
  2764      * @return the number of steals
  2854      * @return the number of steals
  2765      */
  2855      */
  2766     public long getStealCount() {
  2856     public long getStealCount() {
  2767         long count = stealCount;
  2857         AtomicLong sc = stealCounter;
       
  2858         long count = (sc == null) ? 0L : sc.get();
  2768         WorkQueue[] ws; WorkQueue w;
  2859         WorkQueue[] ws; WorkQueue w;
  2769         if ((ws = workQueues) != null) {
  2860         if ((ws = workQueues) != null) {
  2770             for (int i = 1; i < ws.length; i += 2) {
  2861             for (int i = 1; i < ws.length; i += 2) {
  2771                 if ((w = ws[i]) != null)
  2862                 if ((w = ws[i]) != null)
  2772                     count += w.nsteals;
  2863                     count += w.nsteals;
  2892      * @return a string identifying this pool, as well as its state
  2983      * @return a string identifying this pool, as well as its state
  2893      */
  2984      */
  2894     public String toString() {
  2985     public String toString() {
  2895         // Use a single pass through workQueues to collect counts
  2986         // Use a single pass through workQueues to collect counts
  2896         long qt = 0L, qs = 0L; int rc = 0;
  2987         long qt = 0L, qs = 0L; int rc = 0;
  2897         long st = stealCount;
  2988         AtomicLong sc = stealCounter;
       
  2989         long st = (sc == null) ? 0L : sc.get();
  2898         long c = ctl;
  2990         long c = ctl;
  2899         WorkQueue[] ws; WorkQueue w;
  2991         WorkQueue[] ws; WorkQueue w;
  2900         if ((ws = workQueues) != null) {
  2992         if ((ws = workQueues) != null) {
  2901             for (int i = 0; i < ws.length; ++i) {
  2993             for (int i = 0; i < ws.length; ++i) {
  2902                 if ((w = ws[i]) != null) {
  2994                 if ((w = ws[i]) != null) {
  2910                             ++rc;
  3002                             ++rc;
  2911                     }
  3003                     }
  2912                 }
  3004                 }
  2913             }
  3005             }
  2914         }
  3006         }
  2915         int pc = parallelism;
  3007         int pc = (config & SMASK);
  2916         int tc = pc + (short)(c >>> TC_SHIFT);
  3008         int tc = pc + (short)(c >>> TC_SHIFT);
  2917         int ac = pc + (int)(c >> AC_SHIFT);
  3009         int ac = pc + (int)(c >> AC_SHIFT);
  2918         if (ac < 0) // ignore transient negative
  3010         if (ac < 0) // ignore transient negative
  2919             ac = 0;
  3011             ac = 0;
  2920         String level;
  3012         int rs = runState;
  2921         if ((c & STOP_BIT) != 0)
  3013         String level = ((rs & TERMINATED) != 0 ? "Terminated" :
  2922             level = (tc == 0) ? "Terminated" : "Terminating";
  3014                         (rs & STOP)       != 0 ? "Terminating" :
  2923         else
  3015                         (rs & SHUTDOWN)   != 0 ? "Shutting down" :
  2924             level = plock < 0 ? "Shutting down" : "Running";
  3016                         "Running");
  2925         return super.toString() +
  3017         return super.toString() +
  2926             "[" + level +
  3018             "[" + level +
  2927             ", parallelism = " + pc +
  3019             ", parallelism = " + pc +
  2928             ", size = " + tc +
  3020             ", size = " + tc +
  2929             ", active = " + ac +
  3021             ", active = " + ac +
  2981      * Returns {@code true} if all tasks have completed following shut down.
  3073      * Returns {@code true} if all tasks have completed following shut down.
  2982      *
  3074      *
  2983      * @return {@code true} if all tasks have completed following shut down
  3075      * @return {@code true} if all tasks have completed following shut down
  2984      */
  3076      */
  2985     public boolean isTerminated() {
  3077     public boolean isTerminated() {
  2986         long c = ctl;
  3078         return (runState & TERMINATED) != 0;
  2987         return ((c & STOP_BIT) != 0L &&
       
  2988                 (short)(c >>> TC_SHIFT) + parallelism <= 0);
       
  2989     }
  3079     }
  2990 
  3080 
  2991     /**
  3081     /**
  2992      * Returns {@code true} if the process of termination has
  3082      * Returns {@code true} if the process of termination has
  2993      * commenced but not yet completed.  This method may be useful for
  3083      * commenced but not yet completed.  This method may be useful for
  3000      * they do, they must abort them on interrupt.)
  3090      * they do, they must abort them on interrupt.)
  3001      *
  3091      *
  3002      * @return {@code true} if terminating but not yet terminated
  3092      * @return {@code true} if terminating but not yet terminated
  3003      */
  3093      */
  3004     public boolean isTerminating() {
  3094     public boolean isTerminating() {
  3005         long c = ctl;
  3095         int rs = runState;
  3006         return ((c & STOP_BIT) != 0L &&
  3096         return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
  3007                 (short)(c >>> TC_SHIFT) + parallelism > 0);
       
  3008     }
  3097     }
  3009 
  3098 
  3010     /**
  3099     /**
  3011      * Returns {@code true} if this pool has been shut down.
  3100      * Returns {@code true} if this pool has been shut down.
  3012      *
  3101      *
  3013      * @return {@code true} if this pool has been shut down
  3102      * @return {@code true} if this pool has been shut down
  3014      */
  3103      */
  3015     public boolean isShutdown() {
  3104     public boolean isShutdown() {
  3016         return plock < 0;
  3105         return (runState & SHUTDOWN) != 0;
  3017     }
  3106     }
  3018 
  3107 
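An illustrative sketch, not part of this changeset: the lifecycle predicates above now
read runState bits directly. A typical orderly shutdown of some pool (the timeout value
is arbitrary; awaitTermination can throw InterruptedException):

    pool.shutdown();                                   // previously submitted tasks still run
    if (!pool.awaitTermination(60, TimeUnit.SECONDS))
        System.err.println("still terminating: " + pool.isTerminating());
    else
        assert pool.isTerminated();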
  3019     /**
  3108     /**
  3020      * Blocks until all tasks have completed execution after a
  3109      * Blocks until all tasks have completed execution after a
  3021      * shutdown request, or the timeout occurs, or the current thread
  3110      * shutdown request, or the timeout occurs, or the current thread
  3088                     return false;
  3177                     return false;
  3089                 Thread.yield(); // cannot block
  3178                 Thread.yield(); // cannot block
  3090             }
  3179             }
  3091             found = false;
  3180             found = false;
  3092             for (int j = (m + 1) << 2; j >= 0; --j) {
  3181             for (int j = (m + 1) << 2; j >= 0; --j) {
  3093                 ForkJoinTask<?> t; WorkQueue q; int b;
  3182                 ForkJoinTask<?> t; WorkQueue q; int b, k;
  3094                 if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
  3183                 if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
       
  3184                     (b = q.base) - q.top < 0) {
  3095                     found = true;
  3185                     found = true;
  3096                     if ((t = q.pollAt(b)) != null)
  3186                     if ((t = q.pollAt(b)) != null)
  3097                         t.doExec();
  3187                         t.doExec();
  3098                     break;
  3188                     break;
  3099                 }
  3189                 }
  3113     /**
  3203     /**
  3114      * Interface for extending managed parallelism for tasks running
  3204      * Interface for extending managed parallelism for tasks running
  3115      * in {@link ForkJoinPool}s.
  3205      * in {@link ForkJoinPool}s.
  3116      *
  3206      *
  3117      * <p>A {@code ManagedBlocker} provides two methods.  Method
  3207      * <p>A {@code ManagedBlocker} provides two methods.  Method
  3118      * {@code isReleasable} must return {@code true} if blocking is
  3208      * {@link #isReleasable} must return {@code true} if blocking is
  3119      * not necessary. Method {@code block} blocks the current thread
  3209      * not necessary. Method {@link #block} blocks the current thread
  3120      * if necessary (perhaps internally invoking {@code isReleasable}
  3210      * if necessary (perhaps internally invoking {@code isReleasable}
  3121      * before actually blocking). These actions are performed by any
  3211      * before actually blocking). These actions are performed by any
  3122      * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
  3212      * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
  3123      * The unusual methods in this API accommodate synchronizers that
  3213      * The unusual methods in this API accommodate synchronizers that
  3124      * may, but don't usually, block for long periods. Similarly, they
  3214      * may, but don't usually, block for long periods. Similarly, they
  3183          */
  3273          */
  3184         boolean isReleasable();
  3274         boolean isReleasable();
  3185     }
  3275     }
  3186 
  3276 
  3187     /**
  3277     /**
  3188      * Blocks in accord with the given blocker.  If the current thread
  3278      * Runs the given possibly blocking task.  When {@linkplain
  3189      * is a {@link ForkJoinWorkerThread}, this method possibly
  3279      * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
  3190      * arranges for a spare thread to be activated if necessary to
  3280      * method possibly arranges for a spare thread to be activated if
  3191      * ensure sufficient parallelism while the current thread is blocked.
  3281      * necessary to ensure sufficient parallelism while the current
  3192      *
  3282      * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
  3193      * <p>If the caller is not a {@link ForkJoinTask}, this method is
  3283      *
       
  3284      * <p>This method repeatedly calls {@code blocker.isReleasable()} and
       
  3285      * {@code blocker.block()} until either method returns {@code true}.
       
  3286      * Every call to {@code blocker.block()} is preceded by a call to
       
  3287      * {@code blocker.isReleasable()} that returned {@code false}.
       
  3288      *
       
  3289      * <p>If not running in a ForkJoinPool, this method is
  3194      * behaviorally equivalent to
  3290      * behaviorally equivalent to
  3195      *  <pre> {@code
  3291      *  <pre> {@code
  3196      * while (!blocker.isReleasable())
  3292      * while (!blocker.isReleasable())
  3197      *   if (blocker.block())
  3293      *   if (blocker.block())
  3198      *     return;
  3294      *     break;}</pre>
  3199      * }</pre>
  3295      *
  3200      *
  3296      * If running in a ForkJoinPool, the pool may first be expanded to
  3201      * If the caller is a {@code ForkJoinTask}, then the pool may
  3297      * ensure sufficient parallelism available during the call to
  3202      * first be expanded to ensure parallelism, and later adjusted.
  3298      * {@code blocker.block()}.
  3203      *
  3299      *
  3204      * @param blocker the blocker
  3300      * @param blocker the blocker task
  3205      * @throws InterruptedException if blocker.block did so
  3301      * @throws InterruptedException if {@code blocker.block()} did so
  3206      */
  3302      */
  3207     public static void managedBlock(ManagedBlocker blocker)
  3303     public static void managedBlock(ManagedBlocker blocker)
  3208         throws InterruptedException {
  3304         throws InterruptedException {
       
  3305         ForkJoinPool p;
       
  3306         ForkJoinWorkerThread wt;
  3209         Thread t = Thread.currentThread();
  3307         Thread t = Thread.currentThread();
  3210         if (t instanceof ForkJoinWorkerThread) {
  3308         if ((t instanceof ForkJoinWorkerThread) &&
  3211             ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
  3309             (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
       
  3310             WorkQueue w = wt.workQueue;
  3212             while (!blocker.isReleasable()) {
  3311             while (!blocker.isReleasable()) {
  3213                 if (p.tryCompensate(p.ctl)) {
  3312                 if (p.tryCompensate(w)) {
  3214                     try {
  3313                     try {
  3215                         do {} while (!blocker.isReleasable() &&
  3314                         do {} while (!blocker.isReleasable() &&
  3216                                      !blocker.block());
  3315                                      !blocker.block());
  3217                     } finally {
  3316                     } finally {
  3218                         p.incrementActiveCount();
  3317                         U.getAndAddLong(p, CTL, AC_UNIT);
  3219                     }
  3318                     }
  3220                     break;
  3319                     break;
  3221                 }
  3320                 }
  3222             }
  3321             }
  3223         }
  3322         }
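An illustrative sketch, not part of this changeset: a minimal ManagedBlocker over a
ReentrantLock, in the spirit of the usage examples in the interface javadoc; the class
and field names here are hypothetical:

    class LockBlocker implements ForkJoinPool.ManagedBlocker {
        final java.util.concurrent.locks.ReentrantLock lock;
        boolean hasLock;
        LockBlocker(java.util.concurrent.locks.ReentrantLock lock) { this.lock = lock; }
        public boolean block() {
            if (!hasLock)
                lock.lock();          // may park; the pool may activate a spare worker meanwhile
            return true;              // no further blocking needed
        }
        public boolean isReleasable() {
            return hasLock || (hasLock = lock.tryLock());
        }
    }
    // usage: ForkJoinPool.managedBlock(new LockBlocker(someLock)); the lock is held on return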
  3239         return new ForkJoinTask.AdaptedCallable<T>(callable);
  3338         return new ForkJoinTask.AdaptedCallable<T>(callable);
  3240     }
  3339     }
  3241 
  3340 
  3242     // Unsafe mechanics
  3341     // Unsafe mechanics
  3243     private static final sun.misc.Unsafe U;
  3342     private static final sun.misc.Unsafe U;
       
  3343     private static final int  ABASE;
       
  3344     private static final int  ASHIFT;
  3244     private static final long CTL;
  3345     private static final long CTL;
       
  3346     private static final long RUNSTATE;
       
  3347     private static final long STEALCOUNTER;
  3245     private static final long PARKBLOCKER;
  3348     private static final long PARKBLOCKER;
  3246     private static final int ABASE;
  3349     private static final long QTOP;
  3247     private static final int ASHIFT;
       
  3248     private static final long STEALCOUNT;
       
  3249     private static final long PLOCK;
       
  3250     private static final long INDEXSEED;
       
  3251     private static final long QBASE;
       
  3252     private static final long QLOCK;
  3350     private static final long QLOCK;
       
  3351     private static final long QSCANSTATE;
       
  3352     private static final long QPARKER;
       
  3353     private static final long QCURRENTSTEAL;
       
  3354     private static final long QCURRENTJOIN;
  3253 
  3355 
  3254     static {
  3356     static {
  3255         // initialize field offsets for CAS etc
  3357         // initialize field offsets for CAS etc
  3256         try {
  3358         try {
  3257             U = sun.misc.Unsafe.getUnsafe();
  3359             U = sun.misc.Unsafe.getUnsafe();
  3258             Class<?> k = ForkJoinPool.class;
  3360             Class<?> k = ForkJoinPool.class;
  3259             CTL = U.objectFieldOffset
  3361             CTL = U.objectFieldOffset
  3260                 (k.getDeclaredField("ctl"));
  3362                 (k.getDeclaredField("ctl"));
  3261             STEALCOUNT = U.objectFieldOffset
  3363             RUNSTATE = U.objectFieldOffset
  3262                 (k.getDeclaredField("stealCount"));
  3364                 (k.getDeclaredField("runState"));
  3263             PLOCK = U.objectFieldOffset
  3365             STEALCOUNTER = U.objectFieldOffset
  3264                 (k.getDeclaredField("plock"));
  3366                 (k.getDeclaredField("stealCounter"));
  3265             INDEXSEED = U.objectFieldOffset
       
  3266                 (k.getDeclaredField("indexSeed"));
       
  3267             Class<?> tk = Thread.class;
  3367             Class<?> tk = Thread.class;
  3268             PARKBLOCKER = U.objectFieldOffset
  3368             PARKBLOCKER = U.objectFieldOffset
  3269                 (tk.getDeclaredField("parkBlocker"));
  3369                 (tk.getDeclaredField("parkBlocker"));
  3270             Class<?> wk = WorkQueue.class;
  3370             Class<?> wk = WorkQueue.class;
  3271             QBASE = U.objectFieldOffset
  3371             QTOP = U.objectFieldOffset
  3272                 (wk.getDeclaredField("base"));
  3372                 (wk.getDeclaredField("top"));
  3273             QLOCK = U.objectFieldOffset
  3373             QLOCK = U.objectFieldOffset
  3274                 (wk.getDeclaredField("qlock"));
  3374                 (wk.getDeclaredField("qlock"));
       
  3375             QSCANSTATE = U.objectFieldOffset
       
  3376                 (wk.getDeclaredField("scanState"));
       
  3377             QPARKER = U.objectFieldOffset
       
  3378                 (wk.getDeclaredField("parker"));
       
  3379             QCURRENTSTEAL = U.objectFieldOffset
       
  3380                 (wk.getDeclaredField("currentSteal"));
       
  3381             QCURRENTJOIN = U.objectFieldOffset
       
  3382                 (wk.getDeclaredField("currentJoin"));
  3275             Class<?> ak = ForkJoinTask[].class;
  3383             Class<?> ak = ForkJoinTask[].class;
  3276             ABASE = U.arrayBaseOffset(ak);
  3384             ABASE = U.arrayBaseOffset(ak);
  3277             int scale = U.arrayIndexScale(ak);
  3385             int scale = U.arrayIndexScale(ak);
  3278             if ((scale & (scale - 1)) != 0)
  3386             if ((scale & (scale - 1)) != 0)
  3279                 throw new Error("data type scale not a power of two");
  3387                 throw new Error("data type scale not a power of two");
  3280             ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
  3388             ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
  3281         } catch (Exception e) {
  3389         } catch (Exception e) {
  3282             throw new Error(e);
  3390             throw new Error(e);
  3283         }
  3391         }
  3284 
  3392 
       
  3393         commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
  3285         defaultForkJoinWorkerThreadFactory =
  3394         defaultForkJoinWorkerThreadFactory =
  3286             new DefaultForkJoinWorkerThreadFactory();
  3395             new DefaultForkJoinWorkerThreadFactory();
  3287         modifyThreadPermission = new RuntimePermission("modifyThread");
  3396         modifyThreadPermission = new RuntimePermission("modifyThread");
  3288 
  3397 
  3289         common = java.security.AccessController.doPrivileged
  3398         common = java.security.AccessController.doPrivileged
  3290             (new java.security.PrivilegedAction<ForkJoinPool>() {
  3399             (new java.security.PrivilegedAction<ForkJoinPool>() {
  3291                 public ForkJoinPool run() { return makeCommonPool(); }});
  3400                 public ForkJoinPool run() { return makeCommonPool(); }});
  3292         int par = common.parallelism; // report 1 even if threads disabled
  3401         int par = common.config & SMASK; // report 1 even if threads disabled
  3293         commonParallelism = par > 0 ? par : 1;
  3402         commonParallelism = par > 0 ? par : 1;
  3294     }
  3403     }
  3295 
  3404 
  3296     /**
  3405     /**
  3297      * Creates and returns the common pool, respecting user settings
  3406      * Creates and returns the common pool, respecting user settings
  3306                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
  3415                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
  3307             String fp = System.getProperty
  3416             String fp = System.getProperty
  3308                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
  3417                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
  3309             String hp = System.getProperty
  3418             String hp = System.getProperty
  3310                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
  3419                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
       
  3420             String mp = System.getProperty
       
  3421                 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
  3311             if (pp != null)
  3422             if (pp != null)
  3312                 parallelism = Integer.parseInt(pp);
  3423                 parallelism = Integer.parseInt(pp);
  3313             if (fp != null)
  3424             if (fp != null)
  3314                 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
  3425                 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
  3315                            getSystemClassLoader().loadClass(fp).newInstance());
  3426                            getSystemClassLoader().loadClass(fp).newInstance());
  3316             if (hp != null)
  3427             if (hp != null)
  3317                 handler = ((UncaughtExceptionHandler)ClassLoader.
  3428                 handler = ((UncaughtExceptionHandler)ClassLoader.
  3318                            getSystemClassLoader().loadClass(hp).newInstance());
  3429                            getSystemClassLoader().loadClass(hp).newInstance());
       
  3430             if (mp != null)
       
  3431                 commonMaxSpares = Integer.parseInt(mp);
  3319         } catch (Exception ignore) {
  3432         } catch (Exception ignore) {
  3320         }
  3433         }
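            // Illustrative configuration, not part of this changeset: these properties
            // are read once here, during static initialization of the common pool, so
            // they are normally supplied on the command line. Values are arbitrary and
            // com.example.MyFactory is hypothetical:
            //
            //   -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
            //   -Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.example.MyFactory
            //   -Djava.util.concurrent.ForkJoinPool.common.maximumSpares=32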
  3321         if (factory == null) {
  3434         if (factory == null) {
  3322             if (System.getSecurityManager() == null)
  3435             if (System.getSecurityManager() == null)
  3323                 factory = defaultForkJoinWorkerThreadFactory;
  3436                 factory = defaultForkJoinWorkerThreadFactory;