# HG changeset patch # User dl # Date 1409907268 -7200 # Node ID 5853628b0e63194fb26e865755e30c53445b847b # Parent 5da963ed07209e637147038d0c6c9737d7a60842 8056248: Improve ForkJoin thread throttling Reviewed-by: psandoz, martin diff -r 5da963ed0720 -r 5853628b0e63 jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java Thu Sep 04 12:23:01 2014 -0400 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java Fri Sep 05 10:54:28 2014 +0200 @@ -49,6 +49,7 @@ import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.security.AccessControlContext; import java.security.ProtectionDomain; import java.security.Permissions; @@ -80,9 +81,9 @@ * *

For applications that require separate or custom pools, a {@code * ForkJoinPool} may be constructed with a given target parallelism - * level; by default, equal to the number of available processors. The - * pool attempts to maintain enough active (or available) threads by - * dynamically adding, suspending, or resuming internal worker + * level; by default, equal to the number of available processors. + * The pool attempts to maintain enough active (or available) threads + * by dynamically adding, suspending, or resuming internal worker * threads, even if some tasks are stalled waiting to join others. * However, no such adjustments are guaranteed in the face of blocked * I/O or other unmanaged synchronization. The nested {@link @@ -142,6 +143,9 @@ * - the class name of a {@link ForkJoinWorkerThreadFactory} *

  • {@code java.util.concurrent.ForkJoinPool.common.exceptionHandler} * - the class name of a {@link UncaughtExceptionHandler} + *
  • {@code java.util.concurrent.ForkJoinPool.common.maximumSpares} + * - the maximum number of allowed extra threads to maintain target + * parallelism (default 256). * * If a {@link SecurityManager} is present and no factory is * specified, then the default pool uses a factory supplying @@ -178,7 +182,14 @@ * that may be stolen by other workers. Preference rules give * first priority to processing tasks from their own queues (LIFO * or FIFO, depending on mode), then to randomized FIFO steals of - * tasks in other queues. + * tasks in other queues. This framework began as vehicle for + * supporting tree-structured parallelism using work-stealing. + * Over time, its scalability advantages led to extensions and + * changes to better support more diverse usage contexts. Because + * most internal methods and nested classes are interrelated, + * their main rationale and descriptions are presented here; + * individual methods and nested classes contain only brief + * comments about details. * * WorkQueues * ========== @@ -198,201 +209,318 @@ * (http://research.sun.com/scalable/pubs/index.html) and * "Idempotent work stealing" by Michael, Saraswat, and Vechev, * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). - * See also "Correct and Efficient Work-Stealing for Weak Memory - * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 - * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an - * analysis of memory ordering (atomic, volatile etc) issues. The - * main differences ultimately stem from GC requirements that we - * null out taken slots as soon as we can, to maintain as small a - * footprint as possible even in programs generating huge numbers - * of tasks. To accomplish this, we shift the CAS arbitrating pop - * vs poll (steal) from being on the indices ("base" and "top") to - * the slots themselves. So, both a successful pop and poll - * mainly entail a CAS of a slot from non-null to null. Because - * we rely on CASes of references, we do not need tag bits on base - * or top. They are simple ints as used in any circular + * The main differences ultimately stem from GC requirements that + * we null out taken slots as soon as we can, to maintain as small + * a footprint as possible even in programs generating huge + * numbers of tasks. To accomplish this, we shift the CAS + * arbitrating pop vs poll (steal) from being on the indices + * ("base" and "top") to the slots themselves. + * + * Adding tasks then takes the form of a classic array push(task): + * q.array[q.top] = task; ++q.top; + * + * (The actual code needs to null-check and size-check the array, + * properly fence the accesses, and possibly signal waiting + * workers to start scanning -- see below.) Both a successful pop + * and poll mainly entail a CAS of a slot from non-null to null. + * + * The pop operation (always performed by owner) is: + * if ((base != top) and + * (the task at top slot is not null) and + * (CAS slot to null)) + * decrement top and return task; + * + * And the poll operation (usually by a stealer) is + * if ((base != top) and + * (the task at base slot is not null) and + * (base has not changed) and + * (CAS slot to null)) + * increment base and return task; + * + * Because we rely on CASes of references, we do not need tag bits + * on base or top. They are simple ints as used in any circular * array-based queue (see for example ArrayDeque). Updates to the - * indices must still be ordered in a way that guarantees that top - * == base means the queue is empty, but otherwise may err on the - * side of possibly making the queue appear nonempty when a push, - * pop, or poll have not fully committed. Note that this means - * that the poll operation, considered individually, is not - * wait-free. One thief cannot successfully continue until another - * in-progress one (or, if previously empty, a push) completes. - * However, in the aggregate, we ensure at least probabilistic + * indices guarantee that top == base means the queue is empty, + * but otherwise may err on the side of possibly making the queue + * appear nonempty when a push, pop, or poll have not fully + * committed. (Method isEmpty() checks the case of a partially + * completed removal of the last element.) Because of this, the + * poll operation, considered individually, is not wait-free. One + * thief cannot successfully continue until another in-progress + * one (or, if previously empty, a push) completes. However, in + * the aggregate, we ensure at least probabilistic * non-blockingness. If an attempted steal fails, a thief always * chooses a different random victim target to try next. So, in * order for one thief to progress, it suffices for any * in-progress poll or new push on any empty queue to * complete. (This is why we normally use method pollAt and its * variants that try once at the apparent base index, else - * consider alternative actions, rather than method poll.) + * consider alternative actions, rather than method poll, which + * retries.) * - * This approach also enables support of a user mode in which local - * task processing is in FIFO, not LIFO order, simply by using - * poll rather than pop. This can be useful in message-passing - * frameworks in which tasks are never joined. However neither - * mode considers affinities, loads, cache localities, etc, so - * rarely provide the best possible performance on a given - * machine, but portably provide good throughput by averaging over - * these factors. (Further, even if we did try to use such - * information, we do not usually have a basis for exploiting it. - * For example, some sets of tasks profit from cache affinities, - * but others are harmed by cache pollution effects.) + * This approach also enables support of a user mode in which + * local task processing is in FIFO, not LIFO order, simply by + * using poll rather than pop. This can be useful in + * message-passing frameworks in which tasks are never joined. + * However neither mode considers affinities, loads, cache + * localities, etc, so rarely provide the best possible + * performance on a given machine, but portably provide good + * throughput by averaging over these factors. Further, even if + * we did try to use such information, we do not usually have a + * basis for exploiting it. For example, some sets of tasks + * profit from cache affinities, but others are harmed by cache + * pollution effects. Additionally, even though it requires + * scanning, long-term throughput is often best using random + * selection rather than directed selection policies, so cheap + * randomization of sufficient quality is used whenever + * applicable. Various Marsaglia XorShifts (some with different + * shift constants) are inlined at use points. * * WorkQueues are also used in a similar way for tasks submitted * to the pool. We cannot mix these tasks in the same queues used - * for work-stealing (this would contaminate lifo/fifo - * processing). Instead, we randomly associate submission queues + * by workers. Instead, we randomly associate submission queues * with submitting threads, using a form of hashing. The * ThreadLocalRandom probe value serves as a hash code for * choosing existing queues, and may be randomly repositioned upon * contention with other submitters. In essence, submitters act * like workers except that they are restricted to executing local * tasks that they submitted (or in the case of CountedCompleters, - * others with the same root task). However, because most - * shared/external queue operations are more expensive than - * internal, and because, at steady state, external submitters - * will compete for CPU with workers, ForkJoinTask.join and - * related methods disable them from repeatedly helping to process - * tasks if all workers are active. Insertion of tasks in shared + * others with the same root task). Insertion of tasks in shared * mode requires a lock (mainly to protect in the case of - * resizing) but we use only a simple spinlock (using bits in - * field qlock), because submitters encountering a busy queue move - * on to try or create other queues -- they block only when - * creating and registering new queues. + * resizing) but we use only a simple spinlock (using field + * qlock), because submitters encountering a busy queue move on to + * try or create other queues -- they block only when creating and + * registering new queues. Additionally, "qlock" saturates to an + * unlockable value (-1) at shutdown. Unlocking still can be and + * is performed by cheaper ordered writes of "qlock" in successful + * cases, but uses CAS in unsuccessful cases. * * Management * ========== * * The main throughput advantages of work-stealing stem from * decentralized control -- workers mostly take tasks from - * themselves or each other. We cannot negate this in the - * implementation of other management responsibilities. The main - * tactic for avoiding bottlenecks is packing nearly all - * essentially atomic control state into two volatile variables - * that are by far most often read (not written) as status and - * consistency checks. + * themselves or each other, at rates that can exceed a billion + * per second. The pool itself creates, activates (enables + * scanning for and running tasks), deactivates, blocks, and + * terminates threads, all with minimal central information. + * There are only a few properties that we can globally track or + * maintain, so we pack them into a small number of variables, + * often maintaining atomicity without blocking or locking. + * Nearly all essentially atomic control state is held in two + * volatile variables that are by far most often read (not + * written) as status and consistency checks. (Also, field + * "config" holds unchanging configuration state.) * - * Field "ctl" contains 64 bits holding all the information needed - * to atomically decide to add, inactivate, enqueue (on an event + * Field "ctl" contains 64 bits holding information needed to + * atomically decide to add, inactivate, enqueue (on an event * queue), dequeue, and/or re-activate workers. To enable this * packing, we restrict maximum parallelism to (1<<15)-1 (which is * far in excess of normal operating range) to allow ids, counts, * and their negations (used for thresholding) to fit into 16bit - * fields. + * subfields. * - * Field "plock" is a form of sequence lock with a saturating - * shutdown bit (similarly for per-queue "qlocks"), mainly - * protecting updates to the workQueues array, as well as to - * enable shutdown. When used as a lock, it is normally only very - * briefly held, so is nearly always available after at most a - * brief spin, but we use a monitor-based backup strategy to - * block when needed. + * Field "runState" holds lockable state bits (STARTED, STOP, etc) + * also protecting updates to the workQueues array. When used as + * a lock, it is normally held only for a few instructions (the + * only exceptions are one-time array initialization and uncommon + * resizing), so is nearly always available after at most a brief + * spin. But to be extra-cautious, after spinning, method + * awaitRunStateLock (called only if an initial CAS fails), uses a + * wait/notify mechanics on a builtin monitor to block when + * (rarely) needed. This would be a terrible idea for a highly + * contended lock, but most pools run without the lock ever + * contending after the spin limit, so this works fine as a more + * conservative alternative. Because we don't otherwise have an + * internal Object to use as a monitor, the "stealCounter" (an + * AtomicLong) is used when available (it too must be lazily + * initialized; see externalSubmit). + * + * Usages of "runState" vs "ctl" interact in only one case: + * deciding to add a worker thread (see tryAddWorker), in which + * case the ctl CAS is performed while the lock is held. * * Recording WorkQueues. WorkQueues are recorded in the - * "workQueues" array that is created upon first use and expanded - * if necessary. Updates to the array while recording new workers - * and unrecording terminated ones are protected from each other - * by a lock but the array is otherwise concurrently readable, and - * accessed directly. To simplify index-based operations, the - * array size is always a power of two, and all readers must - * tolerate null slots. Worker queues are at odd indices. Shared - * (submission) queues are at even indices, up to a maximum of 64 - * slots, to limit growth even if array needs to expand to add - * more workers. Grouping them together in this way simplifies and - * speeds up task scanning. + * "workQueues" array. The array is created upon first use (see + * externalSubmit) and expanded if necessary. Updates to the + * array while recording new workers and unrecording terminated + * ones are protected from each other by the runState lock, but + * the array is otherwise concurrently readable, and accessed + * directly. We also ensure that reads of the array reference + * itself never become too stale. To simplify index-based + * operations, the array size is always a power of two, and all + * readers must tolerate null slots. Worker queues are at odd + * indices. Shared (submission) queues are at even indices, up to + * a maximum of 64 slots, to limit growth even if array needs to + * expand to add more workers. Grouping them together in this way + * simplifies and speeds up task scanning. * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or * compensation for blocked workers. However, all other support * code is set up to work with other policies. To ensure that we - * do not hold on to worker references that would prevent GC, ALL + * do not hold on to worker references that would prevent GC, All * accesses to workQueues are via indices into the workQueues * array (which is one source of some of the messy code * constructions here). In essence, the workQueues array serves as - * a weak reference mechanism. Thus for example the wait queue - * field of ctl stores indices, not references. Access to the - * workQueues in associated methods (for example signalWork) must - * both index-check and null-check the IDs. All such accesses - * ignore bad IDs by returning out early from what they are doing, - * since this can only be associated with termination, in which - * case it is OK to give up. All uses of the workQueues array - * also check that it is non-null (even if previously - * non-null). This allows nulling during termination, which is - * currently not necessary, but remains an option for - * resource-revocation-based shutdown schemes. It also helps - * reduce JIT issuance of uncommon-trap code, which tends to - * unnecessarily complicate control flow in some methods. + * a weak reference mechanism. Thus for example the stack top + * subfield of ctl stores indices, not references. + * + * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we + * cannot let workers spin indefinitely scanning for tasks when + * none can be found immediately, and we cannot start/resume + * workers unless there appear to be tasks available. On the + * other hand, we must quickly prod them into action when new + * tasks are submitted or generated. In many usages, ramp-up time + * to activate workers is the main limiting factor in overall + * performance, which is compounded at program start-up by JIT + * compilation and allocation. So we streamline this as much as + * possible. + * + * The "ctl" field atomically maintains active and total worker + * counts as well as a queue to place waiting threads so they can + * be located for signalling. Active counts also play the role of + * quiescence indicators, so are decremented when workers believe + * that there are no more tasks to execute. The "queue" is + * actually a form of Treiber stack. A stack is ideal for + * activating threads in most-recently used order. This improves + * performance and locality, outweighing the disadvantages of + * being prone to contention and inability to release a worker + * unless it is topmost on stack. We park/unpark workers after + * pushing on the idle worker stack (represented by the lower + * 32bit subfield of ctl) when they cannot find work. The top + * stack state holds the value of the "scanState" field of the + * worker: its index and status, plus a version counter that, in + * addition to the count subfields (also serving as version + * stamps) provide protection against Treiber stack ABA effects. + * + * Field scanState is used by both workers and the pool to manage + * and track whether a worker is INACTIVE (possibly blocked + * waiting for a signal), or SCANNING for tasks (when neither hold + * it is busy running tasks). When a worker is inactivated, its + * scanState field is set, and is prevented from executing tasks, + * even though it must scan once for them to avoid queuing + * races. Note that scanState updates lag queue CAS releases so + * usage requires care. When queued, the lower 16 bits of + * scanState must hold its pool index. So we place the index there + * upon initialization (see registerWorker) and otherwise keep it + * there or restore it when necessary. * - * Event Queuing. Unlike HPC work-stealing frameworks, we cannot - * let workers spin indefinitely scanning for tasks when none can - * be found immediately, and we cannot start/resume workers unless - * there appear to be tasks available. On the other hand, we must - * quickly prod them into action when new tasks are submitted or - * generated. In many usages, ramp-up time to activate workers is - * the main limiting factor in overall performance (this is - * compounded at program start-up by JIT compilation and - * allocation). So we try to streamline this as much as possible. - * We park/unpark workers after placing in an event wait queue - * when they cannot find work. This "queue" is actually a simple - * Treiber stack, headed by the "id" field of ctl, plus a 15bit - * counter value (that reflects the number of times a worker has - * been inactivated) to avoid ABA effects (we need only as many - * version numbers as worker threads). Successors are held in - * field WorkQueue.nextWait. Queuing deals with several intrinsic - * races, mainly that a task-producing thread can miss seeing (and - * signalling) another thread that gave up looking for work but - * has not yet entered the wait queue. We solve this by requiring - * a full sweep of all workers (via repeated calls to method - * scan()) both before and after a newly waiting worker is added - * to the wait queue. Because enqueued workers may actually be - * rescanning rather than waiting, we set and clear the "parker" + * Memory ordering. See "Correct and Efficient Work-Stealing for + * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 + * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an + * analysis of memory ordering requirements in work-stealing + * algorithms similar to the one used here. We usually need + * stronger than minimal ordering because we must sometimes signal + * workers, requiring Dekker-like full-fences to avoid lost + * signals. Arranging for enough ordering without expensive + * over-fencing requires tradeoffs among the supported means of + * expressing access constraints. The most central operations, + * taking from queues and updating ctl state, require full-fence + * CAS. Array slots are read using the emulation of volatiles + * provided by Unsafe. Access from other threads to WorkQueue + * base, top, and array requires a volatile load of the first of + * any of these read. We use the convention of declaring the + * "base" index volatile, and always read it before other fields. + * The owner thread must ensure ordered updates, so writes use + * ordered intrinsics unless they can piggyback on those for other + * writes. Similar conventions and rationales hold for other + * WorkQueue fields (such as "currentSteal") that are only written + * by owners but observed by others. + * + * Creating workers. To create a worker, we pre-increment total + * count (serving as a reservation), and attempt to construct a + * ForkJoinWorkerThread via its factory. Upon construction, the + * new thread invokes registerWorker, where it constructs a + * WorkQueue and is assigned an index in the workQueues array + * (expanding the array if necessary). The thread is then + * started. Upon any exception across these steps, or null return + * from factory, deregisterWorker adjusts counts and records + * accordingly. If a null return, the pool continues running with + * fewer than the target number workers. If exceptional, the + * exception is propagated, generally to some external caller. + * Worker index assignment avoids the bias in scanning that would + * occur if entries were sequentially packed starting at the front + * of the workQueues array. We treat the array as a simple + * power-of-two hash table, expanding as needed. The seedIndex + * increment ensures no collisions until a resize is needed or a + * worker is deregistered and replaced, and thereafter keeps + * probability of collision low. We cannot use + * ThreadLocalRandom.getProbe() for similar purposes here because + * the thread has not started yet, but do so for creating + * submission queues for existing external threads. + * + * Deactivation and waiting. Queuing encounters several intrinsic + * races; most notably that a task-producing thread can miss + * seeing (and signalling) another thread that gave up looking for + * work but has not yet entered the wait queue. When a worker + * cannot find a task to steal, it deactivates and enqueues. Very + * often, the lack of tasks is transient due to GC or OS + * scheduling. To reduce false-alarm deactivation, scanners + * compute checksums of queue states during sweeps. (The + * stability checks used here and elsewhere are probabilistic + * variants of snapshot techniques -- see Herlihy & Shavit.) + * Workers give up and try to deactivate only after the sum is + * stable across scans. Further, to avoid missed signals, they + * repeat this scanning process after successful enqueuing until + * again stable. In this state, the worker cannot take/run a task + * it sees until it is released from the queue, so the worker + * itself eventually tries to release itself or any successor (see + * tryRelease). Otherwise, upon an empty scan, a deactivated + * worker uses an adaptive local spin construction (see awaitWork) + * before blocking (via park). Note the unusual conventions about + * Thread.interrupts surrounding parking and other blocking: + * Because interrupts are used solely to alert threads to check + * termination, which is checked anyway upon blocking, we clear + * status (using Thread.interrupted) before any call to park, so + * that park does not immediately return due to status being set + * via some other unrelated call to interrupt in user code. + * + * Signalling and activation. Workers are created or activated + * only when there appears to be at least one task they might be + * able to find and execute. Upon push (either by a worker or an + * external submission) to a previously (possibly) empty queue, + * workers are signalled if idle, or created if fewer exist than + * the given parallelism level. These primary signals are + * buttressed by others whenever other threads remove a task from + * a queue and notice that there are other tasks there as well. + * On most platforms, signalling (unpark) overhead time is + * noticeably long, and the time between signalling a thread and + * it actually making progress can be very noticeably long, so it + * is worth offloading these delays from critical paths as much as + * possible. Also, because inactive workers are often rescanning + * or spinning rather than blocking, we set and clear the "parker" * field of WorkQueues to reduce unnecessary calls to unpark. * (This requires a secondary recheck to avoid missed signals.) - * Note the unusual conventions about Thread.interrupts - * surrounding parking and other blocking: Because interrupts are - * used solely to alert threads to check termination, which is - * checked anyway upon blocking, we clear status (using - * Thread.interrupted) before any call to park, so that park does - * not immediately return due to status being set via some other - * unrelated call to interrupt in user code. - * - * Signalling. We create or wake up workers only when there - * appears to be at least one task they might be able to find and - * execute. When a submission is added or another worker adds a - * task to a queue that has fewer than two tasks, they signal - * waiting workers (or trigger creation of new ones if fewer than - * the given parallelism level -- signalWork). These primary - * signals are buttressed by others whenever other threads remove - * a task from a queue and notice that there are other tasks there - * as well. So in general, pools will be over-signalled. On most - * platforms, signalling (unpark) overhead time is noticeably - * long, and the time between signalling a thread and it actually - * making progress can be very noticeably long, so it is worth - * offloading these delays from critical paths as much as - * possible. Additionally, workers spin-down gradually, by staying - * alive so long as they see the ctl state changing. Similar - * stability-sensing techniques are also used before blocking in - * awaitJoin and helpComplete. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will - * time out and terminate if the pool has remained quiescent for a - * given period -- a short period if there are more threads than - * parallelism, longer as the number of threads decreases. This - * will slowly propagate, eventually terminating all workers after - * periods of non-use. + * time out and terminate (see awaitWork) if the pool has remained + * quiescent for period IDLE_TIMEOUT, increasing the period as the + * number of threads decreases, eventually removing all workers. + * Also, when more than two spare threads exist, excess threads + * are immediately terminated at the next quiescent point. + * (Padding by two avoids hysteresis.) * - * Shutdown and Termination. A call to shutdownNow atomically sets - * a plock bit and then (non-atomically) sets each worker's - * qlock status, cancels all unprocessed tasks, and wakes up - * all waiting workers. Detecting whether termination should - * commence after a non-abrupt shutdown() call requires more work - * and bookkeeping. We need consensus about quiescence (i.e., that - * there is no more work). The active count provides a primary - * indication but non-abrupt shutdown still requires a rechecking - * scan for any workers that are inactive but not queued. + * Shutdown and Termination. A call to shutdownNow invokes + * tryTerminate to atomically set a runState bit. The calling + * thread, as well as every other worker thereafter terminating, + * helps terminate others by setting their (qlock) status, + * cancelling their unprocessed tasks, and waking them up, doing + * so repeatedly until stable (but with a loop bounded by the + * number of workers). Calls to non-abrupt shutdown() preface + * this by checking whether termination should commence. This + * relies primarily on the active count bits of "ctl" maintaining + * consensus -- tryTerminate is called from awaitWork whenever + * quiescent. However, external submitters do not take part in + * this consensus. So, tryTerminate sweeps through queues (until + * stable) to ensure lack of in-flight submissions and workers + * about to process them before triggering the "STOP" phase of + * termination. (Note: there is an intrinsic conflict if + * helpQuiescePool is called when shutdown is enabled. Both wait + * for quiescence, but tryTerminate is biased to not trigger until + * helpQuiescePool completes.) + * * * Joining Tasks * ============= @@ -403,9 +531,9 @@ * just let them block (as in Thread.join). We also cannot just * reassign the joiner's run-time stack with another and replace * it later, which would be a form of "continuation", that even if - * possible is not necessarily a good idea since we sometimes need - * both an unblocked task and its continuation to progress. - * Instead we combine two tactics: + * possible is not necessarily a good idea since we may need both + * an unblocked task and its continuation to progress. Instead we + * combine two tactics: * * Helping: Arranging for the joiner to execute some task that it * would be running if the steal had not occurred. @@ -425,16 +553,16 @@ * The ManagedBlocker extension API can't use helping so relies * only on compensation in method awaitBlocker. * - * The algorithm in tryHelpStealer entails a form of "linear" - * helping: Each worker records (in field currentSteal) the most - * recent task it stole from some other worker. Plus, it records - * (in field currentJoin) the task it is currently actively - * joining. Method tryHelpStealer uses these markers to try to - * find a worker to help (i.e., steal back a task from and execute - * it) that could hasten completion of the actively joined task. - * In essence, the joiner executes a task that would be on its own - * local deque had the to-be-joined task not been stolen. This may - * be seen as a conservative variant of the approach in Wagner & + * The algorithm in helpStealer entails a form of "linear + * helping". Each worker records (in field currentSteal) the most + * recent task it stole from some other worker (or a submission). + * It also records (in field currentJoin) the task it is currently + * actively joining. Method helpStealer uses these markers to try + * to find a worker to help (i.e., steal back a task from and + * execute it) that could hasten completion of the actively joined + * task. Thus, the joiner executes a task that would be on its + * own local deque had the to-be-joined task not been stolen. This + * is a conservative variant of the approach described in Wagner & * Calder "Leapfrogging: a portable technique for implementing * efficient futures" SIGPLAN Notices, 1993 * (http://portal.acm.org/citation.cfm?id=155354). It differs in @@ -452,37 +580,40 @@ * which means that we miss links in the chain during long-lived * tasks, GC stalls etc (which is OK since blocking in such cases * is usually a good idea). (4) We bound the number of attempts - * to find work (see MAX_HELP) and fall back to suspending the + * to find work using checksums and fall back to suspending the * worker and if necessary replacing it with another. * - * Helping actions for CountedCompleters are much simpler: Method - * helpComplete can take and execute any task with the same root - * as the task being waited on. However, this still entails some - * traversal of completer chains, so is less efficient than using - * CountedCompleters without explicit joins. + * Helping actions for CountedCompleters do not require tracking + * currentJoins: Method helpComplete takes and executes any task + * with the same root as the task being waited on (preferring + * local pops to non-local polls). However, this still entails + * some traversal of completer chains, so is less efficient than + * using CountedCompleters without explicit joins. * - * It is impossible to keep exactly the target parallelism number - * of threads running at any given time. Determining the - * existence of conservatively safe helping targets, the - * availability of already-created spares, and the apparent need - * to create new spares are all racy, so we rely on multiple - * retries of each. Compensation in the apparent absence of - * helping opportunities is challenging to control on JVMs, where - * GC and other activities can stall progress of tasks that in - * turn stall out many other dependent tasks, without us being - * able to determine whether they will ever require compensation. - * Even though work-stealing otherwise encounters little - * degradation in the presence of more threads than cores, - * aggressively adding new threads in such cases entails risk of - * unwanted positive feedback control loops in which more threads - * cause more dependent stalls (as well as delayed progress of - * unblocked threads to the point that we know they are available) - * leading to more situations requiring more threads, and so - * on. This aspect of control can be seen as an (analytically - * intractable) game with an opponent that may choose the worst - * (for us) active thread to stall at any time. We take several - * precautions to bound losses (and thus bound gains), mainly in - * methods tryCompensate and awaitJoin. + * Compensation does not aim to keep exactly the target + * parallelism number of unblocked threads running at any given + * time. Some previous versions of this class employed immediate + * compensations for any blocked join. However, in practice, the + * vast majority of blockages are transient byproducts of GC and + * other JVM or OS activities that are made worse by replacement. + * Currently, compensation is attempted only after validating that + * all purportedly active threads are processing tasks by checking + * field WorkQueue.scanState, which eliminates most false + * positives. Also, compensation is bypassed (tolerating fewer + * threads) in the most common case in which it is rarely + * beneficial: when a worker with an empty queue (thus no + * continuation tasks) blocks on a join and there still remain + * enough threads to ensure liveness. + * + * The compensation mechanism may be bounded. Bounds for the + * commonPool (see commonMaxSpares) better enable JVMs to cope + * with programming errors and abuse before running out of + * resources to do so. In other cases, users may supply factories + * that limit thread construction. The effects of bounding in this + * pool (like all others) is imprecise. Total worker counts are + * decremented when threads deregister, not when they exit and + * resources are reclaimed by the JVM and OS. So the number of + * simultaneously live threads may transiently exceed bounds. * * Common Pool * =========== @@ -492,34 +623,52 @@ * never be used, we minimize initial construction overhead and * footprint to the setup of about a dozen fields, with no nested * allocation. Most bootstrapping occurs within method - * fullExternalPush during the first submission to the pool. + * externalSubmit during the first submission to the pool. * * When external threads submit to the common pool, they can - * perform subtask processing (see externalHelpJoin and related - * methods). This caller-helps policy makes it sensible to set - * common pool parallelism level to one (or more) less than the - * total number of available cores, or even zero for pure - * caller-runs. We do not need to record whether external - * submissions are to the common pool -- if not, externalHelpJoin - * returns quickly (at the most helping to signal some common pool - * workers). These submitters would otherwise be blocked waiting - * for completion, so the extra effort (with liberally sprinkled - * task status checks) in inapplicable cases amounts to an odd - * form of limited spin-wait before blocking in ForkJoinTask.join. + * perform subtask processing (see externalHelpComplete and + * related methods) upon joins. This caller-helps policy makes it + * sensible to set common pool parallelism level to one (or more) + * less than the total number of available cores, or even zero for + * pure caller-runs. We do not need to record whether external + * submissions are to the common pool -- if not, external help + * methods return quickly. These submitters would otherwise be + * blocked waiting for completion, so the extra effort (with + * liberally sprinkled task status checks) in inapplicable cases + * amounts to an odd form of limited spin-wait before blocking in + * ForkJoinTask.join. * * As a more appropriate default in managed environments, unless * overridden by system properties, we use workers of subclass * InnocuousForkJoinWorkerThread when there is a SecurityManager * present. These workers have no permissions set, do not belong * to any user-defined ThreadGroup, and erase all ThreadLocals - * after executing any top-level task (see WorkQueue.runTask). The - * associated mechanics (mainly in ForkJoinWorkerThread) may be - * JVM-dependent and must access particular Thread class fields to - * achieve this effect. + * after executing any top-level task (see WorkQueue.runTask). + * The associated mechanics (mainly in ForkJoinWorkerThread) may + * be JVM-dependent and must access particular Thread class fields + * to achieve this effect. * * Style notes * =========== * + * Memory ordering relies mainly on Unsafe intrinsics that carry + * the further responsibility of explicitly performing null- and + * bounds- checks otherwise carried out implicitly by JVMs. This + * can be awkward and ugly, but also reflects the need to control + * outcomes across the unusual cases that arise in very racy code + * with very few invariants. So these explicit checks would exist + * in some form anyway. All fields are read into locals before + * use, and null-checked if they are references. This is usually + * done in a "C"-like style of listing declarations at the heads + * of methods or blocks, and using inline assignments on first + * encounter. Array bounds-checks are usually performed by + * masking with array.length-1, which relies on the invariant that + * these arrays are created with positive lengths, which is itself + * paranoically checked. Nearly all explicit checks lead to + * bypass/return, not exception throws, because they may + * legitimately arise due to cancellation/revocation during + * shutdown. + * * There is a lot of representation-level coupling among classes * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The * fields of WorkQueue maintain data structures managed by @@ -527,22 +676,13 @@ * trying to reduce this, since any associated future changes in * representations will need to be accompanied by algorithmic * changes anyway. Several methods intrinsically sprawl because - * they must accumulate sets of consistent reads of volatiles held - * in local variables. Methods signalWork() and scan() are the - * main bottlenecks, so are especially heavily - * micro-optimized/mangled. There are lots of inline assignments - * (of form "while ((local = field) != 0)") which are usually the - * simplest way to ensure the required read orderings (which are - * sometimes critical). This leads to a "C"-like style of listing - * declarations of these locals at the heads of methods or blocks. - * There are several occurrences of the unusual "do {} while - * (!cas...)" which is the simplest way to force an update of a - * CAS'ed variable. There are also other coding oddities (including - * several unnecessary-looking hoisted null checks) that help - * some methods perform reasonably even when interpreted (not - * compiled). + * they must accumulate sets of consistent reads of fields held in + * local variables. There are also other coding oddities + * (including several unnecessary-looking hoisted null checks) + * that help some methods perform reasonably even when interpreted + * (not compiled). * - * The order of declarations in this file is: + * The order of declarations in this file is (with a few exceptions): * (1) Static utility functions * (2) Nested (static) classes * (3) Static fields @@ -609,56 +749,37 @@ public final boolean exec() { return true; } } + // Constants shared across ForkJoinPool and WorkQueue + + // Bounds + static final int SMASK = 0xffff; // short bits == max index + static final int MAX_CAP = 0x7fff; // max #workers - 1 + static final int EVENMASK = 0xfffe; // even short bits + static final int SQMASK = 0x007e; // max 64 (even) slots + + // Masks and units for WorkQueue.scanState and ctl sp subfield + static final int SCANNING = 1; // false when running tasks + static final int INACTIVE = 1 << 31; // must be negative + static final int SS_SEQ = 1 << 16; // version count + + // Mode bits for ForkJoinPool.config and WorkQueue.config + static final int MODE_MASK = 0xffff << 16; // top half of int + static final int LIFO_QUEUE = 0; + static final int FIFO_QUEUE = 1 << 16; + static final int SHARED_QUEUE = 1 << 31; // must be negative + /** * Queues supporting work-stealing as well as external task - * submission. See above for main rationale and algorithms. - * Implementation relies heavily on "Unsafe" intrinsics - * and selective use of "volatile": - * - * Field "base" is the index (mod array.length) of the least valid - * queue slot, which is always the next position to steal (poll) - * from if nonempty. Reads and writes require volatile orderings - * but not CAS, because updates are only performed after slot - * CASes. - * - * Field "top" is the index (mod array.length) of the next queue - * slot to push to or pop from. It is written only by owner thread - * for push, or under lock for external/shared push, and accessed - * by other threads only after reading (volatile) base. Both top - * and base are allowed to wrap around on overflow, but (top - - * base) (or more commonly -(base - top) to force volatile read of - * base before top) still estimates size. The lock ("qlock") is - * forced to -1 on termination, causing all further lock attempts - * to fail. (Note: we don't need CAS for termination state because - * upon pool shutdown, all shared-queues will stop being used - * anyway.) Nearly all lock bodies are set up so that exceptions - * within lock bodies are "impossible" (modulo JVM errors that - * would cause failure anyway.) - * - * The array slots are read and written using the emulation of - * volatiles/atomics provided by Unsafe. Insertions must in - * general use putOrderedObject as a form of releasing store to - * ensure that all writes to the task object are ordered before - * its publication in the queue. All removals entail a CAS to - * null. The array is always a power of two. To ensure safety of - * Unsafe array operations, all accesses perform explicit null - * checks and implicit bounds checks via power-of-two masking. - * - * In addition to basic queuing support, this class contains - * fields described elsewhere to control execution. It turns out - * to work better memory-layout-wise to include them in this class - * rather than a separate class. - * + * submission. See above for descriptions and algorithms. * Performance on most platforms is very sensitive to placement of * instances of both WorkQueues and their arrays -- we absolutely * do not want multiple WorkQueue instances or multiple queue - * arrays sharing cache lines. (It would be best for queue objects - * and their arrays to share, but there is nothing available to - * help arrange that). The @Contended annotation alerts JVMs to - * try to keep instances apart. + * arrays sharing cache lines. The @Contended annotation alerts + * JVMs to try to keep instances apart. */ @sun.misc.Contended static final class WorkQueue { + /** * Capacity of work-stealing queue array upon initialization. * Must be a power of two; at least 4, but should be larger to @@ -679,13 +800,13 @@ */ static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M - volatile int eventCount; // encoded inactivation count; < 0 if inactive - int nextWait; // encoded record of next event waiter + // Instance fields + volatile int scanState; // versioned, <0: inactive; odd:scanning + int stackPred; // pool stack (ctl) predecessor int nsteals; // number of steals - int hint; // steal index hint - short poolIndex; // index of this queue in pool - final short mode; // 0: lifo, > 0: fifo, < 0: shared - volatile int qlock; // 1: locked, -1: terminate; else 0 + int hint; // randomization and stealer index hint + int config; // pool index and mode + volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask[] array; // the elements (initially unallocated) @@ -693,19 +814,23 @@ final ForkJoinWorkerThread owner; // owning thread or null if shared volatile Thread parker; // == owner during call to park; else null volatile ForkJoinTask currentJoin; // task being joined in awaitJoin - ForkJoinTask currentSteal; // current non-local task being executed + volatile ForkJoinTask currentSteal; // mainly used by helpStealer - WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode, - int seed) { + WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; - this.mode = (short)mode; - this.hint = seed; // store initial seed for runWorker // Place indices in the center of array (that is not yet allocated) base = top = INITIAL_QUEUE_CAPACITY >>> 1; } /** + * Returns an exportable index (used by ForkJoinWorkerThread). + */ + final int getPoolIndex() { + return (config & 0xffff) >>> 1; // ignore odd/even tag bit + } + + /** * Returns the approximate number of tasks in the queue. */ final int queueSize() { @@ -719,12 +844,10 @@ * near-empty queue has at least one unclaimed task. */ final boolean isEmpty() { - ForkJoinTask[] a; int m, s; - int n = base - (s = top); - return (n >= 0 || - (n == -1 && - ((a = array) == null || - (m = a.length - 1) < 0 || + ForkJoinTask[] a; int n, m, s; + return ((n = base - (s = top)) >= 0 || + (n == -1 && // possibly one task + ((a = array) == null || (m = a.length - 1) < 0 || U.getObject (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); } @@ -738,12 +861,15 @@ */ final void push(ForkJoinTask task) { ForkJoinTask[] a; ForkJoinPool p; - int s = top, n; + int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed - int m = a.length - 1; + int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); - if ((n = (top = s + 1) - base) <= 2) - (p = pool).signalWork(p.workQueues, this); + U.putOrderedInt(this, QTOP, s + 1); + if ((n = s - b) <= 1) { + if ((p = pool) != null) + p.signalWork(p.workQueues, this); + } else if (n >= m) growArray(); } @@ -764,7 +890,7 @@ if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { int mask = size - 1; - do { + do { // emulate poll from old array, push to new array ForkJoinTask x; int oldj = ((b & oldMask) << ASHIFT) + ABASE; int j = ((b & mask) << ASHIFT) + ABASE; @@ -789,7 +915,7 @@ if ((t = (ForkJoinTask)U.getObject(a, j)) == null) break; if (U.compareAndSwapObject(a, j, t, null)) { - top = s; + U.putOrderedInt(this, QTOP, s); return t; } } @@ -800,7 +926,7 @@ /** * Takes a task in FIFO order if b is base of queue and a task * can be claimed without contention. Specialized versions - * appear in ForkJoinPool methods scan and tryHelpStealer. + * appear in ForkJoinPool methods scan and helpStealer. */ final ForkJoinTask pollAt(int b) { ForkJoinTask t; ForkJoinTask[] a; @@ -808,7 +934,7 @@ int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask)U.getObjectVolatile(a, j)) != null && base == b && U.compareAndSwapObject(a, j, t, null)) { - U.putOrderedInt(this, QBASE, b + 1); + base = b + 1; return t; } } @@ -823,16 +949,15 @@ while ((b = base) - top < 0 && (a = array) != null) { int j = (((a.length - 1) & b) << ASHIFT) + ABASE; t = (ForkJoinTask)U.getObjectVolatile(a, j); - if (t != null) { - if (U.compareAndSwapObject(a, j, t, null)) { - U.putOrderedInt(this, QBASE, b + 1); - return t; + if (base == b) { + if (t != null) { + if (U.compareAndSwapObject(a, j, t, null)) { + base = b + 1; + return t; + } } - } - else if (base == b) { - if (b + 1 == top) + else if (b + 1 == top) // now empty break; - Thread.yield(); // wait for lagging update (very rare) } } return null; @@ -842,7 +967,7 @@ * Takes next task, if one exists, in order specified by mode. */ final ForkJoinTask nextLocalTask() { - return mode == 0 ? pop() : poll(); + return (config & FIFO_QUEUE) == 0 ? pop() : poll(); } /** @@ -852,7 +977,7 @@ ForkJoinTask[] a = array; int m; if (a == null || (m = a.length - 1) < 0) return null; - int i = mode == 0 ? top - 1 : base; + int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base; int j = ((i & m) << ASHIFT) + ABASE; return (ForkJoinTask)U.getObjectVolatile(a, j); } @@ -860,13 +985,13 @@ /** * Pops the given task only if it is at the current top. * (A shared version is available only via FJP.tryExternalUnpush) - */ + */ final boolean tryUnpush(ForkJoinTask t) { ForkJoinTask[] a; int s; if ((a = array) != null && (s = top) != base && U.compareAndSwapObject (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { - top = s; + U.putOrderedInt(this, QTOP, s); return true; } return false; @@ -876,9 +1001,16 @@ * Removes and cancels all known tasks, ignoring any exceptions. */ final void cancelAll() { - ForkJoinTask.cancelIgnoringExceptions(currentJoin); - ForkJoinTask.cancelIgnoringExceptions(currentSteal); - for (ForkJoinTask t; (t = poll()) != null; ) + ForkJoinTask t; + if ((t = currentJoin) != null) { + currentJoin = null; + ForkJoinTask.cancelIgnoringExceptions(t); + } + if ((t = currentSteal) != null) { + currentSteal = null; + ForkJoinTask.cancelIgnoringExceptions(t); + } + while ((t = poll()) != null) ForkJoinTask.cancelIgnoringExceptions(t); } @@ -893,167 +1025,186 @@ } /** - * Executes a top-level task and any local tasks remaining - * after execution. + * Removes and executes all local tasks. If LIFO, invokes + * pollAndExecAll. Otherwise implements a specialized pop loop + * to exec until empty. + */ + final void execLocalTasks() { + int b = base, m, s; + ForkJoinTask[] a = array; + if (b - (s = top - 1) <= 0 && a != null && + (m = a.length - 1) >= 0) { + if ((config & FIFO_QUEUE) == 0) { + for (ForkJoinTask t;;) { + if ((t = (ForkJoinTask)U.getAndSetObject + (a, ((m & s) << ASHIFT) + ABASE, null)) == null) + break; + U.putOrderedInt(this, QTOP, s); + t.doExec(); + if (base - (s = top - 1) > 0) + break; + } + } + else + pollAndExecAll(); + } + } + + /** + * Executes the given task and any remaining local tasks. */ final void runTask(ForkJoinTask task) { - if ((currentSteal = task) != null) { - ForkJoinWorkerThread thread; - task.doExec(); - ForkJoinTask[] a = array; - int md = mode; - ++nsteals; - currentSteal = null; - if (md != 0) - pollAndExecAll(); - else if (a != null) { - int s, m = a.length - 1; - ForkJoinTask t; - while ((s = top - 1) - base >= 0 && - (t = (ForkJoinTask)U.getAndSetObject - (a, ((m & s) << ASHIFT) + ABASE, null)) != null) { - top = s; - t.doExec(); - } - } - if ((thread = owner) != null) // no need to do in finally clause + if (task != null) { + scanState &= ~SCANNING; // mark as busy + (currentSteal = task).doExec(); + U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC + execLocalTasks(); + ForkJoinWorkerThread thread = owner; + if (++nsteals < 0) // collect on overflow + transferStealCount(pool); + scanState |= SCANNING; + if (thread != null) thread.afterTopLevelExec(); } } /** + * Adds steal count to pool stealCounter if it exists, and resets. + */ + final void transferStealCount(ForkJoinPool p) { + AtomicLong sc; + if (p != null && (sc = p.stealCounter) != null) { + int s = nsteals; + nsteals = 0; // if negative, correct for overflow + sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); + } + } + + /** * If present, removes from queue and executes the given task, - * or any other cancelled task. Returns (true) on any CAS - * or consistency check failure so caller can retry. + * or any other cancelled task. Used only by awaitJoin. * - * @return false if no progress can be made, else true + * @return true if queue empty and task not known to be done */ final boolean tryRemoveAndExec(ForkJoinTask task) { - boolean stat; ForkJoinTask[] a; int m, s, b, n; - if (task != null && (a = array) != null && (m = a.length - 1) >= 0 && - (n = (s = top) - (b = base)) > 0) { - boolean removed = false, empty = true; - stat = true; - for (ForkJoinTask t;;) { // traverse from s to b - long j = ((--s & m) << ASHIFT) + ABASE; - t = (ForkJoinTask)U.getObject(a, j); - if (t == null) // inconsistent length - break; - else if (t == task) { - if (s + 1 == top) { // pop - if (!U.compareAndSwapObject(a, j, task, null)) - break; - top = s; - removed = true; + if ((a = array) != null && (m = a.length - 1) >= 0 && + task != null) { + while ((n = (s = top) - (b = base)) > 0) { + for (ForkJoinTask t;;) { // traverse from s to b + long j = ((--s & m) << ASHIFT) + ABASE; + if ((t = (ForkJoinTask)U.getObject(a, j)) == null) + return s + 1 == top; // shorter than expected + else if (t == task) { + boolean removed = false; + if (s + 1 == top) { // pop + if (U.compareAndSwapObject(a, j, task, null)) { + U.putOrderedInt(this, QTOP, s); + removed = true; + } + } + else if (base == b) // replace with proxy + removed = U.compareAndSwapObject( + a, j, task, new EmptyTask()); + if (removed) + task.doExec(); + break; } - else if (base == b) // replace with proxy - removed = U.compareAndSwapObject(a, j, task, - new EmptyTask()); - break; - } - else if (t.status >= 0) - empty = false; - else if (s + 1 == top) { // pop and throw away - if (U.compareAndSwapObject(a, j, t, null)) - top = s; - break; + else if (t.status < 0 && s + 1 == top) { + if (U.compareAndSwapObject(a, j, t, null)) + U.putOrderedInt(this, QTOP, s); + break; // was cancelled + } + if (--n == 0) + return false; } - if (--n == 0) { - if (!empty && base == b) - stat = false; - break; - } + if (task.status < 0) + return false; } - if (removed) - task.doExec(); } - else - stat = false; - return stat; + return true; } /** - * Tries to poll for and execute the given task or any other - * task in its CountedCompleter computation. + * Pops task if in the same CC computation as the given task, + * in either shared or owned mode. Used only by helpComplete. */ - final boolean pollAndExecCC(CountedCompleter root) { - ForkJoinTask[] a; int b; Object o; CountedCompleter t, r; - if ((b = base) - top < 0 && (a = array) != null) { - long j = (((a.length - 1) & b) << ASHIFT) + ABASE; - if ((o = U.getObjectVolatile(a, j)) == null) - return true; // retry - if (o instanceof CountedCompleter) { - for (t = (CountedCompleter)o, r = t;;) { - if (r == root) { - if (base == b && - U.compareAndSwapObject(a, j, t, null)) { - U.putOrderedInt(this, QBASE, b + 1); - t.doExec(); - } - return true; - } - else if ((r = r.completer) == null) - break; // not part of root computation - } - } - } - return false; - } - - /** - * Tries to pop and execute the given task or any other task - * in its CountedCompleter computation. - */ - final boolean externalPopAndExecCC(CountedCompleter root) { - ForkJoinTask[] a; int s; Object o; CountedCompleter t, r; + final CountedCompleter popCC(CountedCompleter task, int mode) { + int s; ForkJoinTask[] a; Object o; if (base - (s = top) < 0 && (a = array) != null) { long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; - if ((o = U.getObject(a, j)) instanceof CountedCompleter) { - for (t = (CountedCompleter)o, r = t;;) { - if (r == root) { - if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { - if (top == s && array == a && - U.compareAndSwapObject(a, j, t, null)) { - top = s - 1; - qlock = 0; - t.doExec(); + if ((o = U.getObjectVolatile(a, j)) != null && + (o instanceof CountedCompleter)) { + CountedCompleter t = (CountedCompleter)o; + for (CountedCompleter r = t;;) { + if (r == task) { + if (mode < 0) { // must lock + if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { + if (top == s && array == a && + U.compareAndSwapObject(a, j, t, null)) { + U.putOrderedInt(this, QTOP, s - 1); + U.putOrderedInt(this, QLOCK, 0); + return t; + } + U.compareAndSwapInt(this, QLOCK, 1, 0); } - else - qlock = 0; } - return true; + else if (U.compareAndSwapObject(a, j, t, null)) { + U.putOrderedInt(this, QTOP, s - 1); + return t; + } + break; } - else if ((r = r.completer) == null) + else if ((r = r.completer) == null) // try parent break; } } } - return false; + return null; } /** - * Internal version + * Steals and runs a task in the same CC computation as the + * given task if one exists and can be taken without + * contention. Otherwise returns a checksum/control value for + * use by method helpComplete. + * + * @return 1 if successful, 2 if retryable (lost to another + * stealer), -1 if non-empty but no matching task found, else + * the base index, forced negative. */ - final boolean internalPopAndExecCC(CountedCompleter root) { - ForkJoinTask[] a; int s; Object o; CountedCompleter t, r; - if (base - (s = top) < 0 && (a = array) != null) { - long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; - if ((o = U.getObject(a, j)) instanceof CountedCompleter) { - for (t = (CountedCompleter)o, r = t;;) { - if (r == root) { - if (U.compareAndSwapObject(a, j, t, null)) { - top = s - 1; + final int pollAndExecCC(CountedCompleter task) { + int b, h; ForkJoinTask[] a; Object o; + if ((b = base) - top >= 0 || (a = array) == null) + h = b | Integer.MIN_VALUE; // to sense movement on re-poll + else { + long j = (((a.length - 1) & b) << ASHIFT) + ABASE; + if ((o = U.getObjectVolatile(a, j)) == null) + h = 2; // retryable + else if (!(o instanceof CountedCompleter)) + h = -1; // unmatchable + else { + CountedCompleter t = (CountedCompleter)o; + for (CountedCompleter r = t;;) { + if (r == task) { + if (base == b && + U.compareAndSwapObject(a, j, t, null)) { + base = b + 1; t.doExec(); + h = 1; // success } - return true; + else + h = 2; // lost CAS + break; } - else if ((r = r.completer) == null) + else if ((r = r.completer) == null) { + h = -1; // unmatched break; + } } } } - return false; + return h; } /** @@ -1061,28 +1212,31 @@ */ final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; - return (eventCount >= 0 && + return (scanState >= 0 && (wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } - // Unsafe mechanics + // Unsafe mechanics. Note that some are (and must be) the same as in FJP private static final sun.misc.Unsafe U; - private static final long QBASE; + private static final int ABASE; + private static final int ASHIFT; + private static final long QTOP; private static final long QLOCK; - private static final int ABASE; - private static final int ASHIFT; + private static final long QCURRENTSTEAL; static { try { U = sun.misc.Unsafe.getUnsafe(); - Class k = WorkQueue.class; + Class wk = WorkQueue.class; Class ak = ForkJoinTask[].class; - QBASE = U.objectFieldOffset - (k.getDeclaredField("base")); + QTOP = U.objectFieldOffset + (wk.getDeclaredField("top")); QLOCK = U.objectFieldOffset - (k.getDeclaredField("qlock")); + (wk.getDeclaredField("qlock")); + QCURRENTSTEAL = U.objectFieldOffset + (wk.getDeclaredField("currentSteal")); ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) @@ -1126,6 +1280,11 @@ static final int commonParallelism; /** + * Limit on spare thread construction in tryCompensate. + */ + private static int commonMaxSpares; + + /** * Sequence number for creating workerNamePrefix. */ private static int poolNumberSequence; @@ -1138,7 +1297,7 @@ return ++poolNumberSequence; } - // static constants + // static configuration constants /** * Initial timeout value (in nanoseconds) for the thread @@ -1148,27 +1307,34 @@ * aggressive shrinkage during most transient stalls (long GCs * etc). */ - private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec - - /** - * Timeout value when there are more threads than parallelism level - */ - private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L; + private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec /** * Tolerance for idle timeouts, to cope with timer undershoots */ - private static final long TIMEOUT_SLOP = 2000000L; + private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms /** - * The maximum stolen->joining link depth allowed in method - * tryHelpStealer. Must be a power of two. Depths for legitimate - * chains are unbounded, but we use a fixed constant to avoid - * (otherwise unchecked) cycles and to bound staleness of - * traversal parameters at the expense of sometimes blocking when - * we could be helping. + * The initial value for commonMaxSpares during static + * initialization unless overridden using System property + * "java.util.concurrent.ForkJoinPool.common.maximumSpares". The + * default value is far in excess of normal requirements, but also + * far short of MAX_CAP and typical OS thread limits, so allows + * JVMs to catch misuse/abuse before running out of resources + * needed to do so. */ - private static final int MAX_HELP = 64; + private static final int DEFAULT_COMMON_MAX_SPARES = 256; + + /** + * Number of times to spin-wait before blocking. The spins (in + * awaitRunStateLock and awaitWork) currently use randomized + * spins. If/when MWAIT-like intrinsics becomes available, they + * may allow quieter spinning. The value of SPINS must be a power + * of two, at least 4. The current value causes spinning for a + * small fraction of typical context-switch times, well worthwhile + * given the typical likelihoods that blocking is not necessary. + */ + private static final int SPINS = 1 << 11; /** * Increment for seed generators. See class ThreadLocal for @@ -1177,209 +1343,212 @@ private static final int SEED_INCREMENT = 0x9e3779b9; /* - * Bits and masks for control variables - * - * Field ctl is a long packed with: - * AC: Number of active running workers minus target parallelism (16 bits) - * TC: Number of total workers minus target parallelism (16 bits) - * ST: true if pool is terminating (1 bit) - * EC: the wait count of top waiting thread (15 bits) - * ID: poolIndex of top of Treiber stack of waiters (16 bits) + * Bits and masks for field ctl, packed with 4 16 bit subfields: + * AC: Number of active running workers minus target parallelism + * TC: Number of total workers minus target parallelism + * SS: version count and status of top waiting thread + * ID: poolIndex of top of Treiber stack of waiters * - * When convenient, we can extract the upper 32 bits of counts and - * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e = - * (int)ctl. The ec field is never accessed alone, but always - * together with id and st. The offsets of counts by the target - * parallelism and the positionings of fields makes it possible to - * perform the most common checks via sign tests of fields: When - * ac is negative, there are not enough active workers, when tc is - * negative, there are not enough total workers, and when e is - * negative, the pool is terminating. To deal with these possibly - * negative fields, we use casts in and out of "short" and/or - * signed shifts to maintain signedness. + * When convenient, we can extract the lower 32 stack top bits + * (including version bits) as sp=(int)ctl. The offsets of counts + * by the target parallelism and the positionings of fields makes + * it possible to perform the most common checks via sign tests of + * fields: When ac is negative, there are not enough active + * workers, when tc is negative, there are not enough total + * workers. When sp is non-zero, there are waiting workers. To + * deal with possibly negative fields, we use casts in and out of + * "short" and/or signed shifts to maintain signedness. * - * When a thread is queued (inactivated), its eventCount field is - * set negative, which is the only way to tell if a worker is - * prevented from executing tasks, even though it must continue to - * scan for them to avoid queuing races. Note however that - * eventCount updates lag releases so usage requires care. - * - * Field plock is an int packed with: - * SHUTDOWN: true if shutdown is enabled (1 bit) - * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits) - * SIGNAL: set when threads may be waiting on the lock (1 bit) - * - * The sequence number enables simple consistency checks: - * Staleness of read-only operations on the workQueues array can - * be checked by comparing plock before vs after the reads. + * Because it occupies uppermost bits, we can add one active count + * using getAndAddLong of AC_UNIT, rather than CAS, when returning + * from a blocked join. Other updates entail multiple subfields + * and masking, requiring CAS. */ - // bit positions/shifts for fields - private static final int AC_SHIFT = 48; - private static final int TC_SHIFT = 32; - private static final int ST_SHIFT = 31; - private static final int EC_SHIFT = 16; + // Lower and upper word masks + private static final long SP_MASK = 0xffffffffL; + private static final long UC_MASK = ~SP_MASK; - // bounds - private static final int SMASK = 0xffff; // short bits - private static final int MAX_CAP = 0x7fff; // max #workers - 1 - private static final int EVENMASK = 0xfffe; // even short bits - private static final int SQMASK = 0x007e; // max 64 (even) slots - private static final int SHORT_SIGN = 1 << 15; - private static final int INT_SIGN = 1 << 31; - - // masks - private static final long STOP_BIT = 0x0001L << ST_SHIFT; - private static final long AC_MASK = ((long)SMASK) << AC_SHIFT; - private static final long TC_MASK = ((long)SMASK) << TC_SHIFT; - - // units for incrementing and decrementing - private static final long TC_UNIT = 1L << TC_SHIFT; - private static final long AC_UNIT = 1L << AC_SHIFT; + // Active counts + private static final int AC_SHIFT = 48; + private static final long AC_UNIT = 0x0001L << AC_SHIFT; + private static final long AC_MASK = 0xffffL << AC_SHIFT; - // masks and units for dealing with u = (int)(ctl >>> 32) - private static final int UAC_SHIFT = AC_SHIFT - 32; - private static final int UTC_SHIFT = TC_SHIFT - 32; - private static final int UAC_MASK = SMASK << UAC_SHIFT; - private static final int UTC_MASK = SMASK << UTC_SHIFT; - private static final int UAC_UNIT = 1 << UAC_SHIFT; - private static final int UTC_UNIT = 1 << UTC_SHIFT; + // Total counts + private static final int TC_SHIFT = 32; + private static final long TC_UNIT = 0x0001L << TC_SHIFT; + private static final long TC_MASK = 0xffffL << TC_SHIFT; + private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign - // masks and units for dealing with e = (int)ctl - private static final int E_MASK = 0x7fffffff; // no STOP_BIT - private static final int E_SEQ = 1 << EC_SHIFT; - - // plock bits - private static final int SHUTDOWN = 1 << 31; - private static final int PL_LOCK = 2; - private static final int PL_SIGNAL = 1; - private static final int PL_SPINS = 1 << 8; - - // access mode for WorkQueue - static final int LIFO_QUEUE = 0; - static final int FIFO_QUEUE = 1; - static final int SHARED_QUEUE = -1; + // runState bits: SHUTDOWN must be negative, others arbitrary powers of two + private static final int RSLOCK = 1; + private static final int RSIGNAL = 1 << 1; + private static final int STARTED = 1 << 2; + private static final int STOP = 1 << 29; + private static final int TERMINATED = 1 << 30; + private static final int SHUTDOWN = 1 << 31; // Instance fields - volatile long stealCount; // collects worker counts - volatile long ctl; // main pool control - volatile int plock; // shutdown status and seqLock - volatile int indexSeed; // worker/submitter index seed - final short parallelism; // parallelism level - final short mode; // LIFO/FIFO - WorkQueue[] workQueues; // main registry + volatile long ctl; // main pool control + volatile int runState; // lockable status + final int config; // parallelism, mode + int indexSeed; // to generate worker index + volatile WorkQueue[] workQueues; // main registry final ForkJoinWorkerThreadFactory factory; - final UncaughtExceptionHandler ueh; // per-worker UEH - final String workerNamePrefix; // to create worker name string + final UncaughtExceptionHandler ueh; // per-worker UEH + final String workerNamePrefix; // to create worker name string + volatile AtomicLong stealCounter; // also used as sync monitor + + /** + * Acquires the runState lock; returns current (locked) runState. + */ + private int lockRunState() { + int rs; + return ((((rs = runState) & RSLOCK) != 0 || + !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? + awaitRunStateLock() : rs); + } /** - * Acquires the plock lock to protect worker array and related - * updates. This method is called only if an initial CAS on plock - * fails. This acts as a spinlock for normal cases, but falls back - * to builtin monitor to block when (rarely) needed. This would be - * a terrible idea for a highly contended lock, but works fine as - * a more conservative alternative to a pure spinlock. + * Spins and/or blocks until runstate lock is available. See + * above for explanation. */ - private int acquirePlock() { - int spins = PL_SPINS, ps, nps; - for (;;) { - if (((ps = plock) & PL_LOCK) == 0 && - U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK)) - return nps; - else if (spins >= 0) { - if (ThreadLocalRandom.nextSecondarySeed() >= 0) + private int awaitRunStateLock() { + Object lock; + boolean wasInterrupted = false; + for (int spins = SPINS, r = 0, rs, ns;;) { + if (((rs = runState) & RSLOCK) == 0) { + if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { + if (wasInterrupted) { + try { + Thread.currentThread().interrupt(); + } catch (SecurityException ignore) { + } + } + return ns; + } + } + else if (r == 0) + r = ThreadLocalRandom.nextSecondarySeed(); + else if (spins > 0) { + r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift + if (r >= 0) --spins; } - else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) { - synchronized (this) { - if ((plock & PL_SIGNAL) != 0) { + else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) + Thread.yield(); // initialization race + else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) { + synchronized (lock) { + if ((runState & RSIGNAL) != 0) { try { - wait(); + lock.wait(); } catch (InterruptedException ie) { - try { - Thread.currentThread().interrupt(); - } catch (SecurityException ignore) { - } + if (!(Thread.currentThread() instanceof + ForkJoinWorkerThread)) + wasInterrupted = true; } } else - notifyAll(); + lock.notifyAll(); } } } } /** - * Unlocks and signals any thread waiting for plock. Called only - * when CAS of seq value for unlock fails. + * Unlocks and sets runState to newRunState. + * + * @param oldRunState a value returned from lockRunState + * @param newRunState the next value (must have lock bit clear). */ - private void releasePlock(int ps) { - plock = ps; - synchronized (this) { notifyAll(); } + private void unlockRunState(int oldRunState, int newRunState) { + if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { + Object lock = stealCounter; + runState = newRunState; // clears RSIGNAL bit + if (lock != null) + synchronized (lock) { lock.notifyAll(); } + } + } + + // Creating, registering and deregistering workers + + /** + * Tries to construct and start one worker. Assumes that total + * count has already been incremented as a reservation. Invokes + * deregisterWorker on any failure. + * + * @return true if successful + */ + private boolean createWorker() { + ForkJoinWorkerThreadFactory fac = factory; + Throwable ex = null; + ForkJoinWorkerThread wt = null; + try { + if (fac != null && (wt = fac.newThread(this)) != null) { + wt.start(); + return true; + } + } catch (Throwable rex) { + ex = rex; + } + deregisterWorker(wt, ex); + return false; } /** - * Tries to create and start one worker if fewer than target - * parallelism level exist. Adjusts counts etc on failure. + * Tries to add one worker, incrementing ctl counts before doing + * so, relying on createWorker to back out on failure. + * + * @param c incoming ctl value, with total count negative and no + * idle workers. On CAS failure, c is refreshed and retried if + * this holds (otherwise, a new worker is not needed). */ - private void tryAddWorker() { - long c; int u, e; - while ((u = (int)((c = ctl) >>> 32)) < 0 && - (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) { - long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) | - ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e; - if (U.compareAndSwapLong(this, CTL, c, nc)) { - ForkJoinWorkerThreadFactory fac; - Throwable ex = null; - ForkJoinWorkerThread wt = null; - try { - if ((fac = factory) != null && - (wt = fac.newThread(this)) != null) { - wt.start(); - break; - } - } catch (Throwable rex) { - ex = rex; + private void tryAddWorker(long c) { + boolean add = false; + do { + long nc = ((AC_MASK & (c + AC_UNIT)) | + (TC_MASK & (c + TC_UNIT))); + if (ctl == c) { + int rs, stop; // check if terminating + if ((stop = (rs = lockRunState()) & STOP) == 0) + add = U.compareAndSwapLong(this, CTL, c, nc); + unlockRunState(rs, rs & ~RSLOCK); + if (stop != 0) + break; + if (add) { + createWorker(); + break; } - deregisterWorker(wt, ex); - break; } - } + } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); } - // Registering and deregistering workers - /** - * Callback from ForkJoinWorkerThread to establish and record its - * WorkQueue. To avoid scanning bias due to packing entries in - * front of the workQueues array, we treat the array as a simple - * power-of-two hash table using per-thread seed as hash, - * expanding as needed. + * Callback from ForkJoinWorkerThread constructor to establish and + * record its WorkQueue. * * @param wt the worker thread * @return the worker's queue */ final WorkQueue registerWorker(ForkJoinWorkerThread wt) { - UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps; - wt.setDaemon(true); + UncaughtExceptionHandler handler; + wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); - do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed, - s += SEED_INCREMENT) || - s == 0); // skip 0 - WorkQueue w = new WorkQueue(this, wt, mode, s); - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); + WorkQueue w = new WorkQueue(this, wt); + int i = 0; // assign a pool index + int mode = config & MODE_MASK; + int rs = lockRunState(); try { - if ((ws = workQueues) != null) { // skip if shutting down - int n = ws.length, m = n - 1; - int r = (s << 1) | 1; // use odd-numbered indices - if (ws[r &= m] != null) { // collision - int probes = 0; // step by approx half size + WorkQueue[] ws; int n; // skip if no array + if ((ws = workQueues) != null && (n = ws.length) > 0) { + int s = indexSeed += SEED_INCREMENT; // unlikely to collide + int m = n - 1; + i = ((s << 1) | 1) & m; // odd-numbered indices + if (ws[i] != null) { // collision + int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; - while (ws[r = (r + step) & m] != null) { + while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; @@ -1387,15 +1556,15 @@ } } } - w.poolIndex = (short)r; - w.eventCount = r; // volatile write orders - ws[r] = w; + w.hint = s; // use as random seed + w.config = i | mode; + w.scanState = i; // publication fence + ws[i] = w; } } finally { - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); + unlockRunState(rs, rs & ~RSLOCK); } - wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1))); + wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; } @@ -1411,384 +1580,322 @@ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { - int ps; - w.qlock = -1; // ensure set - U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - try { - int idx = w.poolIndex; - WorkQueue[] ws = workQueues; - if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) - ws[idx] = null; - } finally { - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - } + WorkQueue[] ws; // remove index from array + int idx = w.config & SMASK; + int rs = lockRunState(); + if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) + ws[idx] = null; + unlockRunState(rs, rs & ~RSLOCK); } - - long c; // adjust ctl counts + long c; // decrement counts do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | - ((c - TC_UNIT) & TC_MASK) | - (c & ~(AC_MASK|TC_MASK))))); - - if (!tryTerminate(false, false) && w != null && w.array != null) { - w.cancelAll(); // cancel remaining tasks - WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; - while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { - if (e > 0) { // activate or create replacement - if ((ws = workQueues) == null || - (i = e & SMASK) >= ws.length || - (v = ws[i]) == null) - break; - long nc = (((long)(v.nextWait & E_MASK)) | - ((long)(u + UAC_UNIT) << 32)); - if (v.eventCount != (e | INT_SIGN)) - break; - if (U.compareAndSwapLong(this, CTL, c, nc)) { - v.eventCount = (e + E_SEQ) & E_MASK; - if ((p = v.parker) != null) - U.unpark(p); - break; - } - } - else { - if ((short)u < 0) - tryAddWorker(); + (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | + (TC_MASK & (c - TC_UNIT)) | + (SP_MASK & c)))); + if (w != null) { + w.qlock = -1; // ensure set + w.transferStealCount(this); + w.cancelAll(); // cancel remaining tasks + } + for (;;) { // possibly replace + WorkQueue[] ws; int m, sp; + if (tryTerminate(false, false) || w == null || w.array == null || + (runState & STOP) != 0 || (ws = workQueues) == null || + (m = ws.length - 1) < 0) // already terminating + break; + if ((sp = (int)(c = ctl)) != 0) { // wake up replacement + if (tryRelease(c, ws[sp & m], AC_UNIT)) break; - } } + else if (ex != null && (c & ADD_WORKER) != 0L) { + tryAddWorker(c); // create replacement + break; + } + else // don't need replacement + break; } - if (ex == null) // help clean refs on way out + if (ex == null) // help clean on way out ForkJoinTask.helpExpungeStaleExceptions(); - else // rethrow + else // rethrow ForkJoinTask.rethrow(ex); } - // Submissions - - /** - * Unless shutting down, adds the given task to a submission queue - * at submitter's current queue index (modulo submission - * range). Only the most common path is directly handled in this - * method. All others are relayed to fullExternalPush. - * - * @param task the task. Caller must ensure non-null. - */ - final void externalPush(ForkJoinTask task) { - WorkQueue q; int m, s, n, am; ForkJoinTask[] a; - int r = ThreadLocalRandom.getProbe(); - int ps = plock; - WorkQueue[] ws = workQueues; - if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 && - (q = ws[m & r & SQMASK]) != null && r != 0 && - U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock - if ((a = q.array) != null && - (am = a.length - 1) > (n = (s = q.top) - q.base)) { - int j = ((am & s) << ASHIFT) + ABASE; - U.putOrderedObject(a, j, task); - q.top = s + 1; // push on to deque - q.qlock = 0; - if (n <= 1) - signalWork(ws, q); - return; - } - q.qlock = 0; - } - fullExternalPush(task); - } - - /** - * Full version of externalPush. This method is called, among - * other times, upon the first submission of the first task to the - * pool, so must perform secondary initialization. It also - * detects first submission by an external thread by looking up - * its ThreadLocal, and creates a new shared queue if the one at - * index if empty or contended. The plock lock body must be - * exception-free (so no try/finally) so we optimistically - * allocate new queues outside the lock and throw them away if - * (very rarely) not needed. - * - * Secondary initialization occurs when plock is zero, to create - * workQueue array and set plock to a valid value. This lock body - * must also be exception-free. Because the plock seq value can - * eventually wrap around zero, this method harmlessly fails to - * reinitialize if workQueues exists, while still advancing plock. - */ - private void fullExternalPush(ForkJoinTask task) { - int r; - if ((r = ThreadLocalRandom.getProbe()) == 0) { - ThreadLocalRandom.localInit(); - r = ThreadLocalRandom.getProbe(); - } - for (;;) { - WorkQueue[] ws; WorkQueue q; int ps, m, k; - boolean move = false; - if ((ps = plock) < 0) - throw new RejectedExecutionException(); - else if (ps == 0 || (ws = workQueues) == null || - (m = ws.length - 1) < 0) { // initialize workQueues - int p = parallelism; // find power of two table size - int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; - n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; - WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? - new WorkQueue[n] : null); - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - if (((ws = workQueues) == null || ws.length == 0) && nws != null) - workQueues = nws; - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - } - else if ((q = ws[k = r & m & SQMASK]) != null) { - if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { - ForkJoinTask[] a = q.array; - int s = q.top; - boolean submitted = false; - try { // locked version of push - if ((a != null && a.length > s + 1 - q.base) || - (a = q.growArray()) != null) { // must presize - int j = (((a.length - 1) & s) << ASHIFT) + ABASE; - U.putOrderedObject(a, j, task); - q.top = s + 1; - submitted = true; - } - } finally { - q.qlock = 0; // unlock - } - if (submitted) { - signalWork(ws, q); - return; - } - } - move = true; // move on failure - } - else if (((ps = plock) & PL_LOCK) == 0) { // create new queue - q = new WorkQueue(this, null, SHARED_QUEUE, r); - q.poolIndex = (short)k; - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - if ((ws = workQueues) != null && k < ws.length && ws[k] == null) - ws[k] = q; - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - } - else - move = true; // move if busy - if (move) - r = ThreadLocalRandom.advanceProbe(r); - } - } - - // Maintaining ctl counts - - /** - * Increments active count; mainly called upon return from blocking. - */ - final void incrementActiveCount() { - long c; - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, ((c & ~AC_MASK) | - ((c & AC_MASK) + AC_UNIT)))); - } + // Signalling /** * Tries to create or activate a worker if too few are active. * * @param ws the worker array to use to find signallees - * @param q if non-null, the queue holding tasks to be processed + * @param q a WorkQueue --if non-null, don't retry if now empty */ final void signalWork(WorkQueue[] ws, WorkQueue q) { - for (;;) { - long c; int e, u, i; WorkQueue w; Thread p; - if ((u = (int)((c = ctl) >>> 32)) >= 0) - break; - if ((e = (int)c) <= 0) { - if ((short)u < 0) - tryAddWorker(); + long c; int sp, i; WorkQueue v; Thread p; + while ((c = ctl) < 0L) { // too few active + if ((sp = (int)c) == 0) { // no idle workers + if ((c & ADD_WORKER) != 0L) // too few workers + tryAddWorker(c); break; } - if (ws == null || ws.length <= (i = e & SMASK) || - (w = ws[i]) == null) + if (ws == null) // unstarted/terminated + break; + if (ws.length <= (i = sp & SMASK)) // terminated + break; + if ((v = ws[i]) == null) // terminating break; - long nc = (((long)(w.nextWait & E_MASK)) | - ((long)(u + UAC_UNIT)) << 32); - int ne = (e + E_SEQ) & E_MASK; - if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { - w.eventCount = ne; - if ((p = w.parker) != null) + int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState + int d = sp - v.scanState; // screen CAS + long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); + if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { + v.scanState = vs; // activate v + if ((p = v.parker) != null) U.unpark(p); break; } - if (q != null && q.base >= q.top) + if (q != null && q.base == q.top) // no more work break; } } + /** + * Signals and releases worker v if it is top of idle worker + * stack. This performs a one-shot version of signalWork only if + * there is (apparently) at least one idle worker. + * + * @param c incoming ctl value + * @param v if non-null, a worker + * @param inc the increment to active count (zero when compensating) + * @return true if successful + */ + private boolean tryRelease(long c, WorkQueue v, long inc) { + int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p; + if (v != null && v.scanState == sp) { // v is at top of stack + long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred); + if (U.compareAndSwapLong(this, CTL, c, nc)) { + v.scanState = vs; + if ((p = v.parker) != null) + U.unpark(p); + return true; + } + } + return false; + } + // Scanning for tasks /** * Top-level runloop for workers, called by ForkJoinWorkerThread.run. */ final void runWorker(WorkQueue w) { - w.growArray(); // allocate queue - for (int r = w.hint; scan(w, r) == 0; ) { + w.growArray(); // allocate queue + int seed = w.hint; // initially holds randomization hint + int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift + for (ForkJoinTask t;;) { + if ((t = scan(w, r)) != null) + w.runTask(t); + else if (!awaitWork(w, r)) + break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } } /** - * Scans for and, if found, runs one task, else possibly - * inactivates the worker. This method operates on single reads of - * volatile state and is designed to be re-invoked continuously, - * in part because it returns upon detecting inconsistencies, - * contention, or state changes that indicate possible success on - * re-invocation. - * - * The scan searches for tasks across queues starting at a random - * index, checking each at least twice. The scan terminates upon - * either finding a non-empty queue, or completing the sweep. If - * the worker is not inactivated, it takes and runs a task from - * this queue. Otherwise, if not activated, it tries to activate - * itself or some other worker by signalling. On failure to find a - * task, returns (for retry) if pool state may have changed during - * an empty scan, or tries to inactivate if active, else possibly - * blocks or terminates via method awaitWork. + * Scans for and tries to steal a top-level task. Scans start at a + * random location, randomly moving on apparent contention, + * otherwise continuing linearly until reaching two consecutive + * empty passes over all queues with the same checksum (summing + * each base index of each queue, that moves on each steal), at + * which point the worker tries to inactivate and then re-scans, + * attempting to re-activate (itself or some other worker) if + * finding a task; otherwise returning null to await work. Scans + * otherwise touch as little memory as possible, to reduce + * disruption on other scanning threads. * * @param w the worker (via its WorkQueue) * @param r a random seed - * @return worker qlock status if would have waited, else 0 + * @return a task, or null if none found */ - private final int scan(WorkQueue w, int r) { + private ForkJoinTask scan(WorkQueue w, int r) { WorkQueue[] ws; int m; - long c = ctl; // for consistency check - if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) { - for (int j = m + m + 1, ec = w.eventCount;;) { - WorkQueue q; int b, e; ForkJoinTask[] a; ForkJoinTask t; - if ((q = ws[(r - j) & m]) != null && - (b = q.base) - q.top < 0 && (a = q.array) != null) { - long i = (((a.length - 1) & b) << ASHIFT) + ABASE; - if ((t = ((ForkJoinTask) - U.getObjectVolatile(a, i))) != null) { - if (ec < 0) - helpRelease(c, ws, w, q, b); - else if (q.base == b && - U.compareAndSwapObject(a, i, t, null)) { - U.putOrderedInt(q, QBASE, b + 1); - if ((b + 1) - q.top < 0) - signalWork(ws, q); - w.runTask(t); + if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { + int ss = w.scanState; // initially non-negative + for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { + WorkQueue q; ForkJoinTask[] a; ForkJoinTask t; + int b, n; long c; + if ((q = ws[k]) != null) { + if ((n = (b = q.base) - q.top) < 0 && + (a = q.array) != null) { // non-empty + long i = (((a.length - 1) & b) << ASHIFT) + ABASE; + if ((t = ((ForkJoinTask) + U.getObjectVolatile(a, i))) != null && + q.base == b) { + if (ss >= 0) { + if (U.compareAndSwapObject(a, i, t, null)) { + q.base = b + 1; + if (n < -1) // signal others + signalWork(ws, q); + return t; + } + } + else if (oldSum == 0 && // try to activate + w.scanState < 0) + tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } + if (ss < 0) // refresh + ss = w.scanState; + r ^= r << 1; r ^= r >>> 3; r ^= r << 10; + origin = k = r & m; // move and rescan + oldSum = checkSum = 0; + continue; } - break; + checkSum += b; } - else if (--j < 0) { - if ((ec | (e = (int)c)) < 0) // inactive or terminating - return awaitWork(w, c, ec); - else if (ctl == c) { // try to inactivate and enqueue - long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); - w.nextWait = e; - w.eventCount = ec | INT_SIGN; - if (!U.compareAndSwapLong(this, CTL, c, nc)) - w.eventCount = ec; // back out + if ((k = (k + 1) & m) == origin) { // continue until stable + if ((ss >= 0 || (ss == (ss = w.scanState))) && + oldSum == (oldSum = checkSum)) { + if (ss < 0 || w.qlock < 0) // already inactive + break; + int ns = ss | INACTIVE; // try to inactivate + long nc = ((SP_MASK & ns) | + (UC_MASK & ((c = ctl) - AC_UNIT))); + w.stackPred = (int)c; // hold prev stack top + U.putInt(w, QSCANSTATE, ns); + if (U.compareAndSwapLong(this, CTL, c, nc)) + ss = ns; + else + w.scanState = ss; // back out } - break; + checkSum = 0; } } } - return 0; + return null; } /** - * A continuation of scan(), possibly blocking or terminating - * worker w. Returns without blocking if pool state has apparently - * changed since last invocation. Also, if inactivating w has - * caused the pool to become quiescent, checks for pool + * Possibly blocks worker w waiting for a task to steal, or + * returns false if the worker should terminate. If inactivating + * w has caused the pool to become quiescent, checks for pool * termination, and, so long as this is not the only worker, waits - * for event for up to a given duration. On timeout, if ctl has - * not changed, terminates the worker, which will in turn wake up + * for up to a given duration. On timeout, if ctl has not + * changed, terminates the worker, which will in turn wake up * another worker to possibly repeat this process. * * @param w the calling worker - * @param c the ctl value on entry to scan - * @param ec the worker's eventCount on entry to scan + * @param r a random seed (for spins) + * @return false if the worker should terminate */ - private final int awaitWork(WorkQueue w, long c, int ec) { - int stat, ns; long parkTime, deadline; - if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c && - !Thread.interrupted()) { - int e = (int)c; - int u = (int)(c >>> 32); - int d = (u >> UAC_SHIFT) + parallelism; // active count - - if (e < 0 || (d <= 0 && tryTerminate(false, false))) - stat = w.qlock = -1; // pool is terminating - else if ((ns = w.nsteals) != 0) { // collect steals and retry - w.nsteals = 0; - U.getAndAddLong(this, STEALCOUNT, (long)ns); + private boolean awaitWork(WorkQueue w, int r) { + if (w == null || w.qlock < 0) // w is terminating + return false; + for (int pred = w.stackPred, spins = SPINS, ss;;) { + if ((ss = w.scanState) >= 0) + break; + else if (spins > 0) { + r ^= r << 6; r ^= r >>> 21; r ^= r << 7; + if (r >= 0 && --spins == 0) { // randomize spins + WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; + if (pred != 0 && (ws = workQueues) != null && + (j = pred & SMASK) < ws.length && + (v = ws[j]) != null && // see if pred parking + (v.parker == null || v.scanState >= 0)) + spins = SPINS; // continue spinning + } } - else { - long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L : - ((long)(w.nextWait & E_MASK)) | // ctl to restore - ((long)(u + UAC_UNIT)) << 32); - if (pc != 0L) { // timed wait if last waiter - int dc = -(short)(c >>> TC_SHIFT); - parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT: - (dc + 1) * IDLE_TIMEOUT); + else if (w.qlock < 0) // recheck after spins + return false; + else if (!Thread.interrupted()) { + long c, prevctl, parkTime, deadline; + int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); + if ((ac <= 0 && tryTerminate(false, false)) || + (runState & STOP) != 0) // pool terminating + return false; + if (ac <= 0 && ss == (int)c) { // is last waiter + prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); + int t = (short)(c >>> TC_SHIFT); // shrink excess spares + if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) + return false; // else use timed wait + parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else - parkTime = deadline = 0L; - if (w.eventCount == ec && ctl == c) { - Thread wt = Thread.currentThread(); - U.putObject(wt, PARKBLOCKER, this); - w.parker = wt; // emulate LockSupport.park - if (w.eventCount == ec && ctl == c) - U.park(false, parkTime); // must recheck before park - w.parker = null; - U.putObject(wt, PARKBLOCKER, null); - if (parkTime != 0L && ctl == c && - deadline - System.nanoTime() <= 0L && - U.compareAndSwapLong(this, CTL, c, pc)) - stat = w.qlock = -1; // shrink pool + prevctl = parkTime = deadline = 0L; + Thread wt = Thread.currentThread(); + U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport + w.parker = wt; + if (w.scanState < 0 && ctl == c) // recheck before park + U.park(false, parkTime); + U.putOrderedObject(w, QPARKER, null); + U.putObject(wt, PARKBLOCKER, null); + if (w.scanState >= 0) + break; + if (parkTime != 0L && ctl == c && + deadline - System.nanoTime() <= 0L && + U.compareAndSwapLong(this, CTL, c, prevctl)) + return false; // shrink pool + } + } + return true; + } + + // Joining tasks + + /** + * Tries to steal and run tasks within the target's computation. + * Uses a variant of the top-level algorithm, restricted to tasks + * with the given task as ancestor: It prefers taking and running + * eligible tasks popped from the worker's own queue (via + * popCC). Otherwise it scans others, randomly moving on + * contention or execution, deciding to give up based on a + * checksum (via return codes frob pollAndExecCC). The maxTasks + * argument supports external usages; internal calls use zero, + * allowing unbounded steps (external calls trap non-positive + * values). + * + * @param w caller + * @param maxTasks if non-zero, the maximum number of other tasks to run + * @return task status on exit + */ + final int helpComplete(WorkQueue w, CountedCompleter task, + int maxTasks) { + WorkQueue[] ws; int s = 0, m; + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && + task != null && w != null) { + int mode = w.config; // for popCC + int r = w.hint ^ w.top; // arbitrary seed for origin + int origin = r & m; // first queue to scan + int h = 1; // 1:ran, >1:contended, <0:hash + for (int k = origin, oldSum = 0, checkSum = 0;;) { + CountedCompleter p; WorkQueue q; + if ((s = task.status) < 0) + break; + if (h == 1 && (p = w.popCC(task, mode)) != null) { + p.doExec(); // run local task + if (maxTasks != 0 && --maxTasks == 0) + break; + origin = k; // reset + oldSum = checkSum = 0; + } + else { // poll other queues + if ((q = ws[k]) == null) + h = 0; + else if ((h = q.pollAndExecCC(task)) < 0) + checkSum += h; + if (h > 0) { + if (h == 1 && maxTasks != 0 && --maxTasks == 0) + break; + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift + origin = k = r & m; // move and restart + oldSum = checkSum = 0; + } + else if ((k = (k + 1) & m) == origin) { + if (oldSum == (oldSum = checkSum)) + break; + checkSum = 0; + } } } } - return stat; - } - - /** - * Possibly releases (signals) a worker. Called only from scan() - * when a worker with apparently inactive status finds a non-empty - * queue. This requires revalidating all of the associated state - * from caller. - */ - private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w, - WorkQueue q, int b) { - WorkQueue v; int e, i; Thread p; - if (w != null && w.eventCount < 0 && (e = (int)c) > 0 && - ws != null && ws.length > (i = e & SMASK) && - (v = ws[i]) != null && ctl == c) { - long nc = (((long)(v.nextWait & E_MASK)) | - ((long)((int)(c >>> 32) + UAC_UNIT)) << 32); - int ne = (e + E_SEQ) & E_MASK; - if (q != null && q.base == b && w.eventCount < 0 && - v.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { - v.eventCount = ne; - if ((p = v.parker) != null) - U.unpark(p); - } - } + return s; } /** @@ -1799,268 +1906,167 @@ * execute tasks from. The first call to this method upon a * waiting join will often entail scanning/search, (which is OK * because the joiner has nothing better to do), but this method - * leaves hints in workers to speed up subsequent calls. The - * implementation is very branchy to cope with potential - * inconsistencies or loops encountering chains that are stale, - * unknown, or so long that they are likely cyclic. + * leaves hints in workers to speed up subsequent calls. * - * @param joiner the joining worker + * @param w caller * @param task the task to join - * @return 0 if no progress can be made, negative if task - * known complete, else positive */ - private int tryHelpStealer(WorkQueue joiner, ForkJoinTask task) { - int stat = 0, steps = 0; // bound to avoid cycles - if (task != null && joiner != null && - joiner.base - joiner.top >= 0) { // hoist checks - restart: for (;;) { - ForkJoinTask subtask = task; // current target - for (WorkQueue j = joiner, v;;) { // v is stealer of subtask - WorkQueue[] ws; int m, s, h; - if ((s = task.status) < 0) { - stat = s; - break restart; - } - if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) - break restart; // shutting down - if ((v = ws[h = (j.hint | 1) & m]) == null || - v.currentSteal != subtask) { - for (int origin = h;;) { // find stealer - if (((h = (h + 2) & m) & 15) == 1 && - (subtask.status < 0 || j.currentJoin != subtask)) - continue restart; // occasional staleness check - if ((v = ws[h]) != null && - v.currentSteal == subtask) { - j.hint = h; // save hint + private void helpStealer(WorkQueue w, ForkJoinTask task) { + WorkQueue[] ws = workQueues; + int oldSum = 0, checkSum, m; + if (ws != null && (m = ws.length - 1) >= 0 && w != null && + task != null) { + do { // restart point + checkSum = 0; // for stability check + ForkJoinTask subtask; + WorkQueue j = w, v; // v is subtask stealer + descent: for (subtask = task; subtask.status >= 0; ) { + for (int h = j.hint | 1, k = 0, i; ; k += 2) { + if (k > m) // can't find stealer + break descent; + if ((v = ws[i = (h + k) & m]) != null) { + if (v.currentSteal == subtask) { + j.hint = i; break; } - if (h == origin) - break restart; // cannot find stealer + checkSum += v.base; } } - for (;;) { // help stealer or descend to its stealer + for (;;) { // help v or descend ForkJoinTask[] a; int b; - if (subtask.status < 0) // surround probes with - continue restart; // consistency checks - if ((b = v.base) - v.top < 0 && (a = v.array) != null) { - int i = (((a.length - 1) & b) << ASHIFT) + ABASE; - ForkJoinTask t = - (ForkJoinTask)U.getObjectVolatile(a, i); - if (subtask.status < 0 || j.currentJoin != subtask || - v.currentSteal != subtask) - continue restart; // stale - stat = 1; // apparent progress - if (v.base == b) { - if (t == null) - break restart; - if (U.compareAndSwapObject(a, i, t, null)) { - U.putOrderedInt(v, QBASE, b + 1); - ForkJoinTask ps = joiner.currentSteal; - int jt = joiner.top; - do { - joiner.currentSteal = t; - t.doExec(); // clear local tasks too - } while (task.status >= 0 && - joiner.top != jt && - (t = joiner.pop()) != null); - joiner.currentSteal = ps; - break restart; - } - } + checkSum += (b = v.base); + ForkJoinTask next = v.currentJoin; + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) // stale + break descent; + if (b - v.top >= 0 || (a = v.array) == null) { + if ((subtask = next) == null) + break descent; + j = v; + break; } - else { // empty -- try to descend - ForkJoinTask next = v.currentJoin; - if (subtask.status < 0 || j.currentJoin != subtask || - v.currentSteal != subtask) - continue restart; // stale - else if (next == null || ++steps == MAX_HELP) - break restart; // dead-end or maybe cyclic - else { - subtask = next; - j = v; - break; + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + ForkJoinTask t = ((ForkJoinTask) + U.getObjectVolatile(a, i)); + if (v.base == b) { + if (t == null) // stale + break descent; + if (U.compareAndSwapObject(a, i, t, null)) { + v.base = b + 1; + ForkJoinTask ps = w.currentSteal; + int top = w.top; + do { + U.putOrderedObject(w, QCURRENTSTEAL, t); + t.doExec(); // clear local tasks too + } while (task.status >= 0 && + w.top != top && + (t = w.pop()) != null); + U.putOrderedObject(w, QCURRENTSTEAL, ps); + if (w.base != w.top) + return; // can't further help } } } } - } + } while (task.status >= 0 && oldSum != (oldSum = checkSum)); } - return stat; - } - - /** - * Analog of tryHelpStealer for CountedCompleters. Tries to steal - * and run tasks within the target's computation. - * - * @param task the task to join - * @param maxTasks the maximum number of other tasks to run - */ - final int helpComplete(WorkQueue joiner, CountedCompleter task, - int maxTasks) { - WorkQueue[] ws; int m; - int s = 0; - if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && - joiner != null && task != null) { - int j = joiner.poolIndex; - int scans = m + m + 1; - long c = 0L; // for stability check - for (int k = scans; ; j += 2) { - WorkQueue q; - if ((s = task.status) < 0) - break; - else if (joiner.internalPopAndExecCC(task)) { - if (--maxTasks <= 0) { - s = task.status; - break; - } - k = scans; - } - else if ((s = task.status) < 0) - break; - else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) { - if (--maxTasks <= 0) { - s = task.status; - break; - } - k = scans; - } - else if (--k < 0) { - if (c == (c = ctl)) - break; - k = scans; - } - } - } - return s; } /** * Tries to decrement active count (sometimes implicitly) and * possibly release or create a compensating worker in preparation - * for blocking. Fails on contention or termination. Otherwise, - * adds a new thread if no idle workers are available and pool - * may become starved. + * for blocking. Returns false (retryable by caller), on + * contention, detected staleness, instability, or termination. * - * @param c the assumed ctl value + * @param w caller */ - final boolean tryCompensate(long c) { - WorkQueue[] ws = workQueues; - int pc = parallelism, e = (int)c, m, tc; - if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) { - WorkQueue w = ws[e & m]; - if (e != 0 && w != null) { - Thread p; - long nc = ((long)(w.nextWait & E_MASK) | - (c & (AC_MASK|TC_MASK))); - int ne = (e + E_SEQ) & E_MASK; - if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { - w.eventCount = ne; - if ((p = w.parker) != null) - U.unpark(p); - return true; // replace with idle worker + private boolean tryCompensate(WorkQueue w) { + boolean canBlock; + WorkQueue[] ws; long c; int m, pc, sp; + if (w == null || w.qlock < 0 || // caller terminating + (ws = workQueues) == null || (m = ws.length - 1) <= 0 || + (pc = config & SMASK) == 0) // parallelism disabled + canBlock = false; + else if ((sp = (int)(c = ctl)) != 0) // release idle worker + canBlock = tryRelease(c, ws[sp & m], 0L); + else { + int ac = (int)(c >> AC_SHIFT) + pc; + int tc = (short)(c >> TC_SHIFT) + pc; + int nbusy = 0; // validate saturation + for (int i = 0; i <= m; ++i) { // two passes of odd indices + WorkQueue v; + if ((v = ws[((i << 1) | 1) & m]) != null) { + if ((v.scanState & SCANNING) != 0) + break; + ++nbusy; } } - else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 && - (int)(c >> AC_SHIFT) + pc > 1) { - long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); - if (U.compareAndSwapLong(this, CTL, c, nc)) - return true; // no compensation + if (nbusy != (tc << 1) || ctl != c) + canBlock = false; // unstable or stale + else if (tc >= pc && ac > 1 && w.isEmpty()) { + long nc = ((AC_MASK & (c - AC_UNIT)) | + (~AC_MASK & c)); // uncompensated + canBlock = U.compareAndSwapLong(this, CTL, c, nc); } - else if (tc + pc < MAX_CAP) { - long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - if (U.compareAndSwapLong(this, CTL, c, nc)) { - ForkJoinWorkerThreadFactory fac; - Throwable ex = null; - ForkJoinWorkerThread wt = null; - try { - if ((fac = factory) != null && - (wt = fac.newThread(this)) != null) { - wt.start(); - return true; - } - } catch (Throwable rex) { - ex = rex; - } - deregisterWorker(wt, ex); // clean up and return false - } + else if (tc >= MAX_CAP || + (this == common && tc >= pc + commonMaxSpares)) + throw new RejectedExecutionException( + "Thread limit exceeded replacing blocked worker"); + else { // similar to tryAddWorker + boolean add = false; int rs; // CAS within lock + long nc = ((AC_MASK & c) | + (TC_MASK & (c + TC_UNIT))); + if (((rs = lockRunState()) & STOP) == 0) + add = U.compareAndSwapLong(this, CTL, c, nc); + unlockRunState(rs, rs & ~RSLOCK); + canBlock = add && createWorker(); // throws on exception } } - return false; + return canBlock; } /** - * Helps and/or blocks until the given task is done. + * Helps and/or blocks until the given task is done or timeout. * - * @param joiner the joining worker + * @param w caller * @param task the task + * @param deadline for timed waits, if nonzero * @return task status on exit */ - final int awaitJoin(WorkQueue joiner, ForkJoinTask task) { + final int awaitJoin(WorkQueue w, ForkJoinTask task, long deadline) { int s = 0; - if (task != null && (s = task.status) >= 0 && joiner != null) { - ForkJoinTask prevJoin = joiner.currentJoin; - joiner.currentJoin = task; - do {} while (joiner.tryRemoveAndExec(task) && // process local tasks - (s = task.status) >= 0); - if (s >= 0 && (task instanceof CountedCompleter)) - s = helpComplete(joiner, (CountedCompleter)task, Integer.MAX_VALUE); - long cc = 0; // for stability checks - while (s >= 0 && (s = task.status) >= 0) { - if ((s = tryHelpStealer(joiner, task)) == 0 && - (s = task.status) >= 0) { - if (!tryCompensate(cc)) - cc = ctl; - else { - if (task.trySetSignal() && (s = task.status) >= 0) { - synchronized (task) { - if (task.status >= 0) { - try { // see ForkJoinTask - task.wait(); // for explanation - } catch (InterruptedException ie) { - } - } - else - task.notifyAll(); - } - } - long c; // reactivate - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, - ((c & ~AC_MASK) | - ((c & AC_MASK) + AC_UNIT)))); - } + if (task != null && w != null) { + ForkJoinTask prevJoin = w.currentJoin; + U.putOrderedObject(w, QCURRENTJOIN, task); + CountedCompleter cc = (task instanceof CountedCompleter) ? + (CountedCompleter)task : null; + for (;;) { + if ((s = task.status) < 0) + break; + if (cc != null) + helpComplete(w, cc, 0); + else if (w.base == w.top || w.tryRemoveAndExec(task)) + helpStealer(w, task); + if ((s = task.status) < 0) + break; + long ms, ns; + if (deadline == 0L) + ms = 0L; + else if ((ns = deadline - System.nanoTime()) <= 0L) + break; + else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) + ms = 1L; + if (tryCompensate(w)) { + task.internalWait(ms); + U.getAndAddLong(this, CTL, AC_UNIT); } } - joiner.currentJoin = prevJoin; + U.putOrderedObject(w, QCURRENTJOIN, prevJoin); } return s; } - /** - * Stripped-down variant of awaitJoin used by timed joins. Tries - * to help join only while there is continuous progress. (Caller - * will then enter a timed wait.) - * - * @param joiner the joining worker - * @param task the task - */ - final void helpJoinOnce(WorkQueue joiner, ForkJoinTask task) { - int s; - if (joiner != null && task != null && (s = task.status) >= 0) { - ForkJoinTask prevJoin = joiner.currentJoin; - joiner.currentJoin = task; - do {} while (joiner.tryRemoveAndExec(task) && // process local tasks - (s = task.status) >= 0); - if (s >= 0) { - if (task instanceof CountedCompleter) - helpComplete(joiner, (CountedCompleter)task, Integer.MAX_VALUE); - do {} while (task.status >= 0 && - tryHelpStealer(joiner, task) > 0); - } - joiner.currentJoin = prevJoin; - } - } + // Specialized scanning /** * Returns a (probably) non-empty steal queue, if one is found @@ -2068,19 +2074,24 @@ * caller if, by the time it tries to use the queue, it is empty. */ private WorkQueue findNonEmptyStealQueue() { + WorkQueue[] ws; int m; // one-shot version of scan loop int r = ThreadLocalRandom.nextSecondarySeed(); - for (;;) { - int ps = plock, m; WorkQueue[] ws; WorkQueue q; - if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { - for (int j = (m + 1) << 2; j >= 0; --j) { - if ((q = ws[(((r - j) << 1) | 1) & m]) != null && - q.base - q.top < 0) + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { + for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { + WorkQueue q; int b; + if ((q = ws[k]) != null) { + if ((b = q.base) - q.top < 0) return q; + checkSum += b; + } + if ((k = (k + 1) & m) == origin) { + if (oldSum == (oldSum = checkSum)) + break; + checkSum = 0; } } - if (plock == ps) - return null; } + return null; } /** @@ -2090,35 +2101,34 @@ * find tasks either. */ final void helpQuiescePool(WorkQueue w) { - ForkJoinTask ps = w.currentSteal; + ForkJoinTask ps = w.currentSteal; // save context for (boolean active = true;;) { long c; WorkQueue q; ForkJoinTask t; int b; - while ((t = w.nextLocalTask()) != null) - t.doExec(); + w.execLocalTasks(); // run locals before each scan if ((q = findNonEmptyStealQueue()) != null) { if (!active) { // re-establish active count active = true; - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, - ((c & ~AC_MASK) | - ((c & AC_MASK) + AC_UNIT)))); + U.getAndAddLong(this, CTL, AC_UNIT); } - if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) - w.runTask(t); + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { + U.putOrderedObject(w, QCURRENTSTEAL, t); + t.doExec(); + if (++w.nsteals < 0) + w.transferStealCount(this); + } } else if (active) { // decrement active count without queuing - long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT); - if ((int)(nc >> AC_SHIFT) + parallelism == 0) + long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c); + if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0) break; // bypass decrement-then-increment if (U.compareAndSwapLong(this, CTL, c, nc)) active = false; } - else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 && - U.compareAndSwapLong - (this, CTL, c, ((c & ~AC_MASK) | - ((c & AC_MASK) + AC_UNIT)))) + else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 && + U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) break; } + U.putOrderedObject(w, QCURRENTSTEAL, ps); } /** @@ -2141,7 +2151,7 @@ /** * Returns a cheap heuristic guide for task partitioning when * programmers, frameworks, tools, or languages have little or no - * idea about task granularity. In essence by offering this + * idea about task granularity. In essence, by offering this * method, we ask users only about tradeoffs in overhead vs * expected throughput and its variance, rather than how finely to * partition tasks. @@ -2179,15 +2189,12 @@ * many of these by further considering the number of "idle" * threads, that are known to have zero queued tasks, so * compensate by a factor of (#idle/#active) threads. - * - * Note: The approximation of #busy workers as #active workers is - * not very good under current signalling scheme, and should be - * improved. */ static int getSurplusQueuedTaskCount() { Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { - int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism; + int p = (pool = (wt = (ForkJoinWorkerThread)t).pool). + config & SMASK; int n = (q = wt.workQueue).top - q.base; int a = (int)(pool.ctl >> AC_SHIFT) + p; return n - (a > (p >>>= 1) ? 0 : @@ -2202,13 +2209,7 @@ // Termination /** - * Possibly initiates and/or completes termination. The caller - * triggering termination runs three passes through workQueues: - * (0) Setting termination status, followed by wakeups of queued - * workers; (1) cancelling all tasks; (2) interrupting lagging - * threads (likely in external tasks, but possibly also blocked in - * joins). Each pass repeats previous steps because of potential - * lagging thread creation. + * Possibly initiates and/or completes termination. * * @param now if true, unconditionally terminate, else only * if no work and no active workers @@ -2216,166 +2217,256 @@ * @return true if now terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { - int ps; - if (this == common) // cannot shut down + int rs; + if (this == common) // cannot shut down return false; - if ((ps = plock) >= 0) { // enable by setting plock + if ((rs = runState) >= 0) { if (!enable) return false; - if ((ps & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); + rs = lockRunState(); // enter SHUTDOWN phase + unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN); } - for (long c;;) { - if (((c = ctl) & STOP_BIT) != 0) { // already terminating - if ((short)(c >>> TC_SHIFT) + parallelism <= 0) { - synchronized (this) { - notifyAll(); // signal when 0 workers - } - } - return true; - } - if (!now) { // check if idle & no tasks - WorkQueue[] ws; WorkQueue w; - if ((int)(c >> AC_SHIFT) + parallelism > 0) - return false; - if ((ws = workQueues) != null) { - for (int i = 0; i < ws.length; ++i) { - if ((w = ws[i]) != null && - (!w.isEmpty() || - ((i & 1) != 0 && w.eventCount >= 0))) { - signalWork(ws, w); - return false; + + if ((rs & STOP) == 0) { + if (!now) { // check quiescence + for (long oldSum = 0L;;) { // repeat until stable + WorkQueue[] ws; WorkQueue w; int m, b; long c; + long checkSum = ctl; + if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0) + return false; // still active workers + if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) + break; // check queues + for (int i = 0; i <= m; ++i) { + if ((w = ws[i]) != null) { + if ((b = w.base) != w.top || w.scanState >= 0 || + w.currentSteal != null) { + tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); + return false; // arrange for recheck + } + checkSum += b; + if ((i & 1) == 0) + w.qlock = -1; // try to disable external } } + if (oldSum == (oldSum = checkSum)) + break; } } - if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { - for (int pass = 0; pass < 3; ++pass) { - WorkQueue[] ws; WorkQueue w; Thread wt; - if ((ws = workQueues) != null) { - int n = ws.length; - for (int i = 0; i < n; ++i) { - if ((w = ws[i]) != null) { - w.qlock = -1; - if (pass > 0) { - w.cancelAll(); - if (pass > 1 && (wt = w.owner) != null) { - if (!wt.isInterrupted()) { - try { - wt.interrupt(); - } catch (Throwable ignore) { - } - } - U.unpark(wt); - } + if ((runState & STOP) == 0) { + rs = lockRunState(); // enter STOP phase + unlockRunState(rs, (rs & ~RSLOCK) | STOP); + } + } + + int pass = 0; // 3 passes to help terminate + for (long oldSum = 0L;;) { // or until done or stable + WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m; + long checkSum = ctl; + if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || + (ws = workQueues) == null || (m = ws.length - 1) <= 0) { + if ((runState & TERMINATED) == 0) { + rs = lockRunState(); // done + unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); + synchronized (this) { notifyAll(); } // for awaitTermination + } + break; + } + for (int i = 0; i <= m; ++i) { + if ((w = ws[i]) != null) { + checkSum += w.base; + w.qlock = -1; // try to disable + if (pass > 0) { + w.cancelAll(); // clear queue + if (pass > 1 && (wt = w.owner) != null) { + if (!wt.isInterrupted()) { + try { // unblock join + wt.interrupt(); + } catch (Throwable ignore) { } } - } - // Wake up workers parked on event queue - int i, e; long cc; Thread p; - while ((e = (int)(cc = ctl) & E_MASK) != 0 && - (i = e & SMASK) < n && i >= 0 && - (w = ws[i]) != null) { - long nc = ((long)(w.nextWait & E_MASK) | - ((cc + AC_UNIT) & AC_MASK) | - (cc & (TC_MASK|STOP_BIT))); - if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, cc, nc)) { - w.eventCount = (e + E_SEQ) & E_MASK; - w.qlock = -1; - if ((p = w.parker) != null) - U.unpark(p); - } + if (w.scanState < 0) + U.unpark(wt); // wake up } } } } + if (checkSum != oldSum) { // unstable + oldSum = checkSum; + pass = 0; + } + else if (pass > 3 && pass > m) // can't further help + break; + else if (++pass > 1) { // try to dequeue + long c; int j = 0, sp; // bound attempts + while (j++ <= m && (sp = (int)(c = ctl)) != 0) + tryRelease(c, ws[sp & m], AC_UNIT); + } } + return true; } - // external operations on common pool + // External operations /** - * Returns common pool queue for a thread that has submitted at - * least one task. + * Full version of externalPush, handling uncommon cases, as well + * as performing secondary initialization upon the first + * submission of the first task to the pool. It also detects + * first submission by an external thread and creates a new shared + * queue if the one at index if empty or contended. + * + * @param task the task. Caller must ensure non-null. */ - static WorkQueue commonSubmitterQueue() { - ForkJoinPool p; WorkQueue[] ws; int m, z; - return ((z = ThreadLocalRandom.getProbe()) != 0 && - (p = common) != null && - (ws = p.workQueues) != null && - (m = ws.length - 1) >= 0) ? - ws[m & z & SQMASK] : null; + private void externalSubmit(ForkJoinTask task) { + int r; // initialize caller's probe + if ((r = ThreadLocalRandom.getProbe()) == 0) { + ThreadLocalRandom.localInit(); + r = ThreadLocalRandom.getProbe(); + } + for (;;) { + WorkQueue[] ws; WorkQueue q; int rs, m, k; + boolean move = false; + if ((rs = runState) < 0) { + tryTerminate(false, false); // help terminate + throw new RejectedExecutionException(); + } + else if ((rs & STARTED) == 0 || // initialize + ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { + int ns = 0; + rs = lockRunState(); + try { + if ((rs & STARTED) == 0) { + U.compareAndSwapObject(this, STEALCOUNTER, null, + new AtomicLong()); + // create workQueues array with size a power of two + int p = config & SMASK; // ensure at least 2 slots + int n = (p > 1) ? p - 1 : 1; + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; + n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; + workQueues = new WorkQueue[n]; + ns = STARTED; + } + } finally { + unlockRunState(rs, (rs & ~RSLOCK) | ns); + } + } + else if ((q = ws[k = r & m & SQMASK]) != null) { + if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { + ForkJoinTask[] a = q.array; + int s = q.top; + boolean submitted = false; // initial submission or resizing + try { // locked version of push + if ((a != null && a.length > s + 1 - q.base) || + (a = q.growArray()) != null) { + int j = (((a.length - 1) & s) << ASHIFT) + ABASE; + U.putOrderedObject(a, j, task); + U.putOrderedInt(q, QTOP, s + 1); + submitted = true; + } + } finally { + U.compareAndSwapInt(q, QLOCK, 1, 0); + } + if (submitted) { + signalWork(ws, q); + return; + } + } + move = true; // move on failure + } + else if (((rs = runState) & RSLOCK) == 0) { // create new queue + q = new WorkQueue(this, null); + q.hint = r; + q.config = k | SHARED_QUEUE; + q.scanState = INACTIVE; + rs = lockRunState(); // publish index + if (rs > 0 && (ws = workQueues) != null && + k < ws.length && ws[k] == null) + ws[k] = q; // else terminated + unlockRunState(rs, rs & ~RSLOCK); + } + else + move = true; // move if busy + if (move) + r = ThreadLocalRandom.advanceProbe(r); + } } /** - * Tries to pop the given task from submitter's queue in common pool. + * Tries to add the given task to a submission queue at + * submitter's current queue. Only the (vastly) most common path + * is directly handled in this method, while screening for need + * for externalSubmit. + * + * @param task the task. Caller must ensure non-null. */ - final boolean tryExternalUnpush(ForkJoinTask task) { - WorkQueue joiner; ForkJoinTask[] a; int m, s; - WorkQueue[] ws = workQueues; - int z = ThreadLocalRandom.getProbe(); - boolean popped = false; - if (ws != null && (m = ws.length - 1) >= 0 && - (joiner = ws[z & m & SQMASK]) != null && - joiner.base != (s = joiner.top) && - (a = joiner.array) != null) { - long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; - if (U.getObject(a, j) == task && - U.compareAndSwapInt(joiner, QLOCK, 0, 1)) { - if (joiner.top == s && joiner.array == a && - U.compareAndSwapObject(a, j, task, null)) { - joiner.top = s - 1; - popped = true; - } - joiner.qlock = 0; + final void externalPush(ForkJoinTask task) { + WorkQueue[] ws; WorkQueue q; int m; + int r = ThreadLocalRandom.getProbe(); + int rs = runState; + if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && + (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && + U.compareAndSwapInt(q, QLOCK, 0, 1)) { + ForkJoinTask[] a; int am, n, s; + if ((a = q.array) != null && + (am = a.length - 1) > (n = (s = q.top) - q.base)) { + int j = ((am & s) << ASHIFT) + ABASE; + U.putOrderedObject(a, j, task); + U.putOrderedInt(q, QTOP, s + 1); + U.putOrderedInt(q, QLOCK, 0); + if (n <= 1) + signalWork(ws, q); + return; } + U.compareAndSwapInt(q, QLOCK, 1, 0); } - return popped; + externalSubmit(task); } - final int externalHelpComplete(CountedCompleter task, int maxTasks) { - WorkQueue joiner; int m; - WorkQueue[] ws = workQueues; - int j = ThreadLocalRandom.getProbe(); - int s = 0; - if (ws != null && (m = ws.length - 1) >= 0 && - (joiner = ws[j & m & SQMASK]) != null && task != null) { - int scans = m + m + 1; - long c = 0L; // for stability check - j |= 1; // poll odd queues - for (int k = scans; ; j += 2) { - WorkQueue q; - if ((s = task.status) < 0) - break; - else if (joiner.externalPopAndExecCC(task)) { - if (--maxTasks <= 0) { - s = task.status; - break; - } - k = scans; + /** + * Returns common pool queue for an external thread. + */ + static WorkQueue commonSubmitterQueue() { + ForkJoinPool p = common; + int r = ThreadLocalRandom.getProbe(); + WorkQueue[] ws; int m; + return (p != null && (ws = p.workQueues) != null && + (m = ws.length - 1) >= 0) ? + ws[m & r & SQMASK] : null; + } + + /** + * Performs tryUnpush for an external submitter: Finds queue, + * locks if apparently non-empty, validates upon locking, and + * adjusts top. Each check can fail but rarely does. + */ + final boolean tryExternalUnpush(ForkJoinTask task) { + WorkQueue[] ws; WorkQueue w; ForkJoinTask[] a; int m, s; + int r = ThreadLocalRandom.getProbe(); + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && + (w = ws[m & r & SQMASK]) != null && + (a = w.array) != null && (s = w.top) != w.base) { + long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; + if (U.compareAndSwapInt(w, QLOCK, 0, 1)) { + if (w.top == s && w.array == a && + U.getObject(a, j) == task && + U.compareAndSwapObject(a, j, task, null)) { + U.putOrderedInt(w, QTOP, s - 1); + U.putOrderedInt(w, QLOCK, 0); + return true; } - else if ((s = task.status) < 0) - break; - else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) { - if (--maxTasks <= 0) { - s = task.status; - break; - } - k = scans; - } - else if (--k < 0) { - if (c == (c = ctl)) - break; - k = scans; - } + U.compareAndSwapInt(w, QLOCK, 1, 0); } } - return s; + return false; + } + + /** + * Performs helpComplete for an external submitter. + */ + final int externalHelpComplete(CountedCompleter task, int maxTasks) { + WorkQueue[] ws; int n; + int r = ThreadLocalRandom.getProbe(); + return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 : + helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks); } // Exported methods @@ -2447,7 +2538,7 @@ this(checkParallelism(parallelism), checkFactory(factory), handler, - (asyncMode ? FIFO_QUEUE : LIFO_QUEUE), + asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } @@ -2478,8 +2569,7 @@ this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; - this.mode = (short)mode; - this.parallelism = (short)parallelism; + this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); } @@ -2624,7 +2714,7 @@ // In previous versions of this class, this method constructed // a task to run ForkJoinTask.invokeAll, but now external // invocation of multiple tasks is at least as efficient. - ArrayList> futures = new ArrayList>(tasks.size()); + ArrayList> futures = new ArrayList<>(tasks.size()); boolean done = false; try { @@ -2670,7 +2760,7 @@ */ public int getParallelism() { int par; - return ((par = parallelism) > 0) ? par : 1; + return ((par = config & SMASK) > 0) ? par : 1; } /** @@ -2692,7 +2782,7 @@ * @return the number of worker threads */ public int getPoolSize() { - return parallelism + (short)(ctl >>> TC_SHIFT); + return (config & SMASK) + (short)(ctl >>> TC_SHIFT); } /** @@ -2702,7 +2792,7 @@ * @return {@code true} if this pool uses async mode */ public boolean getAsyncMode() { - return mode == FIFO_QUEUE; + return (config & FIFO_QUEUE) != 0; } /** @@ -2733,7 +2823,7 @@ * @return the number of active threads */ public int getActiveThreadCount() { - int r = parallelism + (int)(ctl >> AC_SHIFT); + int r = (config & SMASK) + (int)(ctl >> AC_SHIFT); return (r <= 0) ? 0 : r; // suppress momentarily negative values } @@ -2749,7 +2839,7 @@ * @return {@code true} if all threads are currently idle */ public boolean isQuiescent() { - return parallelism + (int)(ctl >> AC_SHIFT) <= 0; + return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0; } /** @@ -2764,7 +2854,8 @@ * @return the number of steals */ public long getStealCount() { - long count = stealCount; + AtomicLong sc = stealCounter; + long count = (sc == null) ? 0L : sc.get(); WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { for (int i = 1; i < ws.length; i += 2) { @@ -2894,7 +2985,8 @@ public String toString() { // Use a single pass through workQueues to collect counts long qt = 0L, qs = 0L; int rc = 0; - long st = stealCount; + AtomicLong sc = stealCounter; + long st = (sc == null) ? 0L : sc.get(); long c = ctl; WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { @@ -2912,16 +3004,16 @@ } } } - int pc = parallelism; + int pc = (config & SMASK); int tc = pc + (short)(c >>> TC_SHIFT); int ac = pc + (int)(c >> AC_SHIFT); if (ac < 0) // ignore transient negative ac = 0; - String level; - if ((c & STOP_BIT) != 0) - level = (tc == 0) ? "Terminated" : "Terminating"; - else - level = plock < 0 ? "Shutting down" : "Running"; + int rs = runState; + String level = ((rs & TERMINATED) != 0 ? "Terminated" : + (rs & STOP) != 0 ? "Terminating" : + (rs & SHUTDOWN) != 0 ? "Shutting down" : + "Running"); return super.toString() + "[" + level + ", parallelism = " + pc + @@ -2983,9 +3075,7 @@ * @return {@code true} if all tasks have completed following shut down */ public boolean isTerminated() { - long c = ctl; - return ((c & STOP_BIT) != 0L && - (short)(c >>> TC_SHIFT) + parallelism <= 0); + return (runState & TERMINATED) != 0; } /** @@ -3002,9 +3092,8 @@ * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { - long c = ctl; - return ((c & STOP_BIT) != 0L && - (short)(c >>> TC_SHIFT) + parallelism > 0); + int rs = runState; + return (rs & STOP) != 0 && (rs & TERMINATED) == 0; } /** @@ -3013,7 +3102,7 @@ * @return {@code true} if this pool has been shut down */ public boolean isShutdown() { - return plock < 0; + return (runState & SHUTDOWN) != 0; } /** @@ -3090,8 +3179,9 @@ } found = false; for (int j = (m + 1) << 2; j >= 0; --j) { - ForkJoinTask t; WorkQueue q; int b; - if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) { + ForkJoinTask t; WorkQueue q; int b, k; + if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null && + (b = q.base) - q.top < 0) { found = true; if ((t = q.pollAt(b)) != null) t.doExec(); @@ -3115,8 +3205,8 @@ * in {@link ForkJoinPool}s. * *

    A {@code ManagedBlocker} provides two methods. Method - * {@code isReleasable} must return {@code true} if blocking is - * not necessary. Method {@code block} blocks the current thread + * {@link #isReleasable} must return {@code true} if blocking is + * not necessary. Method {@link #block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} * before actually blocking). These actions are performed by any * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. @@ -3185,37 +3275,46 @@ } /** - * Blocks in accord with the given blocker. If the current thread - * is a {@link ForkJoinWorkerThread}, this method possibly - * arranges for a spare thread to be activated if necessary to - * ensure sufficient parallelism while the current thread is blocked. + * Runs the given possibly blocking task. When {@linkplain + * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this + * method possibly arranges for a spare thread to be activated if + * necessary to ensure sufficient parallelism while the current + * thread is blocked in {@link ManagedBlocker#block blocker.block()}. * - *

    If the caller is not a {@link ForkJoinTask}, this method is + *

    This method repeatedly calls {@code blocker.isReleasable()} and + * {@code blocker.block()} until either method returns {@code true}. + * Every call to {@code blocker.block()} is preceded by a call to + * {@code blocker.isReleasable()} that returned {@code false}. + * + *

    If not running in a ForkJoinPool, this method is * behaviorally equivalent to *

     {@code
          * while (!blocker.isReleasable())
          *   if (blocker.block())
    -     *     return;
    -     * }
    + * break;} * - * If the caller is a {@code ForkJoinTask}, then the pool may - * first be expanded to ensure parallelism, and later adjusted. + * If running in a ForkJoinPool, the pool may first be expanded to + * ensure sufficient parallelism available during the call to + * {@code blocker.block()}. * - * @param blocker the blocker - * @throws InterruptedException if blocker.block did so + * @param blocker the blocker task + * @throws InterruptedException if {@code blocker.block()} did so */ public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { + ForkJoinPool p; + ForkJoinWorkerThread wt; Thread t = Thread.currentThread(); - if (t instanceof ForkJoinWorkerThread) { - ForkJoinPool p = ((ForkJoinWorkerThread)t).pool; + if ((t instanceof ForkJoinWorkerThread) && + (p = (wt = (ForkJoinWorkerThread)t).pool) != null) { + WorkQueue w = wt.workQueue; while (!blocker.isReleasable()) { - if (p.tryCompensate(p.ctl)) { + if (p.tryCompensate(w)) { try { do {} while (!blocker.isReleasable() && !blocker.block()); } finally { - p.incrementActiveCount(); + U.getAndAddLong(p, CTL, AC_UNIT); } break; } @@ -3241,15 +3340,18 @@ // Unsafe mechanics private static final sun.misc.Unsafe U; + private static final int ABASE; + private static final int ASHIFT; private static final long CTL; + private static final long RUNSTATE; + private static final long STEALCOUNTER; private static final long PARKBLOCKER; - private static final int ABASE; - private static final int ASHIFT; - private static final long STEALCOUNT; - private static final long PLOCK; - private static final long INDEXSEED; - private static final long QBASE; + private static final long QTOP; private static final long QLOCK; + private static final long QSCANSTATE; + private static final long QPARKER; + private static final long QCURRENTSTEAL; + private static final long QCURRENTJOIN; static { // initialize field offsets for CAS etc @@ -3258,20 +3360,26 @@ Class k = ForkJoinPool.class; CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); - STEALCOUNT = U.objectFieldOffset - (k.getDeclaredField("stealCount")); - PLOCK = U.objectFieldOffset - (k.getDeclaredField("plock")); - INDEXSEED = U.objectFieldOffset - (k.getDeclaredField("indexSeed")); + RUNSTATE = U.objectFieldOffset + (k.getDeclaredField("runState")); + STEALCOUNTER = U.objectFieldOffset + (k.getDeclaredField("stealCounter")); Class tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); Class wk = WorkQueue.class; - QBASE = U.objectFieldOffset - (wk.getDeclaredField("base")); + QTOP = U.objectFieldOffset + (wk.getDeclaredField("top")); QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); + QSCANSTATE = U.objectFieldOffset + (wk.getDeclaredField("scanState")); + QPARKER = U.objectFieldOffset + (wk.getDeclaredField("parker")); + QCURRENTSTEAL = U.objectFieldOffset + (wk.getDeclaredField("currentSteal")); + QCURRENTJOIN = U.objectFieldOffset + (wk.getDeclaredField("currentJoin")); Class ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); @@ -3282,6 +3390,7 @@ throw new Error(e); } + commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); modifyThreadPermission = new RuntimePermission("modifyThread"); @@ -3289,7 +3398,7 @@ common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction() { public ForkJoinPool run() { return makeCommonPool(); }}); - int par = common.parallelism; // report 1 even if threads disabled + int par = common.config & SMASK; // report 1 even if threads disabled commonParallelism = par > 0 ? par : 1; } @@ -3308,6 +3417,8 @@ ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); + String mp = System.getProperty + ("java.util.concurrent.ForkJoinPool.common.maximumSpares"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) @@ -3316,6 +3427,8 @@ if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); + if (mp != null) + commonMaxSpares = Integer.parseInt(mp); } catch (Exception ignore) { } if (factory == null) { diff -r 5da963ed0720 -r 5853628b0e63 jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java Thu Sep 04 12:23:01 2014 -0400 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java Fri Sep 05 10:54:28 2014 +0200 @@ -297,15 +297,22 @@ } /** - * Tries to set SIGNAL status unless already completed. Used by - * ForkJoinPool. Other variants are directly incorporated into - * externalAwaitDone etc. + * If not done, sets SIGNAL status and performs Object.wait(timeout). + * This task may or may not be done on exit. Ignores interrupts. * - * @return true if successful + * @param timeout using Object.wait conventions. */ - final boolean trySetSignal() { - int s = status; - return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); + final void internalWait(long timeout) { + int s; + if ((s = status) >= 0 && // force completer to issue notify + U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + try { wait(timeout); } catch (InterruptedException ie) { } + else + notifyAll(); + } + } } /** @@ -313,35 +320,29 @@ * @return status upon completion */ private int externalAwaitDone() { - int s; - ForkJoinPool cp = ForkJoinPool.common; - if ((s = status) >= 0) { - if (cp != null) { - if (this instanceof CountedCompleter) - s = cp.externalHelpComplete((CountedCompleter)this, Integer.MAX_VALUE); - else if (cp.tryExternalUnpush(this)) - s = doExec(); - } - if (s >= 0 && (s = status) >= 0) { - boolean interrupted = false; - do { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - synchronized (this) { - if (status >= 0) { - try { - wait(); - } catch (InterruptedException ie) { - interrupted = true; - } + int s = ((this instanceof CountedCompleter) ? // try helping + ForkJoinPool.common.externalHelpComplete( + (CountedCompleter)this, 0) : + ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); + if (s >= 0 && (s = status) >= 0) { + boolean interrupted = false; + do { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { + try { + wait(0L); + } catch (InterruptedException ie) { + interrupted = true; } - else - notifyAll(); } + else + notifyAll(); } - } while ((s = status) >= 0); - if (interrupted) - Thread.currentThread().interrupt(); - } + } + } while ((s = status) >= 0); + if (interrupted) + Thread.currentThread().interrupt(); } return s; } @@ -351,22 +352,22 @@ */ private int externalInterruptibleAwaitDone() throws InterruptedException { int s; - ForkJoinPool cp = ForkJoinPool.common; if (Thread.interrupted()) throw new InterruptedException(); - if ((s = status) >= 0 && cp != null) { - if (this instanceof CountedCompleter) - cp.externalHelpComplete((CountedCompleter)this, Integer.MAX_VALUE); - else if (cp.tryExternalUnpush(this)) - doExec(); - } - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - synchronized (this) { - if (status >= 0) - wait(); - else - notifyAll(); + if ((s = status) >= 0 && + (s = ((this instanceof CountedCompleter) ? + ForkJoinPool.common.externalHelpComplete( + (CountedCompleter)this, 0) : + ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : + 0)) >= 0) { + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + wait(0L); + else + notifyAll(); + } } } } @@ -386,7 +387,7 @@ ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : - wt.pool.awaitJoin(w, this) : + wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } @@ -399,7 +400,8 @@ int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) : + (wt = (ForkJoinWorkerThread)t).pool. + awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); } @@ -577,7 +579,7 @@ Throwable ex; if (e == null || (ex = e.ex) == null) return null; - if (false && e.thrower != Thread.currentThread().getId()) { + if (e.thrower != Thread.currentThread().getId()) { Class ec = ex.getClass(); try { Constructor noArgCtor = null; @@ -587,13 +589,17 @@ Class[] ps = c.getParameterTypes(); if (ps.length == 0) noArgCtor = c; - else if (ps.length == 1 && ps[0] == Throwable.class) - return (Throwable)(c.newInstance(ex)); + else if (ps.length == 1 && ps[0] == Throwable.class) { + Throwable wx = (Throwable)c.newInstance(ex); + return (wx == null) ? ex : wx; + } } if (noArgCtor != null) { Throwable wx = (Throwable)(noArgCtor.newInstance()); - wx.initCause(ex); - return wx; + if (wx != null) { + wx.initCause(ex); + return wx; + } } } catch (Exception ignore) { } @@ -1017,67 +1023,40 @@ */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + int s; + long nanos = unit.toNanos(timeout); if (Thread.interrupted()) throw new InterruptedException(); - // Messy in part because we measure in nanosecs, but wait in millisecs - int s; long ms; - long ns = unit.toNanos(timeout); - ForkJoinPool cp; - if ((s = status) >= 0 && ns > 0L) { - long deadline = System.nanoTime() + ns; - ForkJoinPool p = null; - ForkJoinPool.WorkQueue w = null; + if ((s = status) >= 0 && nanos > 0L) { + long d = System.nanoTime() + nanos; + long deadline = (d == 0L) ? 1L : d; // avoid 0 Thread t = Thread.currentThread(); if (t instanceof ForkJoinWorkerThread) { ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - p = wt.pool; - w = wt.workQueue; - p.helpJoinOnce(w, this); // no retries on failure - } - else if ((cp = ForkJoinPool.common) != null) { - if (this instanceof CountedCompleter) - cp.externalHelpComplete((CountedCompleter)this, Integer.MAX_VALUE); - else if (cp.tryExternalUnpush(this)) - doExec(); + s = wt.pool.awaitJoin(wt.workQueue, this, deadline); } - boolean canBlock = false; - boolean interrupted = false; - try { - while ((s = status) >= 0) { - if (w != null && w.qlock < 0) - cancelIgnoringExceptions(this); - else if (!canBlock) { - if (p == null || p.tryCompensate(p.ctl)) - canBlock = true; - } - else { - if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && - U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - synchronized (this) { - if (status >= 0) { - try { - wait(ms); - } catch (InterruptedException ie) { - if (p == null) - interrupted = true; - } - } - else - notifyAll(); - } + else if ((s = ((this instanceof CountedCompleter) ? + ForkJoinPool.common.externalHelpComplete( + (CountedCompleter)this, 0) : + ForkJoinPool.common.tryExternalUnpush(this) ? + doExec() : 0)) >= 0) { + long ns, ms; // measure in nanosecs, but wait in millisecs + while ((s = status) >= 0 && + (ns = deadline - System.nanoTime()) > 0L) { + if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && + U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + wait(ms); // OK to throw InterruptedException + else + notifyAll(); } - if ((s = status) < 0 || interrupted || - (ns = deadline - System.nanoTime()) <= 0L) - break; } } - } finally { - if (p != null && canBlock) - p.incrementActiveCount(); } - if (interrupted) - throw new InterruptedException(); } + if (s >= 0) + s = status; if ((s &= DONE_MASK) != NORMAL) { Throwable ex; if (s == CANCELLED) diff -r 5da963ed0720 -r 5853628b0e63 jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java --- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Thu Sep 04 12:23:01 2014 -0400 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Fri Sep 05 10:54:28 2014 +0200 @@ -66,7 +66,7 @@ * owning thread. * * Support for (non-public) subclass InnocuousForkJoinWorkerThread - * requires that we break quite a lot of encapulation (via Unsafe) + * requires that we break quite a lot of encapsulation (via Unsafe) * both here and in the subclass to access and set Thread fields. */ @@ -118,7 +118,7 @@ * @return the index number */ public int getPoolIndex() { - return workQueue.poolIndex >>> 1; // ignore odd/even tag bit + return workQueue.getPoolIndex(); } /** @@ -171,7 +171,7 @@ } /** - * Erases ThreadLocals by nulling out Thread maps + * Erases ThreadLocals by nulling out Thread maps. */ final void eraseThreadLocals() { U.putObject(this, THREADLOCALS, null); @@ -246,8 +246,8 @@ /** * Returns a new group with the system ThreadGroup (the - * topmost, parentless group) as parent. Uses Unsafe to - * traverse Thread group and ThreadGroup parent fields. + * topmost, parent-less group) as parent. Uses Unsafe to + * traverse Thread.group and ThreadGroup.parent fields. */ private static ThreadGroup createThreadGroup() { try { @@ -274,4 +274,3 @@ } } -