--- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java Fri Jul 15 13:51:43 2016 -0700
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java Fri Jul 15 13:55:51 2016 -0700
@@ -36,6 +36,8 @@
package java.util.concurrent;
import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.security.AccessControlContext;
import java.security.Permissions;
import java.security.ProtectionDomain;
@@ -44,7 +46,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.locks.LockSupport;
/**
@@ -81,7 +87,9 @@
* However, no such adjustments are guaranteed in the face of blocked
* I/O or other unmanaged synchronization. The nested {@link
* ManagedBlocker} interface enables extension of the kinds of
- * synchronization accommodated.
+ * synchronization accommodated. The default policies may be
+ * overridden using a constructor with parameters corresponding to
+ * those documented in class {@link ThreadPoolExecutor}.
*
* <p>In addition to execution and lifecycle control methods, this
* class provides status check methods (for example
@@ -162,7 +170,6 @@
* @since 1.7
* @author Doug Lea
*/
-@jdk.internal.vm.annotation.Contended
public class ForkJoinPool extends AbstractExecutorService {
/*
@@ -229,10 +236,9 @@
* (CAS slot to null))
* increment base and return task;
*
- * There are several variants of each of these; for example most
- * versions of poll pre-screen the CAS by rechecking that the base
- * has not changed since reading the slot, and most methods only
- * attempt the CAS if base appears not to be equal to top.
+ * There are several variants of each of these. In particular,
+ * almost all uses of poll occur within scan operations that also
+ * interleave contention tracking (with associated code sprawl).
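
To make the push/poll protocol sketched above concrete, here is a minimal stand-alone sketch of the thief-side poll (push omitted); the class and field names are invented, and an AtomicReferenceArray stands in for the VarHandle slot accessor the real queue uses.

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Hypothetical sketch of the poll step: read the slot at base, CAS it to
    // null, then advance base. Losing the CAS simply means another thief (or
    // the owner) claimed the task first.
    final class PollSketch<T> {
        private final AtomicReferenceArray<T> slots = new AtomicReferenceArray<>(64);
        private volatile int base;   // index of next slot to poll
        private volatile int top;    // index of next slot to push

        T tryPoll() {
            int b = base;
            if (b - top < 0) {                            // queue appears nonempty
                int index = (slots.length() - 1) & b;     // power-of-two mask
                T t = slots.get(index);
                if (t != null && slots.compareAndSet(index, t, null)) {
                    base = b + 1;                         // claimed; publish new base
                    return t;
                }
            }
            return null;                                  // empty, or lost the race
        }
    }
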
*
* Memory ordering. See "Correct and Efficient Work-Stealing for
* Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
@@ -264,10 +270,7 @@
* thief chooses a different random victim target to try next. So,
* in order for one thief to progress, it suffices for any
* in-progress poll or new push on any empty queue to
- * complete. (This is why we normally use method pollAt and its
- * variants that try once at the apparent base index, else
- * consider alternative actions, rather than method poll, which
- * retries.)
+ * complete.
*
* This approach also enables support of a user mode in which
* local task processing is in FIFO, not LIFO order, simply by
@@ -282,16 +285,13 @@
* choosing existing queues, and may be randomly repositioned upon
* contention with other submitters. In essence, submitters act
* like workers except that they are restricted to executing local
- * tasks that they submitted (or in the case of CountedCompleters,
- * others with the same root task). Insertion of tasks in shared
- * mode requires a lock but we use only a simple spinlock (using
- * field qlock), because submitters encountering a busy queue move
- * on to try or create other queues -- they block only when
- * creating and registering new queues. Because it is used only as
- * a spinlock, unlocking requires only a "releasing" store (using
- * putIntRelease). The qlock is also used during termination
- * detection, in which case it is forced to a negative
- * non-lockable value.
+ * tasks that they submitted. Insertion of tasks in shared mode
+ * requires a lock but we use only a simple spinlock (using field
+ * phase), because submitters encountering a busy queue move to a
+ * different position to use or create other queues -- they block
+ * only when creating and registering new queues. Because it is
+ * used only as a spinlock, unlocking requires only a "releasing"
+ * store (using setRelease).
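
A minimal sketch of that lock idiom, assuming illustrative names (the pool itself folds the lock bit into the queue's phase field): acquire with a single CAS, release with a plain releasing store.

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;

    // Illustrative spinlock: CAS from 0 to 1 to lock, releasing store to unlock.
    // Callers that fail the CAS move on rather than spin, as described above.
    final class PhaseSpinLock {
        private volatile int phase;                   // 0 = unlocked, 1 = locked
        private static final VarHandle PHASE;
        static {
            try {
                PHASE = MethodHandles.lookup()
                    .findVarHandle(PhaseSpinLock.class, "phase", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
        boolean tryLock() { return PHASE.compareAndSet(this, 0, 1); }
        void unlock()     { PHASE.setRelease(this, 0); }   // "releasing" store
    }
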
*
* Management
* ==========
@@ -305,42 +305,34 @@
* There are only a few properties that we can globally track or
* maintain, so we pack them into a small number of variables,
* often maintaining atomicity without blocking or locking.
- * Nearly all essentially atomic control state is held in two
+ * Nearly all essentially atomic control state is held in a few
* volatile variables that are by far most often read (not
- * written) as status and consistency checks. (Also, field
- * "config" holds unchanging configuration state.)
+ * written) as status and consistency checks. We pack as much
+ * information into them as we can.
*
* Field "ctl" contains 64 bits holding information needed to
- * atomically decide to add, inactivate, enqueue (on an event
- * queue), dequeue, and/or re-activate workers. To enable this
+ * atomically decide to add, enqueue (on an event queue), and
+ * dequeue and release workers.  To enable this
* packing, we restrict maximum parallelism to (1<<15)-1 (which is
* far in excess of normal operating range) to allow ids, counts,
* and their negations (used for thresholding) to fit into 16bit
* subfields.
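
The packing can be made concrete with a few shift-and-mask helpers; the constants below mirror the RC/TC/SP layout defined later in this patch, while the helper class and method names are invented.

    // Hypothetical helpers for reading the 16-bit subfields of the 64-bit ctl word.
    final class CtlSketch {
        static final long SP_MASK  = 0xffffffffL;          // low 32 bits: waiter stack top
        static final int  RC_SHIFT = 48;
        static final long RC_UNIT  = 0x0001L << RC_SHIFT;
        static final long RC_MASK  = 0xffffL << RC_SHIFT;
        static final int  TC_SHIFT = 32;
        static final long TC_UNIT  = 0x0001L << TC_SHIFT;
        static final long TC_MASK  = 0xffffL << TC_SHIFT;

        static int releasedCount(long ctl) { return (int)(ctl >> RC_SHIFT); }    // signed
        static int totalCount(long ctl)    { return (short)(ctl >>> TC_SHIFT); } // signed
        static int sp(long ctl)            { return (int)ctl; }

        // Reserving a new worker pre-increments both counts (the sp subfield is
        // zero on this path), shaped like tryAddWorker further down in the patch.
        static long reserveWorker(long c) {
            return (RC_MASK & (c + RC_UNIT)) | (TC_MASK & (c + TC_UNIT));
        }
    }
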
*
- * Field "runState" holds lifetime status, atomically and
- * monotonically setting STARTED, SHUTDOWN, STOP, and finally
- * TERMINATED bits.
- *
- * Field "auxState" is a ReentrantLock subclass that also
- * opportunistically holds some other bookkeeping fields accessed
- * only when locked. It is mainly used to lock (infrequent)
- * updates to workQueues. The auxState instance is itself lazily
- * constructed (see tryInitialize), requiring a double-check-style
- * bootstrapping use of field runState, and locking a private
- * static.
+ * Field "mode" holds configuration parameters as well as lifetime
+ * status, atomically and monotonically setting SHUTDOWN, STOP,
+ * and finally TERMINATED bits.
*
* Field "workQueues" holds references to WorkQueues. It is
- * updated (only during worker creation and termination) under the
- * lock, but is otherwise concurrently readable, and accessed
- * directly. We also ensure that reads of the array reference
- * itself never become too stale (for example, re-reading before
- * each scan). To simplify index-based operations, the array size
- * is always a power of two, and all readers must tolerate null
- * slots. Worker queues are at odd indices. Shared (submission)
- * queues are at even indices, up to a maximum of 64 slots, to
- * limit growth even if array needs to expand to add more
- * workers. Grouping them together in this way simplifies and
+ * updated (only during worker creation and termination) under
+ * lock (using field workerNamePrefix as lock), but is otherwise
+ * concurrently readable, and accessed directly. We also ensure
+ * that uses of the array reference itself never become too stale
+ * in case of resizing. To simplify index-based operations, the
+ * array size is always a power of two, and all readers must
+ * tolerate null slots. Worker queues are at odd indices. Shared
+ * (submission) queues are at even indices, up to a maximum of 64
+ * slots, to limit growth even if array needs to expand to add
+ * more workers. Grouping them together in this way simplifies and
* speeds up task scanning.
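
As an illustration of that layout, index selection reduces to two masks; SQMASK is taken from the constants in this patch, while the method names are invented.

    // Hypothetical sketch: external submitters hash to even slots (at most 64,
    // via SQMASK), workers are registered at odd slots of the power-of-two table.
    final class IndexSketch {
        static final int SQMASK = 0x007e;                    // max 64 even slots

        static int submissionSlot(int probe, int tableLength) {
            return probe & (tableLength - 1) & SQMASK;       // even index
        }
        static int workerSlot(int seed, int tableLength) {
            return ((seed << 1) | 1) & (tableLength - 1);    // odd index
        }
    }
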
*
* All worker thread creation is on-demand, triggered by task
@@ -360,30 +352,37 @@
* workers unless there appear to be tasks available. On the
* other hand, we must quickly prod them into action when new
* tasks are submitted or generated. In many usages, ramp-up time
- * to activate workers is the main limiting factor in overall
- * performance, which is compounded at program start-up by JIT
- * compilation and allocation. So we streamline this as much as
- * possible.
+ * is the main limiting factor in overall performance, which is
+ * compounded at program start-up by JIT compilation and
+ * allocation. So we streamline this as much as possible.
*
- * The "ctl" field atomically maintains active and total worker
- * counts as well as a queue to place waiting threads so they can
- * be located for signalling. Active counts also play the role of
- * quiescence indicators, so are decremented when workers believe
- * that there are no more tasks to execute. The "queue" is
- * actually a form of Treiber stack. A stack is ideal for
- * activating threads in most-recently used order. This improves
+ * The "ctl" field atomically maintains total worker and
+ * "released" worker counts, plus the head of the available worker
+ * queue (actually stack, represented by the lower 32bit subfield
+ * of ctl). Released workers are those known to be scanning for
+ * and/or running tasks. Unreleased ("available") workers are
+ * recorded in the ctl stack. These workers are made available for
+ * signalling by enqueuing in ctl (see method runWorker). The
+ * "queue" is a form of Treiber stack. This is ideal for
+ * activating threads in most-recently used order, and improves
* performance and locality, outweighing the disadvantages of
* being prone to contention and inability to release a worker
- * unless it is topmost on stack. We block/unblock workers after
- * pushing on the idle worker stack (represented by the lower
- * 32bit subfield of ctl) when they cannot find work. The top
- * stack state holds the value of the "scanState" field of the
- * worker: its index and status, plus a version counter that, in
- * addition to the count subfields (also serving as version
- * stamps) provide protection against Treiber stack ABA effects.
+ * unless it is topmost on stack. To avoid missed signal problems
+ * inherent in any wait/signal design, available workers rescan
+ * for (and if found run) tasks after enqueuing. Normally their
+ * release status will be updated while doing so, but the released
+ * worker ctl count may underestimate the number of active
+ * threads. (However, it is still possible to determine quiescence
+ * via a validation traversal -- see isQuiescent). After an
+ * unsuccessful rescan, available workers are blocked until
+ * signalled (see signalWork). The top stack state holds the
+ * value of the "phase" field of the worker: its index and status,
+ * plus a version counter that, in addition to the count subfields
+ * (also serving as version stamps) provide protection against
+ * Treiber stack ABA effects.
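
For reference, the Treiber stack shape reduces to the following reference-based push/pop (a sketch with invented names); the pool instead packs the stack top into the low 32 bits of ctl and adds SS_SEQ version stamps, since packed ints, unlike garbage-collected nodes, are vulnerable to ABA reuse.

    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative Treiber stack: push and pop by CAS on the head reference.
    final class TreiberStack<T> {
        private static final class Node<T> {
            final T item; Node<T> next;
            Node(T item) { this.item = item; }
        }
        private final AtomicReference<Node<T>> head = new AtomicReference<>();

        void push(T item) {
            Node<T> n = new Node<>(item);
            do {
                n.next = head.get();
            } while (!head.compareAndSet(n.next, n));
        }
        T pop() {
            for (Node<T> h; (h = head.get()) != null; )
                if (head.compareAndSet(h, h.next))
                    return h.item;
            return null;
        }
    }
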
*
- * Creating workers. To create a worker, we pre-increment total
- * count (serving as a reservation), and attempt to construct a
+ * Creating workers. To create a worker, we pre-increment counts
+ * (serving as a reservation), and attempt to construct a
* ForkJoinWorkerThread via its factory. Upon construction, the
* new thread invokes registerWorker, where it constructs a
* WorkQueue and is assigned an index in the workQueues array
@@ -405,16 +404,15 @@
* submission queues for existing external threads (see
* externalPush).
*
- * WorkQueue field scanState is used by both workers and the pool
- * to manage and track whether a worker is UNSIGNALLED (possibly
- * blocked waiting for a signal). When a worker is inactivated,
- * its scanState field is set, and is prevented from executing
- * tasks, even though it must scan once for them to avoid queuing
- * races. Note that scanState updates lag queue CAS releases so
- * usage requires care. When queued, the lower 16 bits of
- * scanState must hold its pool index. So we place the index there
- * upon initialization (see registerWorker) and otherwise keep it
- * there or restore it when necessary.
+ * WorkQueue field "phase" is used by both workers and the pool to
+ * manage and track whether a worker is UNSIGNALLED (possibly
+ * blocked waiting for a signal). When a worker is enqueued its
+ * phase field is set. Note that phase field updates lag queue CAS
+ * releases so usage requires care -- seeing a negative phase does
+ * not guarantee that the worker is available. When queued, the
+ * lower 16 bits of phase must hold its pool index. So we
+ * place the index there upon initialization (see registerWorker)
+ * and otherwise keep it there or restore it when necessary.
*
* The ctl field also serves as the basis for memory
* synchronization surrounding activation. This uses a more
@@ -423,15 +421,14 @@
* if to its current value). This would be extremely costly. So
* we relax it in several ways: (1) Producers only signal when
* their queue is empty. Other workers propagate this signal (in
- * method scan) when they find tasks. (2) Workers only enqueue
- * after scanning (see below) and not finding any tasks. (3)
- * Rather than CASing ctl to its current value in the common case
- * where no action is required, we reduce write contention by
- * equivalently prefacing signalWork when called by an external
- * task producer using a memory access with full-volatile
- * semantics or a "fullFence". (4) For internal task producers we
- * rely on the fact that even if no other workers awaken, the
- * producer itself will eventually see the task and execute it.
+ * method scan) when they find tasks; to further reduce flailing,
+ * each worker signals only one other per activation. (2) Workers
+ * only enqueue after scanning (see below) and not finding any
+ * tasks. (3) Rather than CASing ctl to its current value in the
+ * common case where no action is required, we reduce write
+ * contention by equivalently prefacing signalWork when called by
+ * an external task producer using a memory access with
+ * full-volatile semantics or a "fullFence".
*
* Almost always, too many signals are issued. A task producer
* cannot in general tell if some existing worker is in the midst
@@ -443,64 +440,40 @@
* and bookkeeping bottlenecks during ramp-up, ramp-down, and small
* computations involving only a few workers.
*
- * Scanning. Method scan() performs top-level scanning for tasks.
- * Each scan traverses (and tries to poll from) each queue in
- * pseudorandom permutation order by randomly selecting an origin
- * index and a step value. (The pseudorandom generator need not
- * have high-quality statistical properties in the long term, but
- * just within computations; We use 64bit and 32bit Marsaglia
- * XorShifts, which are cheap and suffice here.) Scanning also
- * employs contention reduction: When scanning workers fail a CAS
- * polling for work, they soon restart with a different
- * pseudorandom scan order (thus likely retrying at different
- * intervals). This improves throughput when many threads are
- * trying to take tasks from few queues. Scans do not otherwise
- * explicitly take into account core affinities, loads, cache
- * localities, etc, However, they do exploit temporal locality
- * (which usually approximates these) by preferring to re-poll (up
- * to POLL_LIMIT times) from the same queue after a successful
- * poll before trying others. Restricted forms of scanning occur
- * in methods helpComplete and findNonEmptyStealQueue, and take
- * similar but simpler forms.
- *
- * Deactivation and waiting. Queuing encounters several intrinsic
- * races; most notably that an inactivating scanning worker can
- * miss seeing a task produced during a scan. So when a worker
- * cannot find a task to steal, it inactivates and enqueues, and
- * then rescans to ensure that it didn't miss one, reactivating
- * upon seeing one with probability approximately proportional to
- * probability of a miss. (In most cases, the worker will be
- * signalled before self-signalling, avoiding cascades of multiple
- * signals for the same task).
- *
- * Workers block (in method awaitWork) using park/unpark;
- * advertising the need for signallers to unpark by setting their
- * "parker" fields.
+ * Scanning. Method runWorker performs top-level scanning for
+ * tasks. Each scan traverses and tries to poll from each queue
+ * starting at a random index and circularly stepping. Scans are
+ * not performed in ideal random permutation order, to reduce
+ * cacheline contention. The pseudorandom generator need not have
+ * high-quality statistical properties in the long term, but just
+ * within computations; We use Marsaglia XorShifts (often via
+ * ThreadLocalRandom.nextSecondarySeed), which are cheap and
+ * suffice. Scanning also employs contention reduction: When
+ * scanning workers fail to extract an apparently existing task,
+ * they soon restart at a different pseudorandom index. This
+ * improves throughput when many threads are trying to take tasks
+ * from few queues, which can be common in some usages. Scans do
+ * not otherwise explicitly take into account core affinities,
+ * loads, cache localities, etc. However, they do exploit temporal
+ * locality (which usually approximates these) by preferring to
+ * re-poll (at most #workers times) from the same queue after a
+ * successful poll before trying others.
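
A schematic of that circular scan, assuming a power-of-two queue table; the names are invented, and the real runWorker/scan folds polling, signal propagation, and re-poll limits into the same loop.

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.function.Predicate;

    // Hypothetical scan skeleton: start at a random origin and step circularly.
    final class ScanSketch {
        static <Q> Q findNonEmpty(Q[] queues, Predicate<Q> nonEmpty) {
            int n = queues.length;                        // power of two in the pool
            int origin = ThreadLocalRandom.current().nextInt();
            for (int i = 0; i < n; ++i) {
                Q q = queues[(origin + i) & (n - 1)];
                if (q != null && nonEmpty.test(q))
                    return q;                             // caller then polls from q
            }
            return null;                                  // nothing found this pass
        }
    }
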
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
- * time out and terminate (see awaitWork) if the pool has remained
- * quiescent for period given by IDLE_TIMEOUT_MS, increasing the
- * period as the number of threads decreases, eventually removing
- * all workers.
+ * time out and terminate (see method scan) if the pool has
+ * remained quiescent for the period given by field keepAlive.
*
* Shutdown and Termination. A call to shutdownNow invokes
* tryTerminate to atomically set a runState bit. The calling
* thread, as well as every other worker thereafter terminating,
- * helps terminate others by setting their (qlock) status,
- * cancelling their unprocessed tasks, and waking them up, doing
- * so repeatedly until stable. Calls to non-abrupt shutdown()
- * preface this by checking whether termination should commence.
- * This relies primarily on the active count bits of "ctl"
- * maintaining consensus -- tryTerminate is called from awaitWork
- * whenever quiescent. However, external submitters do not take
- * part in this consensus. So, tryTerminate sweeps through queues
- * (until stable) to ensure lack of in-flight submissions and
- * workers about to process them before triggering the "STOP"
- * phase of termination. (Note: there is an intrinsic conflict if
- * helpQuiescePool is called when shutdown is enabled. Both wait
- * for quiescence, but tryTerminate is biased to not trigger until
- * helpQuiescePool completes.)
+ * helps terminate others by cancelling their unprocessed tasks,
+ * and waking them up, doing so repeatedly until stable. Calls to
+ * non-abrupt shutdown() preface this by checking whether
+ * termination should commence by sweeping through queues (until
+ * stable) to ensure lack of in-flight submissions and workers
+ * about to process them before triggering the "STOP" phase of
+ * termination.
*
* Joining Tasks
* =============
@@ -508,12 +481,12 @@
* Any of several actions may be taken when one worker is waiting
* to join a task stolen (or always held) by another. Because we
* are multiplexing many tasks on to a pool of workers, we can't
- * just let them block (as in Thread.join). We also cannot just
- * reassign the joiner's run-time stack with another and replace
- * it later, which would be a form of "continuation", that even if
- * possible is not necessarily a good idea since we may need both
- * an unblocked task and its continuation to progress. Instead we
- * combine two tactics:
+ * always just let them block (as in Thread.join). We also cannot
+ * just reassign the joiner's run-time stack with another and
+ * replace it later, which would be a form of "continuation", that
+ * even if possible is not necessarily a good idea since we may
+ * need both an unblocked task and its continuation to progress.
+ * Instead we combine two tactics:
*
* Helping: Arranging for the joiner to execute some task that it
* would be running if the steal had not occurred.
@@ -526,79 +499,43 @@
* helping a hypothetical compensator: If we can readily tell that
* a possible action of a compensator is to steal and execute the
* task being joined, the joining thread can do so directly,
- * without the need for a compensation thread (although at the
- * expense of larger run-time stacks, but the tradeoff is
- * typically worthwhile).
+ * without the need for a compensation thread.
*
* The ManagedBlocker extension API can't use helping so relies
* only on compensation in method awaitBlocker.
*
- * The algorithm in helpStealer entails a form of "linear
- * helping". Each worker records (in field currentSteal) the most
- * recent task it stole from some other worker (or a submission).
- * It also records (in field currentJoin) the task it is currently
- * actively joining. Method helpStealer uses these markers to try
- * to find a worker to help (i.e., steal back a task from and
- * execute it) that could hasten completion of the actively joined
- * task. Thus, the joiner executes a task that would be on its
- * own local deque had the to-be-joined task not been stolen. This
- * is a conservative variant of the approach described in Wagner &
- * Calder "Leapfrogging: a portable technique for implementing
- * efficient futures" SIGPLAN Notices, 1993
- * (http://portal.acm.org/citation.cfm?id=155354). It differs in
- * that: (1) We only maintain dependency links across workers upon
- * steals, rather than use per-task bookkeeping. This sometimes
- * requires a linear scan of workQueues array to locate stealers,
- * but often doesn't because stealers leave hints (that may become
- * stale/wrong) of where to locate them. It is only a hint
- * because a worker might have had multiple steals and the hint
- * records only one of them (usually the most current). Hinting
- * isolates cost to when it is needed, rather than adding to
- * per-task overhead. (2) It is "shallow", ignoring nesting and
- * potentially cyclic mutual steals. (3) It is intentionally
- * racy: field currentJoin is updated only while actively joining,
- * which means that we miss links in the chain during long-lived
- * tasks, GC stalls etc (which is OK since blocking in such cases
- * is usually a good idea). (4) We bound the number of attempts
- * to find work using checksums and fall back to suspending the
- * worker and if necessary replacing it with another.
+ * The algorithm in awaitJoin entails a form of "linear helping".
+ * Each worker records (in field source) the id of the queue from
+ * which it last stole a task. The scan in method awaitJoin uses
+ * these markers to try to find a worker to help (i.e., steal back
+ * a task from and execute it) that could hasten completion of the
+ * actively joined task. Thus, the joiner executes a task that
+ * would be on its own local deque if the to-be-joined task had
+ * not been stolen. This is a conservative variant of the approach
+ * described in Wagner & Calder "Leapfrogging: a portable
+ * technique for implementing efficient futures" SIGPLAN Notices,
+ * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
+ * mainly in that we only record queue ids, not full dependency
+ * links. This requires a linear scan of the workQueues array to
+ * locate stealers, but isolates cost to when it is needed, rather
+ * than adding to per-task overhead. Searches can fail to locate
+ * stealers when GC stalls and the like delay recording sources.
+ * Further, even when accurately identified, stealers might not
+ * ever produce a task that the joiner can in turn help with. So,
+ * compensation is tried upon failure to find tasks to run.
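
A highly simplified sketch of that steal-back idea, with invented names and a plain deque standing in for the CAS-based array polls of the real code: find a queue whose recorded source is the joiner's own id, and run one of its tasks.

    import java.util.ArrayDeque;

    // Hypothetical illustration of linear helping via recorded source ids.
    final class LinearHelpSketch {
        static final class Q {
            final int id;                       // this queue's id
            volatile int source;                // id of the queue it last stole from
            final ArrayDeque<Runnable> tasks = new ArrayDeque<>();
            Q(int id) { this.id = id; }
        }
        static boolean helpOnce(Q[] queues, int joinerId) {
            for (Q q : queues) {
                if (q != null && q.source == joinerId) {  // q stole from the joiner
                    Runnable t = q.tasks.pollFirst();     // steal one task back
                    if (t != null) { t.run(); return true; }
                }
            }
            return false;                                 // none found; try compensation
        }
    }
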
*
- * Helping actions for CountedCompleters do not require tracking
- * currentJoins: Method helpComplete takes and executes any task
- * with the same root as the task being waited on (preferring
- * local pops to non-local polls). However, this still entails
- * some traversal of completer chains, so is less efficient than
- * using CountedCompleters without explicit joins.
- *
- * Compensation does not aim to keep exactly the target
+ * Compensation does not by default aim to keep exactly the target
* parallelism number of unblocked threads running at any given
* time. Some previous versions of this class employed immediate
* compensations for any blocked join. However, in practice, the
* vast majority of blockages are transient byproducts of GC and
* other JVM or OS activities that are made worse by replacement.
- * Currently, compensation is attempted only after validating that
- * all purportedly active threads are processing tasks by checking
- * field WorkQueue.scanState, which eliminates most false
- * positives. Also, compensation is bypassed (tolerating fewer
- * threads) in the most common case in which it is rarely
- * beneficial: when a worker with an empty queue (thus no
- * continuation tasks) blocks on a join and there still remain
- * enough threads to ensure liveness.
- *
- * Spare threads are removed as soon as they notice that the
- * target parallelism level has been exceeded, in method
- * tryDropSpare. (Method scan arranges returns for rechecks upon
- * each probe via the "bound" parameter.)
- *
- * The compensation mechanism may be bounded. Bounds for the
- * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
- * with programming errors and abuse before running out of
- * resources to do so. In other cases, users may supply factories
- * that limit thread construction. The effects of bounding in this
- * pool (like all others) is imprecise. Total worker counts are
- * decremented when threads deregister, not when they exit and
- * resources are reclaimed by the JVM and OS. So the number of
- * simultaneously live threads may transiently exceed bounds.
+ * Rather than impose arbitrary policies, we allow users to
+ * override the default of only adding threads upon apparent
+ * starvation. The compensation mechanism may also be bounded.
+ * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
+ * JVMs to cope with programming errors and abuse before running
+ * out of resources to do so.
*
* Common Pool
* ===========
@@ -606,9 +543,7 @@
* The static common pool always exists after static
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
- * footprint to the setup of about a dozen fields, with no nested
- * allocation. Most bootstrapping occurs within method
- * externalSubmit during the first submission to the pool.
+ * footprint to the setup of about a dozen fields.
*
* When external threads submit to the common pool, they can
* perform subtask processing (see externalHelpComplete and
@@ -628,28 +563,22 @@
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
* to any user-defined ThreadGroup, and erase all ThreadLocals
- * after executing any top-level task (see WorkQueue.runTask).
- * The associated mechanics (mainly in ForkJoinWorkerThread) may
- * be JVM-dependent and must access particular Thread class fields
- * to achieve this effect.
+ * after executing any top-level task (see
+ * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
+ * in ForkJoinWorkerThread) may be JVM-dependent and must access
+ * particular Thread class fields to achieve this effect.
*
* Style notes
* ===========
*
- * Memory ordering relies mainly on Unsafe intrinsics that carry
- * the further responsibility of explicitly performing null- and
- * bounds- checks otherwise carried out implicitly by JVMs. This
- * can be awkward and ugly, but also reflects the need to control
+ * Memory ordering relies mainly on VarHandles. This can be
+ * awkward and ugly, but also reflects the need to control
* outcomes across the unusual cases that arise in very racy code
- * with very few invariants. So these explicit checks would exist
- * in some form anyway. All fields are read into locals before
- * use, and null-checked if they are references. This is usually
- * done in a "C"-like style of listing declarations at the heads
- * of methods or blocks, and using inline assignments on first
- * encounter. Array bounds-checks are usually performed by
- * masking with array.length-1, which relies on the invariant that
- * these arrays are created with positive lengths, which is itself
- * paranoically checked. Nearly all explicit checks lead to
+ * with very few invariants. All fields are read into locals
+ * before use, and null-checked if they are references. This is
+ * usually done in a "C"-like style of listing declarations at the
+ * heads of methods or blocks, and using inline assignments on
+ * first encounter. Nearly all explicit checks lead to
* bypass/return, not exception throws, because they may
* legitimately arise due to cancellation/revocation during
* shutdown.
@@ -701,10 +630,17 @@
public static interface ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
+ * Returning null or throwing an exception may result in tasks
+ * never being executed. If this method throws an exception,
+ * it is relayed to the caller of the method (for example
+ * {@code execute}) causing attempted thread creation. If this
+ * method returns null or throws an exception, it is not
+ * retried until the next attempted creation (for example
+ * another call to {@code execute}).
*
* @param pool the pool this thread works in
* @return the new worker thread, or {@code null} if the request
- * to create a thread is rejected
+ * to create a thread is rejected.
* @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
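
For illustration, a user-supplied factory (not part of this patch) that caps and names the workers it creates, exercising the null-return path documented above:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinWorkerThread;
    import java.util.concurrent.atomic.AtomicInteger;

    // Example factory: delegates to the default factory, renames the thread,
    // and rejects creation (returns null) once a cap is reached.
    class CappedWorkerFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        private final AtomicInteger created = new AtomicInteger();
        private final int maxThreads;
        CappedWorkerFactory(int maxThreads) { this.maxThreads = maxThreads; }

        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            int n = created.incrementAndGet();
            if (n > maxThreads) {
                created.decrementAndGet();
                return null;                              // reject this request
            }
            ForkJoinWorkerThread t =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            t.setName("capped-worker-" + n);
            return t;
        }
    }
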
@@ -721,56 +657,35 @@
}
}
- /**
- * Class for artificial tasks that are used to replace the target
- * of local joins if they are removed from an interior queue slot
- * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
- * actually do anything beyond having a unique identity.
- */
- private static final class EmptyTask extends ForkJoinTask<Void> {
- private static final long serialVersionUID = -7721805057305804111L;
- EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
- public final Void getRawResult() { return null; }
- public final void setRawResult(Void x) {}
- public final boolean exec() { return true; }
- }
-
- /**
- * Additional fields and lock created upon initialization.
- */
- private static final class AuxState extends ReentrantLock {
- private static final long serialVersionUID = -6001602636862214147L;
- volatile long stealCount; // cumulative steal count
- long indexSeed; // index bits for registerWorker
- AuxState() {}
- }
-
// Constants shared across ForkJoinPool and WorkQueue
// Bounds
+ static final int SWIDTH = 16; // width of short
static final int SMASK = 0xffff; // short bits == max index
static final int MAX_CAP = 0x7fff; // max #workers - 1
- static final int EVENMASK = 0xfffe; // even short bits
static final int SQMASK = 0x007e; // max 64 (even) slots
- // Masks and units for WorkQueue.scanState and ctl sp subfield
+ // Masks and units for WorkQueue.phase and ctl sp subfield
static final int UNSIGNALLED = 1 << 31; // must be negative
static final int SS_SEQ = 1 << 16; // version count
+ static final int QLOCK = 1; // must be 1
- // Mode bits for ForkJoinPool.config and WorkQueue.config
- static final int MODE_MASK = 0xffff << 16; // top half of int
- static final int SPARE_WORKER = 1 << 17; // set if tc > 0 on creation
- static final int UNREGISTERED = 1 << 18; // to skip some of deregister
- static final int FIFO_QUEUE = 1 << 31; // must be negative
- static final int LIFO_QUEUE = 0; // for clarity
- static final int IS_OWNED = 1; // low bit 0 if shared
+ // Mode bits and sentinels, some also used in WorkQueue id and source fields
+ static final int OWNED = 1; // queue has owner thread
+ static final int FIFO = 1 << 16; // fifo queue or access mode
+ static final int SHUTDOWN = 1 << 18;
+ static final int TERMINATED = 1 << 19;
+ static final int STOP = 1 << 31; // must be negative
+ static final int QUIET = 1 << 30; // not scanning or working
+ static final int DORMANT = QUIET | UNSIGNALLED;
/**
- * The maximum number of task executions from the same queue
- * before checking other queues, bounding unfairness and impact of
- * infinite user task recursion. Must be a power of two minus 1.
+ * The maximum number of local polls from the same queue before
+ * checking others. This is a safeguard against infinitely unfair
+ * looping under unbounded user task recursion, and must be larger
+ * than plausible cases of intentional bounded task recursion.
*/
- static final int POLL_LIMIT = (1 << 10) - 1;
+ static final int POLL_LIMIT = 1 << 10;
/**
* Queues supporting work-stealing as well as external task
@@ -805,23 +720,16 @@
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
-
- volatile int scanState; // versioned, negative if inactive
- int stackPred; // pool stack (ctl) predecessor
+ volatile int phase; // versioned, negative: queued, 1: locked
+ int stackPred; // pool stack (ctl) predecessor link
int nsteals; // number of steals
- int hint; // randomization and stealer index hint
- int config; // pool index and mode
- volatile int qlock; // 1: locked, < 0: terminate; else 0
+ int id; // index, mode, tag
+ volatile int source; // source queue id, or sentinel
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
- volatile Thread parker; // == owner during call to park; else null
- volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
-
- @jdk.internal.vm.annotation.Contended("group2") // segregate
- volatile ForkJoinTask<?> currentSteal; // nonnull when running some task
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
@@ -834,7 +742,7 @@
* Returns an exportable index (used by ForkJoinWorkerThread).
*/
final int getPoolIndex() {
- return (config & 0xffff) >>> 1; // ignore odd/even tag bit
+ return (id & 0xffff) >>> 1; // ignore odd/even tag bit
}
/**
@@ -851,13 +759,14 @@
* near-empty queue has at least one unclaimed task.
*/
final boolean isEmpty() {
- ForkJoinTask<?>[] a; int n, al, s;
- return ((n = base - (s = top)) >= 0 || // possibly one task
+ ForkJoinTask<?>[] a; int n, al, b;
+ return ((n = (b = base) - top) >= 0 || // possibly one task
(n == -1 && ((a = array) == null ||
(al = a.length) == 0 ||
- a[(al - 1) & (s - 1)] == null)));
+ a[(al - 1) & b] == null)));
}
+
/**
* Pushes a task. Call only by owner in unshared queues.
*
@@ -865,17 +774,17 @@
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
- U.storeFence(); // ensure safe publication
- int s = top, al, d; ForkJoinTask<?>[] a;
+ int s = top; ForkJoinTask<?>[] a; int al, d;
if ((a = array) != null && (al = a.length) > 0) {
- a[(al - 1) & s] = task; // relaxed writes OK
- top = s + 1;
+ int index = (al - 1) & s;
ForkJoinPool p = pool;
+ top = s + 1;
+ QA.setRelease(a, index, task);
if ((d = base - s) == 0 && p != null) {
- U.fullFence();
+ VarHandle.fullFence();
p.signalWork();
}
- else if (al + d == 1)
+ else if (d + al == 1)
growArray();
}
}
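
The QA accessor used here is presumably the array-element VarHandle for the task slots, defined elsewhere in the patch; a sketch of how such a handle is obtained and used (names here are illustrative):

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;
    import java.util.concurrent.ForkJoinTask;

    // Sketch of an array-element VarHandle like QA, plus the two idioms above:
    // a releasing store to publish a task, and an acquire read followed by CAS
    // to claim one.
    class SlotAccessSketch {
        static final VarHandle SLOTS =
            MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);

        static void publish(ForkJoinTask<?>[] a, int i, ForkJoinTask<?> task) {
            SLOTS.setRelease(a, i, task);                 // ordered store, as in push()
        }
        static boolean claim(ForkJoinTask<?>[] a, int i) {
            ForkJoinTask<?> t = (ForkJoinTask<?>) SLOTS.getAcquire(a, i);
            return t != null && SLOTS.compareAndSet(a, i, t, null);
        }
    }
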
@@ -887,24 +796,24 @@
*/
final ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] oldA = array;
- int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
+ int oldSize = oldA != null ? oldA.length : 0;
+ int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
- if (oldA != null && (oldMask = oldA.length - 1) > 0 &&
+ if (oldA != null && (oldMask = oldSize - 1) > 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
do { // emulate poll from old array, push to new array
int index = b & oldMask;
- long offset = ((long)index << ASHIFT) + ABASE;
ForkJoinTask<?> x = (ForkJoinTask<?>)
- U.getObjectVolatile(oldA, offset);
+ QA.getAcquire(oldA, index);
if (x != null &&
- U.compareAndSwapObject(oldA, offset, x, null))
+ QA.compareAndSet(oldA, index, x, null))
a[b & mask] = x;
} while (++b != t);
- U.storeFence();
+ VarHandle.releaseFence();
}
return a;
}
@@ -917,33 +826,12 @@
int b = base, s = top, al, i; ForkJoinTask<?>[] a;
if ((a = array) != null && b != s && (al = a.length) > 0) {
int index = (al - 1) & --s;
- long offset = ((long)index << ASHIFT) + ABASE;
ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getObject(a, offset);
+ QA.get(a, index);
if (t != null &&
- U.compareAndSwapObject(a, offset, t, null)) {
+ QA.compareAndSet(a, index, t, null)) {
top = s;
- return t;
- }
- }
- return null;
- }
-
- /**
- * Takes a task in FIFO order if b is base of queue and a task
- * can be claimed without contention. Specialized versions
- * appear in ForkJoinPool methods scan and helpStealer.
- */
- final ForkJoinTask<?> pollAt(int b) {
- ForkJoinTask<?>[] a; int al;
- if ((a = array) != null && (al = a.length) > 0) {
- int index = (al - 1) & b;
- long offset = ((long)index << ASHIFT) + ABASE;
- ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getObjectVolatile(a, offset);
- if (t != null && b++ == base &&
- U.compareAndSwapObject(a, offset, t, null)) {
- base = b;
+ VarHandle.releaseFence();
return t;
}
}
@@ -959,12 +847,11 @@
if ((a = array) != null && (d = b - s) < 0 &&
(al = a.length) > 0) {
int index = (al - 1) & b;
- long offset = ((long)index << ASHIFT) + ABASE;
ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getObjectVolatile(a, offset);
+ QA.getAcquire(a, index);
if (b++ == base) {
if (t != null) {
- if (U.compareAndSwapObject(a, offset, t, null)) {
+ if (QA.compareAndSet(a, index, t, null)) {
base = b;
return t;
}
@@ -983,7 +870,7 @@
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
- return (config < 0) ? poll() : pop();
+ return ((id & FIFO) != 0) ? poll() : pop();
}
/**
@@ -992,7 +879,8 @@
final ForkJoinTask<?> peek() {
int al; ForkJoinTask<?>[] a;
return ((a = array) != null && (al = a.length) > 0) ?
- a[(al - 1) & (config < 0 ? base : top - 1)] : null;
+ a[(al - 1) &
+ ((id & FIFO) != 0 ? base : top - 1)] : null;
}
/**
@@ -1002,9 +890,9 @@
int b = base, s = top, al; ForkJoinTask<?>[] a;
if ((a = array) != null && b != s && (al = a.length) > 0) {
int index = (al - 1) & --s;
- long offset = ((long)index << ASHIFT) + ABASE;
- if (U.compareAndSwapObject(a, offset, task, null)) {
+ if (QA.compareAndSet(a, index, task, null)) {
top = s;
+ VarHandle.releaseFence();
return true;
}
}
@@ -1012,105 +900,32 @@
}
/**
- * Shared version of push. Fails if already locked.
- *
- * @return status: > 0 locked, 0 possibly was empty, < 0 was nonempty
- */
- final int sharedPush(ForkJoinTask<?> task) {
- int stat;
- if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
- int b = base, s = top, al, d; ForkJoinTask<?>[] a;
- if ((a = array) != null && (al = a.length) > 0 &&
- al - 1 + (d = b - s) > 0) {
- a[(al - 1) & s] = task;
- top = s + 1; // relaxed writes OK here
- qlock = 0;
- stat = (d < 0 && b == base) ? d : 0;
- }
- else {
- growAndSharedPush(task);
- stat = 0;
- }
- }
- else
- stat = 1;
- return stat;
- }
-
- /**
- * Helper for sharedPush; called only when locked and resize
- * needed.
- */
- private void growAndSharedPush(ForkJoinTask<?> task) {
- try {
- growArray();
- int s = top, al; ForkJoinTask<?>[] a;
- if ((a = array) != null && (al = a.length) > 0) {
- a[(al - 1) & s] = task;
- top = s + 1;
- }
- } finally {
- qlock = 0;
- }
- }
-
- /**
- * Shared version of tryUnpush.
- */
- final boolean trySharedUnpush(ForkJoinTask<?> task) {
- boolean popped = false;
- int s = top - 1, al; ForkJoinTask<?>[] a;
- if ((a = array) != null && (al = a.length) > 0) {
- int index = (al - 1) & s;
- long offset = ((long)index << ASHIFT) + ABASE;
- ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
- if (t == task &&
- U.compareAndSwapInt(this, QLOCK, 0, 1)) {
- if (top == s + 1 && array == a &&
- U.compareAndSwapObject(a, offset, task, null)) {
- popped = true;
- top = s;
- }
- U.putIntRelease(this, QLOCK, 0);
- }
- }
- return popped;
- }
-
- /**
* Removes and cancels all known tasks, ignoring any exceptions.
*/
final void cancelAll() {
- ForkJoinTask<?> t;
- if ((t = currentJoin) != null) {
- currentJoin = null;
- ForkJoinTask.cancelIgnoringExceptions(t);
- }
- if ((t = currentSteal) != null) {
- currentSteal = null;
- ForkJoinTask.cancelIgnoringExceptions(t);
- }
- while ((t = poll()) != null)
+ for (ForkJoinTask<?> t; (t = poll()) != null; )
ForkJoinTask.cancelIgnoringExceptions(t);
}
// Specialized execution methods
/**
- * Pops and executes up to POLL_LIMIT tasks or until empty.
+ * Pops and executes up to limit consecutive tasks or until empty.
+ *
+ * @param limit max runs, or zero for no limit
*/
- final void localPopAndExec() {
- for (int nexec = 0;;) {
+ final void localPopAndExec(int limit) {
+ for (;;) {
int b = base, s = top, al; ForkJoinTask<?>[] a;
if ((a = array) != null && b != s && (al = a.length) > 0) {
int index = (al - 1) & --s;
- long offset = ((long)index << ASHIFT) + ABASE;
ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getAndSetObject(a, offset, null);
+ QA.getAndSet(a, index, null);
if (t != null) {
top = s;
- (currentSteal = t).doExec();
- if (++nexec > POLL_LIMIT)
+ VarHandle.releaseFence();
+ t.doExec();
+ if (limit != 0 && --limit == 0)
break;
}
else
@@ -1122,22 +937,28 @@
}
/**
- * Polls and executes up to POLL_LIMIT tasks or until empty.
+ * Polls and executes up to limit consecutive tasks or until empty.
+ *
+ * @param limit max runs, or zero for no limit
*/
- final void localPollAndExec() {
- for (int nexec = 0;;) {
- int b = base, s = top, al; ForkJoinTask<?>[] a;
- if ((a = array) != null && b != s && (al = a.length) > 0) {
+ final void localPollAndExec(int limit) {
+ for (int polls = 0;;) {
+ int b = base, s = top, d, al; ForkJoinTask<?>[] a;
+ if ((a = array) != null && (d = b - s) < 0 &&
+ (al = a.length) > 0) {
int index = (al - 1) & b++;
- long offset = ((long)index << ASHIFT) + ABASE;
ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getAndSetObject(a, offset, null);
+ QA.getAndSet(a, index, null);
if (t != null) {
base = b;
t.doExec();
- if (++nexec > POLL_LIMIT)
+ if (limit != 0 && ++polls == limit)
break;
}
+ else if (d == -1)
+ break; // now empty
+ else
+ polls = 0; // stolen; reset
}
else
break;
@@ -1145,188 +966,156 @@
}
/**
- * Executes the given task and (some) remaining local tasks.
+ * If present, removes task from queue and executes it.
*/
- final void runTask(ForkJoinTask<?> task) {
- if (task != null) {
- task.doExec();
- if (config < 0)
- localPollAndExec();
- else
- localPopAndExec();
- int ns = ++nsteals;
- ForkJoinWorkerThread thread = owner;
- currentSteal = null;
- if (ns < 0) // collect on overflow
- transferStealCount(pool);
- if (thread != null)
- thread.afterTopLevelExec();
- }
- }
-
- /**
- * Adds steal count to pool steal count if it exists, and resets.
- */
- final void transferStealCount(ForkJoinPool p) {
- AuxState aux;
- if (p != null && (aux = p.auxState) != null) {
- long s = nsteals;
- nsteals = 0; // if negative, correct for overflow
- if (s < 0) s = Integer.MAX_VALUE;
- aux.lock();
- try {
- aux.stealCount += s;
- } finally {
- aux.unlock();
+ final void tryRemoveAndExec(ForkJoinTask<?> task) {
+ ForkJoinTask<?>[] wa; int s, wal;
+ if (base - (s = top) < 0 && // traverse from top
+ (wa = array) != null && (wal = wa.length) > 0) {
+ for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
+ int index = i & m;
+ ForkJoinTask<?> t = (ForkJoinTask<?>)
+ QA.get(wa, index);
+ if (t == null)
+ break;
+ else if (t == task) {
+ if (QA.compareAndSet(wa, index, t, null)) {
+ top = ns; // safely shift down
+ for (int j = i; j != ns; ++j) {
+ ForkJoinTask<?> f;
+ int pindex = (j + 1) & m;
+ f = (ForkJoinTask<?>)QA.get(wa, pindex);
+ QA.setVolatile(wa, pindex, null);
+ int jindex = j & m;
+ QA.setRelease(wa, jindex, f);
+ }
+ VarHandle.releaseFence();
+ t.doExec();
+ }
+ break;
+ }
}
}
}
/**
- * If present, removes from queue and executes the given task,
- * or any other cancelled task. Used only by awaitJoin.
+ * Tries to steal and run tasks within the target's
+ * computation until done, not found, or limit exceeded.
*
- * @return true if queue empty and task not known to be done
+ * @param task root of CountedCompleter computation
+ * @param limit max runs, or zero for no limit
+ * @return task status on exit
*/
- final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
- if (task != null && task.status >= 0) {
- int b, s, d, al; ForkJoinTask<?>[] a;
- while ((d = (b = base) - (s = top)) < 0 &&
- (a = array) != null && (al = a.length) > 0) {
- for (;;) { // traverse from s to b
- int index = --s & (al - 1);
- long offset = (index << ASHIFT) + ABASE;
- ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getObjectVolatile(a, offset);
- if (t == null)
- break; // restart
- else if (t == task) {
- boolean removed = false;
- if (s + 1 == top) { // pop
- if (U.compareAndSwapObject(a, offset, t, null)) {
- top = s;
- removed = true;
+ final int localHelpCC(CountedCompleter<?> task, int limit) {
+ int status = 0;
+ if (task != null && (status = task.status) >= 0) {
+ for (;;) {
+ boolean help = false;
+ int b = base, s = top, al; ForkJoinTask<?>[] a;
+ if ((a = array) != null && b != s && (al = a.length) > 0) {
+ int index = (al - 1) & (s - 1);
+ ForkJoinTask<?> o = (ForkJoinTask<?>)
+ QA.get(a, index);
+ if (o instanceof CountedCompleter) {
+ CountedCompleter<?> t = (CountedCompleter<?>)o;
+ for (CountedCompleter<?> f = t;;) {
+ if (f != task) {
+ if ((f = f.completer) == null) // try parent
+ break;
+ }
+ else {
+ if (QA.compareAndSet(a, index, t, null)) {
+ top = s - 1;
+ VarHandle.releaseFence();
+ t.doExec();
+ help = true;
+ }
+ break;
}
}
- else if (base == b) // replace with proxy
- removed = U.compareAndSwapObject(a, offset, t,
- new EmptyTask());
- if (removed) {
- ForkJoinTask<?> ps = currentSteal;
- (currentSteal = task).doExec();
- currentSteal = ps;
- }
- break;
- }
- else if (t.status < 0 && s + 1 == top) {
- if (U.compareAndSwapObject(a, offset, t, null)) {
- top = s;
- }
- break; // was cancelled
- }
- else if (++d == 0) {
- if (base != b) // rescan
- break;
- return false;
}
}
- if (task.status < 0)
- return false;
+ if ((status = task.status) < 0 || !help ||
+ (limit != 0 && --limit == 0))
+ break;
}
}
- return true;
+ return status;
+ }
+
+ // Operations on shared queues
+
+ /**
+ * Tries to lock shared queue by CASing phase field.
+ */
+ final boolean tryLockSharedQueue() {
+ return PHASE.compareAndSet(this, 0, QLOCK);
}
/**
- * Pops task if in the same CC computation as the given task,
- * in either shared or owned mode. Used only by helpComplete.
+ * Shared version of tryUnpush.
*/
- final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
- int b = base, s = top, al; ForkJoinTask<?>[] a;
- if ((a = array) != null && b != s && (al = a.length) > 0) {
- int index = (al - 1) & (s - 1);
- long offset = ((long)index << ASHIFT) + ABASE;
- ForkJoinTask<?> o = (ForkJoinTask<?>)
- U.getObjectVolatile(a, offset);
- if (o instanceof CountedCompleter) {
- CountedCompleter<?> t = (CountedCompleter<?>)o;
- for (CountedCompleter<?> r = t;;) {
- if (r == task) {
- if ((mode & IS_OWNED) == 0) {
- boolean popped = false;
- if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
- if (top == s && array == a &&
- U.compareAndSwapObject(a, offset,
- t, null)) {
- popped = true;
- top = s - 1;
- }
- U.putIntRelease(this, QLOCK, 0);
- if (popped)
- return t;
- }
- }
- else if (U.compareAndSwapObject(a, offset,
- t, null)) {
- top = s - 1;
- return t;
- }
- break;
- }
- else if ((r = r.completer) == null) // try parent
- break;
+ final boolean trySharedUnpush(ForkJoinTask<?> task) {
+ boolean popped = false;
+ int s = top - 1, al; ForkJoinTask<?>[] a;
+ if ((a = array) != null && (al = a.length) > 0) {
+ int index = (al - 1) & s;
+ ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
+ if (t == task &&
+ PHASE.compareAndSet(this, 0, QLOCK)) {
+ if (top == s + 1 && array == a &&
+ QA.compareAndSet(a, index, task, null)) {
+ popped = true;
+ top = s;
}
+ PHASE.setRelease(this, 0);
}
}
- return null;
+ return popped;
}
/**
- * Steals and runs a task in the same CC computation as the
- * given task if one exists and can be taken without
- * contention. Otherwise returns a checksum/control value for
- * use by method helpComplete.
- *
- * @return 1 if successful, 2 if retryable (lost to another
- * stealer), -1 if non-empty but no matching task found, else
- * the base index, forced negative.
+ * Shared version of localHelpCC.
*/
- final int pollAndExecCC(CountedCompleter<?> task) {
- ForkJoinTask<?>[] a;
- int b = base, s = top, al, h;
- if ((a = array) != null && b != s && (al = a.length) > 0) {
- int index = (al - 1) & b;
- long offset = ((long)index << ASHIFT) + ABASE;
- ForkJoinTask<?> o = (ForkJoinTask<?>)
- U.getObjectVolatile(a, offset);
- if (o == null)
- h = 2; // retryable
- else if (!(o instanceof CountedCompleter))
- h = -1; // unmatchable
- else {
- CountedCompleter<?> t = (CountedCompleter<?>)o;
- for (CountedCompleter<?> r = t;;) {
- if (r == task) {
- if (b++ == base &&
- U.compareAndSwapObject(a, offset, t, null)) {
- base = b;
- t.doExec();
- h = 1; // success
+ final int sharedHelpCC(CountedCompleter<?> task, int limit) {
+ int status = 0;
+ if (task != null && (status = task.status) >= 0) {
+ for (;;) {
+ boolean help = false;
+ int b = base, s = top, al; ForkJoinTask<?>[] a;
+ if ((a = array) != null && b != s && (al = a.length) > 0) {
+ int index = (al - 1) & (s - 1);
+ ForkJoinTask<?> o = (ForkJoinTask<?>)
+ QA.get(a, index);
+ if (o instanceof CountedCompleter) {
+ CountedCompleter<?> t = (CountedCompleter<?>)o;
+ for (CountedCompleter<?> f = t;;) {
+ if (f != task) {
+ if ((f = f.completer) == null)
+ break;
+ }
+ else {
+ if (PHASE.compareAndSet(this, 0, QLOCK)) {
+ if (top == s && array == a &&
+ QA.compareAndSet(a, index, t, null)) {
+ help = true;
+ top = s - 1;
+ }
+ PHASE.setRelease(this, 0);
+ if (help)
+ t.doExec();
+ }
+ break;
+ }
}
- else
- h = 2; // lost CAS
- break;
- }
- else if ((r = r.completer) == null) {
- h = -1; // unmatched
- break;
}
}
+ if ((status = task.status) < 0 || !help ||
+ (limit != 0 && --limit == 0))
+ break;
}
}
- else
- h = b | Integer.MIN_VALUE; // to sense movement on re-poll
- return h;
+ return status;
}
/**
@@ -1334,27 +1123,18 @@
*/
final boolean isApparentlyUnblocked() {
Thread wt; Thread.State s;
- return (scanState >= 0 &&
- (wt = owner) != null &&
+ return ((wt = owner) != null &&
(s = wt.getState()) != Thread.State.BLOCKED &&
s != Thread.State.WAITING &&
s != Thread.State.TIMED_WAITING);
}
- // Unsafe mechanics. Note that some are (and must be) the same as in FJP
- private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
- private static final long QLOCK;
- private static final int ABASE;
- private static final int ASHIFT;
+ // VarHandle mechanics.
+ private static final VarHandle PHASE;
static {
try {
- QLOCK = U.objectFieldOffset
- (WorkQueue.class.getDeclaredField("qlock"));
- ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
- int scale = U.arrayIndexScale(ForkJoinTask[].class);
- if ((scale & (scale - 1)) != 0)
- throw new Error("array index scale not a power of two");
- ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
+ MethodHandles.Lookup l = MethodHandles.lookup();
+ PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
@@ -1372,7 +1152,7 @@
/**
* Permission required for callers of methods that may start or
- * kill threads. Also used as a static lock in tryInitialize.
+ * kill threads.
*/
static final RuntimePermission modifyThreadPermission;
@@ -1413,18 +1193,15 @@
// static configuration constants
/**
- * Initial timeout value (in milliseconds) for the thread
- * triggering quiescence to park waiting for new work. On timeout,
- * the thread will instead try to shrink the number of workers.
- * The value should be large enough to avoid overly aggressive
- * shrinkage during most transient stalls (long GCs etc).
+ * Default idle timeout value (in milliseconds) for the thread
+ * triggering quiescence to park waiting for new work.
*/
- private static final long IDLE_TIMEOUT_MS = 2000L; // 2sec
+ private static final long DEFAULT_KEEPALIVE = 60000L;
/**
- * Tolerance for idle timeouts, to cope with timer undershoots.
+ * Undershoot tolerance for idle timeouts
*/
- private static final long TIMEOUT_SLOP_MS = 20L; // 20ms
+ private static final long TIMEOUT_SLOP = 20L;
/**
* The default value for COMMON_MAX_SPARES. Overridable using the
@@ -1444,7 +1221,7 @@
/*
* Bits and masks for field ctl, packed with 4 16 bit subfields:
- * AC: Number of active running workers minus target parallelism
+ * RC: Number of released (unqueued) workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
@@ -1453,26 +1230,30 @@
* (including version bits) as sp=(int)ctl. The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
- * fields: When ac is negative, there are not enough active
+ * fields: When rc is negative, there are not enough unqueued
* workers, when tc is negative, there are not enough total
* workers. When sp is non-zero, there are waiting workers. To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
*
- * Because it occupies uppermost bits, we can add one active count
- * using getAndAddLong of AC_UNIT, rather than CAS, when returning
+ * Because it occupies uppermost bits, we can add one release count
+ * using getAndAddLong of RC_UNIT, rather than CAS, when returning
* from a blocked join. Other updates entail multiple subfields
* and masking, requiring CAS.
+ *
+ * The limits packed in field "bounds" are also offset by the
+ * parallelism level to make them comparable to the ctl rc and tc
+ * fields.
*/
// Lower and upper word masks
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
- // Active counts
- private static final int AC_SHIFT = 48;
- private static final long AC_UNIT = 0x0001L << AC_SHIFT;
- private static final long AC_MASK = 0xffffL << AC_SHIFT;
+ // Release counts
+ private static final int RC_SHIFT = 48;
+ private static final long RC_UNIT = 0x0001L << RC_SHIFT;
+ private static final long RC_MASK = 0xffffL << RC_SHIFT;
// Total counts
private static final int TC_SHIFT = 32;
@@ -1480,52 +1261,21 @@
private static final long TC_MASK = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
- // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
- private static final int STARTED = 1;
- private static final int STOP = 1 << 1;
- private static final int TERMINATED = 1 << 2;
- private static final int SHUTDOWN = 1 << 31;
+ // Instance fields
- // Instance fields
- volatile long ctl; // main pool control
- volatile int runState;
- final int config; // parallelism, mode
- AuxState auxState; // lock, steal counts
- volatile WorkQueue[] workQueues; // main registry
- final String workerNamePrefix; // to create worker name string
+ volatile long stealCount; // collects worker nsteals
+ final long keepAlive; // milliseconds before dropping if idle
+ int indexSeed; // next worker index
+ final int bounds; // min, max threads packed as shorts
+ volatile int mode; // parallelism, runstate, queue mode
+ WorkQueue[] workQueues; // main registry
+ final String workerNamePrefix; // for worker thread string; sync lock
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
+ final Predicate<? super ForkJoinPool> saturate;
- /**
- * Instantiates fields upon first submission, or upon shutdown if
- * no submissions. If checkTermination true, also responds to
- * termination by external calls submitting tasks.
- */
- private void tryInitialize(boolean checkTermination) {
- if (runState == 0) { // bootstrap by locking static field
- int p = config & SMASK;
- int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
- n |= n >>> 1; // create workQueues array with size a power of two
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
- n = ((n + 1) << 1) & SMASK;
- AuxState aux = new AuxState();
- WorkQueue[] ws = new WorkQueue[n];
- synchronized (modifyThreadPermission) { // double-check
- if (runState == 0) {
- workQueues = ws;
- auxState = aux;
- runState = STARTED;
- }
- }
- }
- if (checkTermination && runState < 0) {
- tryTerminate(false, false); // help terminate
- throw new RejectedExecutionException();
- }
- }
+ @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
+ volatile long ctl; // main pool control
// Creating, registering and deregistering workers
@@ -1534,18 +1284,14 @@
* count has already been incremented as a reservation. Invokes
* deregisterWorker on any failure.
*
- * @param isSpare true if this is a spare thread
* @return true if successful
*/
- private boolean createWorker(boolean isSpare) {
+ private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
- WorkQueue q;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
- if (isSpare && (q = wt.workQueue) != null)
- q.config |= SPARE_WORKER;
wt.start();
return true;
}
@@ -1566,10 +1312,10 @@
*/
private void tryAddWorker(long c) {
do {
- long nc = ((AC_MASK & (c + AC_UNIT)) |
+ long nc = ((RC_MASK & (c + RC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
- if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
- createWorker(false);
+ if (ctl == c && CTL.compareAndSet(this, c, nc)) {
+ createWorker();
break;
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
@@ -1584,41 +1330,57 @@
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
- AuxState aux;
- wt.setDaemon(true); // configure thread
+ wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
- int i = 0; // assign a pool index
- int mode = config & MODE_MASK;
- if ((aux = auxState) != null) {
- aux.lock();
- try {
- int s = (int)(aux.indexSeed += SEED_INCREMENT), n, m;
- WorkQueue[] ws = workQueues;
- if (ws != null && (n = ws.length) > 0) {
- i = (m = n - 1) & ((s << 1) | 1); // odd-numbered indices
- if (ws[i] != null) { // collision
- int probes = 0; // step by approx half n
- int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
- while (ws[i = (i + step) & m] != null) {
- if (++probes >= n) {
- workQueues = ws = Arrays.copyOf(ws, n <<= 1);
- m = n - 1;
- probes = 0;
- }
+ int tid = 0; // for thread name
+ int fifo = mode & FIFO;
+ String prefix = workerNamePrefix;
+ if (prefix != null) {
+ synchronized (prefix) {
+ WorkQueue[] ws = workQueues; int n;
+ int s = indexSeed += SEED_INCREMENT;
+ if (ws != null && (n = ws.length) > 1) {
+ int m = n - 1;
+ tid = s & m;
+ int i = m & ((s << 1) | 1); // odd-numbered indices
+ for (int probes = n >>> 1;;) { // find empty slot
+ WorkQueue q;
+ if ((q = ws[i]) == null || q.phase == QUIET)
+ break;
+ else if (--probes == 0) {
+ i = n | 1; // resize below
+ break;
}
+ else
+ i = (i + 2) & m;
}
- w.hint = s; // use as random seed
- w.config = i | mode;
- w.scanState = i | (s & 0x7fff0000); // random seq bits
- ws[i] = w;
+
+ int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
+ w.phase = w.id = id; // now publishable
+
+ if (i < n)
+ ws[i] = w;
+ else { // expand array
+ int an = n << 1;
+ WorkQueue[] as = new WorkQueue[an];
+ as[i] = w;
+ int am = an - 1;
+ for (int j = 0; j < n; ++j) {
+ WorkQueue v; // copy external queue
+ if ((v = ws[j]) != null) // position may change
+ as[v.id & am & SQMASK] = v;
+ if (++j >= n)
+ break;
+ as[j] = ws[j]; // copy worker
+ }
+ workQueues = as;
+ }
}
- } finally {
- aux.unlock();
}
+ wt.setName(prefix.concat(Integer.toString(tid)));
}
- wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
@@ -1633,64 +1395,48 @@
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
+ int phase = 0;
if (wt != null && (w = wt.workQueue) != null) {
- AuxState aux; WorkQueue[] ws; // remove index from array
- int idx = w.config & SMASK;
- int ns = w.nsteals;
- if ((aux = auxState) != null) {
- aux.lock();
- try {
+ Object lock = workerNamePrefix;
+ long ns = (long)w.nsteals & 0xffffffffL;
+ int idx = w.id & SMASK;
+ if (lock != null) {
+ WorkQueue[] ws; // remove index from array
+ synchronized (lock) {
if ((ws = workQueues) != null && ws.length > idx &&
ws[idx] == w)
ws[idx] = null;
- aux.stealCount += ns;
- } finally {
- aux.unlock();
+ stealCount += ns;
}
}
- }
- if (w == null || (w.config & UNREGISTERED) == 0) { // else pre-adjusted
- long c; // decrement counts
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
- (TC_MASK & (c - TC_UNIT)) |
- (SP_MASK & c))));
- }
- if (w != null) {
- w.currentSteal = null;
- w.qlock = -1; // ensure set
- w.cancelAll(); // cancel remaining tasks
+ phase = w.phase;
}
- while (tryTerminate(false, false) >= 0) { // possibly replace
- WorkQueue[] ws; int wl, sp; long c;
- if (w == null || w.array == null ||
- (ws = workQueues) == null || (wl = ws.length) <= 0)
- break;
- else if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
- if (tryRelease(c, ws[(wl - 1) & sp], AC_UNIT))
- break;
- }
- else if (ex != null && (c & ADD_WORKER) != 0L) {
- tryAddWorker(c); // create replacement
- break;
- }
- else // don't need replacement
- break;
+ if (phase != QUIET) { // else pre-adjusted
+ long c; // decrement counts
+ do {} while (!CTL.weakCompareAndSetVolatile
+ (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
+ (TC_MASK & (c - TC_UNIT)) |
+ (SP_MASK & c))));
}
+ if (w != null)
+ w.cancelAll(); // cancel remaining tasks
+
+ if (!tryTerminate(false, false) && // possibly replace worker
+ w != null && w.array != null) // avoid repeated failures
+ signalWork();
+
if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.rethrow(ex);
}
- // Signalling
-
/**
- * Tries to create or activate a worker if too few are active.
+ * Tries to create or release a worker if too few are running.
*/
final void signalWork() {
for (;;) {
- long c; int sp, i; WorkQueue v; WorkQueue[] ws;
+ long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
if ((c = ctl) >= 0L) // enough workers
break;
else if ((sp = (int)c) == 0) { // no idle workers
@@ -1705,12 +1451,14 @@
else if ((v = ws[i]) == null)
break; // terminating
else {
- int ns = sp & ~UNSIGNALLED;
- int vs = v.scanState;
- long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT));
- if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) {
- v.scanState = ns;
- LockSupport.unpark(v.parker);
+ int np = sp & ~UNSIGNALLED;
+ int vp = v.phase;
+ long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
+ Thread vt = v.owner;
+ if (sp == vp && CTL.compareAndSet(this, c, nc)) {
+ v.phase = np;
+ if (v.source < 0)
+ LockSupport.unpark(vt);
break;
}
}
@@ -1718,442 +1466,183 @@
}
/**
- * Signals and releases worker v if it is top of idle worker
- * stack. This performs a one-shot version of signalWork only if
- * there is (apparently) at least one idle worker.
+ * Tries to decrement counts (sometimes implicitly) and possibly
+ * arrange for a compensating worker in preparation for blocking:
+ * If not all core workers yet exist, creates one, else if any are
+ * unreleased (possibly including caller) releases one, else if
+     * fewer than the minimum allowed number of workers are running,
+ * checks to see that they are all active, and if so creates an
+ * extra worker unless over maximum limit and policy is to
+ * saturate. Most of these steps can fail due to interference, in
+ * which case 0 is returned so caller will retry. A negative
+ * return value indicates that the caller doesn't need to
+ * re-adjust counts when later unblocked.
*
- * @param c incoming ctl value
- * @param v if non-null, a worker
- * @param inc the increment to active count (zero when compensating)
- * @return true if successful
- */
- private boolean tryRelease(long c, WorkQueue v, long inc) {
- int sp = (int)c, ns = sp & ~UNSIGNALLED;
- if (v != null) {
- int vs = v.scanState;
- long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + inc));
- if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) {
- v.scanState = ns;
- LockSupport.unpark(v.parker);
- return true;
- }
- }
- return false;
- }
-
- /**
- * With approx probability of a missed signal, tries (once) to
- * reactivate worker w (or some other worker), failing if stale or
- * known to be already active.
- *
- * @param w the worker
- * @param ws the workQueue array to use
- * @param r random seed
- */
- private void tryReactivate(WorkQueue w, WorkQueue[] ws, int r) {
- long c; int sp, wl; WorkQueue v;
- if ((sp = (int)(c = ctl)) != 0 && w != null &&
- ws != null && (wl = ws.length) > 0 &&
- ((sp ^ r) & SS_SEQ) == 0 &&
- (v = ws[(wl - 1) & sp]) != null) {
- long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT));
- int ns = sp & ~UNSIGNALLED;
- if (w.scanState < 0 &&
- v.scanState == sp &&
- U.compareAndSwapLong(this, CTL, c, nc)) {
- v.scanState = ns;
- LockSupport.unpark(v.parker);
- }
- }
- }
-
- /**
- * If worker w exists and is active, enqueues and sets status to inactive.
- *
- * @param w the worker
- * @param ss current (non-negative) scanState
+     * @return 1: block then adjust, -1: block without adjust, 0: retry
*/
- private void inactivate(WorkQueue w, int ss) {
- int ns = (ss + SS_SEQ) | UNSIGNALLED;
- long lc = ns & SP_MASK, nc, c;
- if (w != null) {
- w.scanState = ns;
- do {
- nc = lc | (UC_MASK & ((c = ctl) - AC_UNIT));
- w.stackPred = (int)c;
- } while (!U.compareAndSwapLong(this, CTL, c, nc));
- }
- }
-
- /**
- * Possibly blocks worker w waiting for signal, or returns
- * negative status if the worker should terminate. May return
- * without status change if multiple stale unparks and/or
- * interrupts occur.
- *
- * @param w the calling worker
- * @return negative if w should terminate
- */
- private int awaitWork(WorkQueue w) {
- int stat = 0;
- if (w != null && w.scanState < 0) {
- long c = ctl;
- if ((int)(c >> AC_SHIFT) + (config & SMASK) <= 0)
- stat = timedAwaitWork(w, c); // possibly quiescent
- else if ((runState & STOP) != 0)
- stat = w.qlock = -1; // pool terminating
- else if (w.scanState < 0) {
- w.parker = Thread.currentThread();
- if (w.scanState < 0) // recheck after write
- LockSupport.park(this);
- w.parker = null;
- if ((runState & STOP) != 0)
- stat = w.qlock = -1; // recheck
- else if (w.scanState < 0)
- Thread.interrupted(); // clear status
+ private int tryCompensate(WorkQueue w) {
+ int t, n, sp;
+ long c = ctl;
+ WorkQueue[] ws = workQueues;
+ if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
+ if (ws == null || (n = ws.length) <= 0 || w == null)
+ return 0; // disabled
+ else if ((sp = (int)c) != 0) { // replace or release
+ WorkQueue v = ws[sp & (n - 1)];
+ int wp = w.phase;
+ long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
+ int np = sp & ~UNSIGNALLED;
+ if (v != null) {
+ int vp = v.phase;
+ Thread vt = v.owner;
+ long nc = ((long)v.stackPred & SP_MASK) | uc;
+ if (vp == sp && CTL.compareAndSet(this, c, nc)) {
+ v.phase = np;
+ if (v.source < 0)
+ LockSupport.unpark(vt);
+ return (wp < 0) ? -1 : 1;
+ }
+ }
+ return 0;
+ }
+ else if ((int)(c >> RC_SHIFT) - // reduce parallelism
+ (short)(bounds & SMASK) > 0) {
+ long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
+ return CTL.compareAndSet(this, c, nc) ? 1 : 0;
}
- }
- return stat;
- }
-
- /**
- * Possibly triggers shutdown and tries (once) to block worker
- * when pool is (or may be) quiescent. Waits up to a duration
- * determined by number of workers. On timeout, if ctl has not
- * changed, terminates the worker, which will in turn wake up
- * another worker to possibly repeat this process.
- *
- * @param w the calling worker
- * @return negative if w should terminate
- */
- private int timedAwaitWork(WorkQueue w, long c) {
- int stat = 0;
- int scale = 1 - (short)(c >>> TC_SHIFT);
- long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS +
- System.currentTimeMillis());
- if ((runState >= 0 || (stat = tryTerminate(false, false)) > 0) &&
- w != null && w.scanState < 0) {
- int ss; AuxState aux;
- w.parker = Thread.currentThread();
- if (w.scanState < 0)
- LockSupport.parkUntil(this, deadline);
- w.parker = null;
- if ((runState & STOP) != 0)
- stat = w.qlock = -1; // pool terminating
- else if ((ss = w.scanState) < 0 && !Thread.interrupted() &&
- (int)c == ss && (aux = auxState) != null && ctl == c &&
- deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) {
- aux.lock();
- try { // pre-deregister
- WorkQueue[] ws;
- int cfg = w.config, idx = cfg & SMASK;
- long nc = ((UC_MASK & (c - TC_UNIT)) |
- (SP_MASK & w.stackPred));
- if ((runState & STOP) == 0 &&
- (ws = workQueues) != null &&
- idx < ws.length && idx >= 0 && ws[idx] == w &&
- U.compareAndSwapLong(this, CTL, c, nc)) {
- ws[idx] = null;
- w.config = cfg | UNREGISTERED;
- stat = w.qlock = -1;
+ else { // validate
+ int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
+ boolean unstable = false;
+ for (int i = 1; i < n; i += 2) {
+ WorkQueue q; Thread wt; Thread.State ts;
+ if ((q = ws[i]) != null) {
+ if (q.source == 0) {
+ unstable = true;
+ break;
+ }
+ else {
+ --tc;
+ if ((wt = q.owner) != null &&
+ ((ts = wt.getState()) == Thread.State.BLOCKED ||
+ ts == Thread.State.WAITING))
+ ++bc; // worker is blocking
+ }
}
- } finally {
- aux.unlock();
+ }
+ if (unstable || tc != 0 || ctl != c)
+ return 0; // inconsistent
+ else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
+ Predicate<? super ForkJoinPool> sat;
+ if ((sat = saturate) != null && sat.test(this))
+ return -1;
+ else if (bc < pc) { // lagging
+ Thread.yield(); // for retry spins
+ return 0;
+ }
+ else
+ throw new RejectedExecutionException(
+ "Thread limit exceeded replacing blocked worker");
}
}
}
- return stat;
- }
- /**
- * If the given worker is a spare with no queued tasks, and there
- * are enough existing workers, drops it from ctl counts and sets
- * its state to terminated.
- *
- * @param w the calling worker -- must be a spare
- * @return true if dropped (in which case it must not process more tasks)
- */
- private boolean tryDropSpare(WorkQueue w) {
- if (w != null && w.isEmpty()) { // no local tasks
- long c; int sp, wl; WorkQueue[] ws; WorkQueue v;
- while ((short)((c = ctl) >> TC_SHIFT) > 0 &&
- ((sp = (int)c) != 0 || (int)(c >> AC_SHIFT) > 0) &&
- (ws = workQueues) != null && (wl = ws.length) > 0) {
- boolean dropped, canDrop;
- if (sp == 0) { // no queued workers
- long nc = ((AC_MASK & (c - AC_UNIT)) |
- (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c));
- dropped = U.compareAndSwapLong(this, CTL, c, nc);
- }
- else if (
- (v = ws[(wl - 1) & sp]) == null || v.scanState != sp)
- dropped = false; // stale; retry
- else {
- long nc = v.stackPred & SP_MASK;
- if (w == v || w.scanState >= 0) {
- canDrop = true; // w unqueued or topmost
- nc |= ((AC_MASK & c) | // ensure replacement
- (TC_MASK & (c - TC_UNIT)));
- }
- else { // w may be queued
- canDrop = false; // help uncover
- nc |= ((AC_MASK & (c + AC_UNIT)) |
- (TC_MASK & c));
- }
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- v.scanState = sp & ~UNSIGNALLED;
- LockSupport.unpark(v.parker);
- dropped = canDrop;
- }
- else
- dropped = false;
- }
- if (dropped) { // pre-deregister
- int cfg = w.config, idx = cfg & SMASK;
- if (idx >= 0 && idx < ws.length && ws[idx] == w)
- ws[idx] = null;
- w.config = cfg | UNREGISTERED;
- w.qlock = -1;
- return true;
- }
- }
- }
- return false;
+ long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
+ return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
}
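
The decision ladder described in the javadoc above can be restated compactly.
The following self-contained sketch ignores CAS races, queue validation, and
the RejectedExecutionException path, and all names are illustrative; it only
mirrors the 1 / -1 / 0 return convention of tryCompensate.

    final class CompensateSketch {
        //  1: block, then re-add RC_UNIT when unblocked
        // -1: block, no later count adjustment needed
        //  0: interference or instability; caller should retry
        static int decide(int tc, int sp, int rc, int minRunnable,
                          boolean callerUnreleased, boolean atThreadLimit,
                          boolean saturateAllowed) {
            if (tc < 0)                          // not all core workers exist yet
                return 1;                        // create one, then block
            if (sp != 0)                         // an unreleased worker exists: release it
                return callerUnreleased ? -1 : 1;
            if (rc > minRunnable)                // enough workers remain runnable
                return 1;                        // just drop the release count
            if (atThreadLimit)                   // cannot add a compensating worker
                return saturateAllowed ? -1 : 0; // the real code may instead throw
            return 1;                            // expand the pool with an extra worker
        }
    }
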
/**
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
+ * See above for explanation.
*/
final void runWorker(WorkQueue w) {
+ WorkQueue[] ws;
w.growArray(); // allocate queue
- int bound = (w.config & SPARE_WORKER) != 0 ? 0 : POLL_LIMIT;
- long seed = w.hint * 0xdaba0b6eb09322e3L; // initial random seed
- if ((runState & STOP) == 0) {
- for (long r = (seed == 0L) ? 1L : seed;;) { // ensure nonzero
- if (bound == 0 && tryDropSpare(w))
- break;
- // high bits of prev seed for step; current low bits for idx
- int step = (int)(r >>> 48) | 1;
- r ^= r >>> 12; r ^= r << 25; r ^= r >>> 27; // xorshift
- if (scan(w, bound, step, (int)r) < 0 && awaitWork(w) < 0)
- break;
- }
- }
- }
-
- // Scanning for tasks
-
- /**
- * Repeatedly scans for and tries to steal and execute (via
- * workQueue.runTask) a queued task. Each scan traverses queues in
- * pseudorandom permutation. Upon finding a non-empty queue, makes
- * at most the given bound attempts to re-poll (fewer if
- * contended) on the same queue before returning (impossible
- * scanState value) 0 to restart scan. Else returns after at least
- * 1 and at most 32 full scans.
- *
- * @param w the worker (via its WorkQueue)
- * @param bound repoll bound as bitmask (0 if spare)
- * @param step (circular) index increment per iteration (must be odd)
- * @param r a random seed for origin index
- * @return negative if should await signal
- */
- private int scan(WorkQueue w, int bound, int step, int r) {
- int stat = 0, wl; WorkQueue[] ws;
- if ((ws = workQueues) != null && w != null && (wl = ws.length) > 0) {
- for (int m = wl - 1,
- origin = m & r, idx = origin,
- npolls = 0,
- ss = w.scanState;;) { // negative if inactive
- WorkQueue q; ForkJoinTask<?>[] a; int b, al;
- if ((q = ws[idx]) != null && (b = q.base) - q.top < 0 &&
+ int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
+ if (r == 0) // initial nonzero seed
+ r = 1;
+ int lastSignalId = 0; // avoid unneeded signals
+ while ((ws = workQueues) != null) {
+ boolean nonempty = false; // scan
+ for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
+ WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
+ if ((i = r & m) >= 0 && i < n && // always true
+ (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
(a = q.array) != null && (al = a.length) > 0) {
+ int qid = q.id; // (never zero)
int index = (al - 1) & b;
- long offset = ((long)index << ASHIFT) + ABASE;
ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getObjectVolatile(a, offset);
- if (t == null)
- break; // empty or busy
- else if (b++ != q.base)
- break; // busy
- else if (ss < 0) {
- tryReactivate(w, ws, r);
- break; // retry upon rescan
- }
- else if (!U.compareAndSwapObject(a, offset, t, null))
- break; // contended
- else {
- q.base = b;
- w.currentSteal = t;
- if (b != q.top) // propagate signal
- signalWork();
- w.runTask(t);
- if (++npolls > bound)
- break;
+ QA.getAcquire(a, index);
+ if (t != null && b++ == q.base &&
+ QA.compareAndSet(a, index, t, null)) {
+ if ((q.base = b) - q.top < 0 && qid != lastSignalId)
+ signalWork(); // propagate signal
+ w.source = lastSignalId = qid;
+ t.doExec();
+ if ((w.id & FIFO) != 0) // run remaining locals
+ w.localPollAndExec(POLL_LIMIT);
+ else
+ w.localPopAndExec(POLL_LIMIT);
+ ForkJoinWorkerThread thread = w.owner;
+ ++w.nsteals;
+ w.source = 0; // now idle
+ if (thread != null)
+ thread.afterTopLevelExec();
}
+ nonempty = true;
}
- else if (npolls != 0) // rescan
+ else if (nonempty)
break;
- else if ((idx = (idx + step) & m) == origin) {
- if (ss < 0) { // await signal
- stat = ss;
- break;
- }
- else if (r >= 0) {
- inactivate(w, ss);
- break;
- }
- else
- r <<= 1; // at most 31 rescans
- }
+ else
+ ++r;
}
- }
- return stat;
- }
-
- // Joining tasks
- /**
- * Tries to steal and run tasks within the target's computation.
- * Uses a variant of the top-level algorithm, restricted to tasks
- * with the given task as ancestor: It prefers taking and running
- * eligible tasks popped from the worker's own queue (via
- * popCC). Otherwise it scans others, randomly moving on
- * contention or execution, deciding to give up based on a
- * checksum (via return codes from pollAndExecCC). The maxTasks
- * argument supports external usages; internal calls use zero,
- * allowing unbounded steps (external calls trap non-positive
- * values).
- *
- * @param w caller
- * @param maxTasks if non-zero, the maximum number of other tasks to run
- * @return task status on exit
- */
- final int helpComplete(WorkQueue w, CountedCompleter<?> task,
- int maxTasks) {
- WorkQueue[] ws; int s = 0, wl;
- if ((ws = workQueues) != null && (wl = ws.length) > 1 &&
- task != null && w != null) {
- for (int m = wl - 1,
- mode = w.config,
- r = ~mode, // scanning seed
- origin = r & m, k = origin, // first queue to scan
- step = 3, // first scan step
- h = 1, // 1:ran, >1:contended, <0:hash
- oldSum = 0, checkSum = 0;;) {
- CountedCompleter<?> p; WorkQueue q; int i;
- if ((s = task.status) < 0)
- break;
- if (h == 1 && (p = w.popCC(task, mode)) != null) {
- p.doExec(); // run local task
- if (maxTasks != 0 && --maxTasks == 0)
- break;
- origin = k; // reset
- oldSum = checkSum = 0;
- }
- else { // poll other worker queues
- if ((i = k | 1) < 0 || i > m || (q = ws[i]) == null)
- h = 0;
- else if ((h = q.pollAndExecCC(task)) < 0)
- checkSum += h;
- if (h > 0) {
- if (h == 1 && maxTasks != 0 && --maxTasks == 0)
- break;
- step = (r >>> 16) | 3;
- r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
- k = origin = r & m; // move and restart
- oldSum = checkSum = 0;
- }
- else if ((k = (k + step) & m) == origin) {
- if (oldSum == (oldSum = checkSum))
- break;
- checkSum = 0;
- }
- }
+ if (nonempty) { // move (xorshift)
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
}
- }
- return s;
- }
-
- /**
- * Tries to locate and execute tasks for a stealer of the given
- * task, or in turn one of its stealers. Traces currentSteal ->
- * currentJoin links looking for a thread working on a descendant
- * of the given task and with a non-empty queue to steal back and
- * execute tasks from. The first call to this method upon a
- * waiting join will often entail scanning/search, (which is OK
- * because the joiner has nothing better to do), but this method
- * leaves hints in workers to speed up subsequent calls.
- *
- * @param w caller
- * @param task the task to join
- */
- private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
- if (task != null && w != null) {
- ForkJoinTask<?> ps = w.currentSteal;
- WorkQueue[] ws; int wl, oldSum = 0;
- outer: while (w.tryRemoveAndExec(task) && task.status >= 0 &&
- (ws = workQueues) != null && (wl = ws.length) > 0) {
- ForkJoinTask<?> subtask;
- int m = wl - 1, checkSum = 0; // for stability check
- WorkQueue j = w, v; // v is subtask stealer
- descent: for (subtask = task; subtask.status >= 0; ) {
- for (int h = j.hint | 1, k = 0, i;;) {
- if ((v = ws[i = (h + (k << 1)) & m]) != null) {
- if (v.currentSteal == subtask) {
- j.hint = i;
- break;
- }
- checkSum += v.base;
+ else {
+ int phase;
+ lastSignalId = 0; // clear for next scan
+ if ((phase = w.phase) >= 0) { // enqueue
+ int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
+ long c, nc;
+ do {
+ w.stackPred = (int)(c = ctl);
+ nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
+ } while (!CTL.weakCompareAndSetVolatile(this, c, nc));
+ }
+ else { // already queued
+ int pred = w.stackPred;
+ w.source = DORMANT; // enable signal
+ for (int steps = 0;;) {
+ int md, rc; long c;
+ if (w.phase >= 0) {
+ w.source = 0;
+ break;
}
- if (++k > m) // can't find stealer
- break outer;
- }
-
- for (;;) { // help v or descend
- ForkJoinTask<?>[] a; int b, al;
- if (subtask.status < 0) // too late to help
- break descent;
- checkSum += (b = v.base);
- ForkJoinTask<?> next = v.currentJoin;
- ForkJoinTask<?> t = null;
- if ((a = v.array) != null && (al = a.length) > 0) {
- int index = (al - 1) & b;
- long offset = ((long)index << ASHIFT) + ABASE;
- t = (ForkJoinTask<?>)
- U.getObjectVolatile(a, offset);
- if (t != null && b++ == v.base) {
- if (j.currentJoin != subtask ||
- v.currentSteal != subtask ||
- subtask.status < 0)
- break descent; // stale
- if (U.compareAndSwapObject(a, offset, t, null)) {
- v.base = b;
- w.currentSteal = t;
- for (int top = w.top;;) {
- t.doExec(); // help
- w.currentSteal = ps;
- if (task.status < 0)
- break outer;
- if (w.top == top)
- break; // run local tasks
- if ((t = w.pop()) == null)
- break descent;
- w.currentSteal = t;
- }
+ else if ((md = mode) < 0) // shutting down
+ return;
+ else if ((rc = ((md & SMASK) + // possibly quiescent
+ (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
+ (md & SHUTDOWN) != 0 &&
+ tryTerminate(false, false))
+ return; // help terminate
+ else if ((++steps & 1) == 0)
+ Thread.interrupted(); // clear between parks
+ else if (rc <= 0 && pred != 0 && phase == (int)c) {
+ long d = keepAlive + System.currentTimeMillis();
+ LockSupport.parkUntil(this, d);
+ if (ctl == c &&
+ d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
+ long nc = ((UC_MASK & (c - TC_UNIT)) |
+ (SP_MASK & pred));
+ if (CTL.compareAndSet(this, c, nc)) {
+ w.phase = QUIET;
+ return; // drop on timeout
}
}
}
- if (t == null && b == v.base && b - v.top >= 0) {
- if ((subtask = next) == null) { // try to descend
- if (next == v.currentJoin &&
- oldSum == (oldSum = checkSum))
- break outer;
- break descent;
- }
- j = v;
- break;
- }
+ else
+ LockSupport.park(this);
}
}
}
@@ -2161,59 +1650,10 @@
}
/**
- * Tries to decrement active count (sometimes implicitly) and
- * possibly release or create a compensating worker in preparation
- * for blocking. Returns false (retryable by caller), on
- * contention, detected staleness, instability, or termination.
- *
- * @param w caller
- */
- private boolean tryCompensate(WorkQueue w) {
- boolean canBlock; int wl;
- long c = ctl;
- WorkQueue[] ws = workQueues;
- int pc = config & SMASK;
- int ac = pc + (int)(c >> AC_SHIFT);
- int tc = pc + (short)(c >> TC_SHIFT);
- if (w == null || w.qlock < 0 || pc == 0 || // terminating or disabled
- ws == null || (wl = ws.length) <= 0)
- canBlock = false;
- else {
- int m = wl - 1, sp;
- boolean busy = true; // validate ac
- for (int i = 0; i <= m; ++i) {
- int k; WorkQueue v;
- if ((k = (i << 1) | 1) <= m && k >= 0 && (v = ws[k]) != null &&
- v.scanState >= 0 && v.currentSteal == null) {
- busy = false;
- break;
- }
- }
- if (!busy || ctl != c)
- canBlock = false; // unstable or stale
- else if ((sp = (int)c) != 0) // release idle worker
- canBlock = tryRelease(c, ws[m & sp], 0L);
- else if (tc >= pc && ac > 1 && w.isEmpty()) {
- long nc = ((AC_MASK & (c - AC_UNIT)) |
- (~AC_MASK & c)); // uncompensated
- canBlock = U.compareAndSwapLong(this, CTL, c, nc);
- }
- else if (tc >= MAX_CAP ||
- (this == common && tc >= pc + COMMON_MAX_SPARES))
- throw new RejectedExecutionException(
- "Thread limit exceeded replacing blocked worker");
- else { // similar to tryAddWorker
- boolean isSpare = (tc >= pc);
- long nc = (AC_MASK & c) | (TC_MASK & (c + TC_UNIT));
- canBlock = (U.compareAndSwapLong(this, CTL, c, nc) &&
- createWorker(isSpare)); // throws on exception
- }
- }
- return canBlock;
- }
-
- /**
* Helps and/or blocks until the given task is done or timeout.
+     * First tries helping locally, then scans other queues for a
+     * task produced by one of w's stealers, compensating and
+     * blocking if none are found (rescanning if tryCompensate fails).
*
* @param w caller
* @param task the task
@@ -2222,61 +1662,166 @@
*/
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
- if (w != null) {
- ForkJoinTask<?> prevJoin = w.currentJoin;
- if (task != null && (s = task.status) >= 0) {
- w.currentJoin = task;
- CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
- (CountedCompleter<?>)task : null;
- for (;;) {
- if (cc != null)
- helpComplete(w, cc, 0);
- else
- helpStealer(w, task);
- if ((s = task.status) < 0)
- break;
- long ms, ns;
+ if (w != null && task != null &&
+ (!(task instanceof CountedCompleter) ||
+ (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
+ w.tryRemoveAndExec(task);
+ int src = w.source, id = w.id;
+ s = task.status;
+ while (s >= 0) {
+ WorkQueue[] ws;
+ boolean nonempty = false;
+ int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
+ if ((ws = workQueues) != null) { // scan for matching id
+ for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
+ WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
+ if ((i = (r + j) & m) >= 0 && i < n &&
+ (q = ws[i]) != null && q.source == id &&
+ (b = q.base) - q.top < 0 &&
+ (a = q.array) != null && (al = a.length) > 0) {
+ int qid = q.id;
+ int index = (al - 1) & b;
+ ForkJoinTask<?> t = (ForkJoinTask<?>)
+ QA.getAcquire(a, index);
+ if (t != null && b++ == q.base && id == q.source &&
+ QA.compareAndSet(a, index, t, null)) {
+ q.base = b;
+ w.source = qid;
+ t.doExec();
+ w.source = src;
+ }
+ nonempty = true;
+ break;
+ }
+ }
+ }
+ if ((s = task.status) < 0)
+ break;
+ else if (!nonempty) {
+ long ms, ns; int block;
if (deadline == 0L)
- ms = 0L;
+ ms = 0L; // untimed
else if ((ns = deadline - System.nanoTime()) <= 0L)
- break;
+ break; // timeout
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
- ms = 1L;
- if (tryCompensate(w)) {
+ ms = 1L; // avoid 0 for timed wait
+ if ((block = tryCompensate(w)) != 0) {
task.internalWait(ms);
- U.getAndAddLong(this, CTL, AC_UNIT);
+ CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
}
- if ((s = task.status) < 0)
- break;
+ s = task.status;
}
- w.currentJoin = prevJoin;
}
}
return s;
}
- // Specialized scanning
+ /**
+ * Runs tasks until {@code isQuiescent()}. Rather than blocking
+ * when tasks cannot be found, rescans until all others cannot
+ * find tasks either.
+ */
+ final void helpQuiescePool(WorkQueue w) {
+ int prevSrc = w.source, fifo = w.id & FIFO;
+ for (int source = prevSrc, released = -1;;) { // -1 until known
+ WorkQueue[] ws;
+ if (fifo != 0)
+ w.localPollAndExec(0);
+ else
+ w.localPopAndExec(0);
+ if (released == -1 && w.phase >= 0)
+ released = 1;
+ boolean quiet = true, empty = true;
+ int r = ThreadLocalRandom.nextSecondarySeed();
+ if ((ws = workQueues) != null) {
+ for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
+ WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
+ if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
+ if ((b = q.base) - q.top < 0 &&
+ (a = q.array) != null && (al = a.length) > 0) {
+ int qid = q.id;
+ if (released == 0) { // increment
+ released = 1;
+ CTL.getAndAdd(this, RC_UNIT);
+ }
+ int index = (al - 1) & b;
+ ForkJoinTask<?> t = (ForkJoinTask<?>)
+ QA.getAcquire(a, index);
+ if (t != null && b++ == q.base &&
+ QA.compareAndSet(a, index, t, null)) {
+ q.base = b;
+ w.source = source = q.id;
+ t.doExec();
+ w.source = source = prevSrc;
+ }
+ quiet = empty = false;
+ break;
+ }
+ else if ((q.source & QUIET) == 0)
+ quiet = false;
+ }
+ }
+ }
+ if (quiet) {
+ if (released == 0)
+ CTL.getAndAdd(this, RC_UNIT);
+ w.source = prevSrc;
+ break;
+ }
+ else if (empty) {
+ if (source != QUIET)
+ w.source = source = QUIET;
+ if (released == 1) { // decrement
+ released = 0;
+ CTL.getAndAdd(this, RC_MASK & -RC_UNIT);
+ }
+ }
+ }
+ }
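
For reference, the usual route into helpQuiescePool from application code is
ForkJoinTask.helpQuiesce(), which, when called from a worker of this pool,
delegates to this method. A minimal sketch; the class name is illustrative.

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    public class QuiesceSketch extends RecursiveAction {
        protected void compute() {
            for (int i = 0; i < 8; ++i)
                new RecursiveAction() { protected void compute() { } }.fork();
            helpQuiesce();  // run own and stolen subtasks until the pool is quiescent
        }
        public static void main(String[] args) {
            new ForkJoinPool(2).invoke(new QuiesceSketch());
        }
    }
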
/**
- * Returns a (probably) non-empty steal queue, if one is found
- * during a scan, else null. This method must be retried by
- * caller if, by the time it tries to use the queue, it is empty.
+ * Scans for and returns a polled task, if available.
+ * Used only for untracked polls.
+ *
+ * @param submissionsOnly if true, only scan submission queues
*/
- private WorkQueue findNonEmptyStealQueue() {
- WorkQueue[] ws; int wl; // one-shot version of scan loop
- int r = ThreadLocalRandom.nextSecondarySeed();
- if ((ws = workQueues) != null && (wl = ws.length) > 0) {
- int m = wl - 1, origin = r & m;
+ private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
+ WorkQueue[] ws; int n;
+ rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
+ (n = ws.length) > 0) {
+ int m = n - 1;
+ int r = ThreadLocalRandom.nextSecondarySeed();
+ int h = r >>> 16;
+ int origin, step;
+ if (submissionsOnly) {
+ origin = (r & ~1) & m; // even indices and steps
+ step = (h & ~1) | 2;
+ }
+ else {
+ origin = r & m;
+ step = h | 1;
+ }
for (int k = origin, oldSum = 0, checkSum = 0;;) {
- WorkQueue q; int b;
+ WorkQueue q; int b, al; ForkJoinTask<?>[] a;
if ((q = ws[k]) != null) {
- if ((b = q.base) - q.top < 0)
- return q;
- checkSum += b;
+ checkSum += b = q.base;
+ if (b - q.top < 0 &&
+ (a = q.array) != null && (al = a.length) > 0) {
+ int index = (al - 1) & b;
+ ForkJoinTask<?> t = (ForkJoinTask<?>)
+ QA.getAcquire(a, index);
+ if (t != null && b++ == q.base &&
+ QA.compareAndSet(a, index, t, null)) {
+ q.base = b;
+ return t;
+ }
+ else
+ break; // restart
+ }
}
- if ((k = (k + 1) & m) == origin) {
+ if ((k = (k + step) & m) == origin) {
if (oldSum == (oldSum = checkSum))
- break;
+ break rescan;
checkSum = 0;
}
}
@@ -2285,58 +1830,160 @@
}
/**
- * Runs tasks until {@code isQuiescent()}. We piggyback on
- * active count ctl maintenance, but rather than blocking
- * when tasks cannot be found, we rescan until all others cannot
- * find tasks either.
- */
- final void helpQuiescePool(WorkQueue w) {
- ForkJoinTask<?> ps = w.currentSteal; // save context
- int wc = w.config;
- for (boolean active = true;;) {
- long c; WorkQueue q; ForkJoinTask<?> t;
- if (wc >= 0 && (t = w.pop()) != null) { // run locals if LIFO
- (w.currentSteal = t).doExec();
- w.currentSteal = ps;
- }
- else if ((q = findNonEmptyStealQueue()) != null) {
- if (!active) { // re-establish active count
- active = true;
- U.getAndAddLong(this, CTL, AC_UNIT);
- }
- if ((t = q.pollAt(q.base)) != null) {
- (w.currentSteal = t).doExec();
- w.currentSteal = ps;
- if (++w.nsteals < 0)
- w.transferStealCount(this);
- }
- }
- else if (active) { // decrement active count without queuing
- long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
- if (U.compareAndSwapLong(this, CTL, c, nc))
- active = false;
- }
- else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
- U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
- break;
- }
- }
-
- /**
* Gets and removes a local or stolen task for the given worker.
*
* @return a task, if available
*/
final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
- for (ForkJoinTask<?> t;;) {
- WorkQueue q;
- if ((t = w.nextLocalTask()) != null)
- return t;
- if ((q = findNonEmptyStealQueue()) == null)
- return null;
- if ((t = q.pollAt(q.base)) != null)
- return t;
+ ForkJoinTask<?> t;
+ if (w != null &&
+ (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
+ return t;
+ else
+ return pollScan(false);
+ }
+
+ // External operations
+
+ /**
+     * Adds the given task to the submitter's current submission
+     * queue, creating one if null or contended.
+ *
+ * @param task the task. Caller must ensure non-null.
+ */
+ final void externalPush(ForkJoinTask<?> task) {
+ int r; // initialize caller's probe
+ if ((r = ThreadLocalRandom.getProbe()) == 0) {
+ ThreadLocalRandom.localInit();
+ r = ThreadLocalRandom.getProbe();
}
+ for (;;) {
+ int md = mode, n;
+ WorkQueue[] ws = workQueues;
+ if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
+ throw new RejectedExecutionException();
+ else {
+ WorkQueue q;
+ boolean push = false, grow = false;
+ if ((q = ws[(n - 1) & r & SQMASK]) == null) {
+ Object lock = workerNamePrefix;
+ int qid = (r | QUIET) & ~(FIFO | OWNED);
+ q = new WorkQueue(this, null);
+ q.id = qid;
+ q.source = QUIET;
+ q.phase = QLOCK; // lock queue
+ if (lock != null) {
+ synchronized (lock) { // lock pool to install
+ int i;
+ if ((ws = workQueues) != null &&
+ (n = ws.length) > 0 &&
+ ws[i = qid & (n - 1) & SQMASK] == null) {
+ ws[i] = q;
+ push = grow = true;
+ }
+ }
+ }
+ }
+ else if (q.tryLockSharedQueue()) {
+ int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
+ if ((a = q.array) != null && (al = a.length) > 0 &&
+ al - 1 + (d = b - s) > 0) {
+ a[(al - 1) & s] = task;
+ q.top = s + 1; // relaxed writes OK here
+ q.phase = 0;
+ if (d < 0 && q.base - s < -1)
+ break; // no signal needed
+ }
+ else
+ grow = true;
+ push = true;
+ }
+ if (push) {
+ if (grow) {
+ try {
+ q.growArray();
+ int s = q.top, al; ForkJoinTask<?>[] a;
+ if ((a = q.array) != null && (al = a.length) > 0) {
+ a[(al - 1) & s] = task;
+ q.top = s + 1;
+ }
+ } finally {
+ q.phase = 0;
+ }
+ }
+ signalWork();
+ break;
+ }
+ else // move if busy
+ r = ThreadLocalRandom.advanceProbe(r);
+ }
+ }
+ }
+
+ /**
+ * Pushes a possibly-external submission.
+ */
+ private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
+ Thread t; ForkJoinWorkerThread w; WorkQueue q;
+ if (task == null)
+ throw new NullPointerException();
+ if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
+ (w = (ForkJoinWorkerThread)t).pool == this &&
+ (q = w.workQueue) != null)
+ q.push(task);
+ else
+ externalPush(task);
+ return task;
+ }
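
The branch in externalSubmit above separates the two submission paths: a call
from outside the pool goes through externalPush into a shared (even-indexed)
submission queue, while fork() from a worker of the same pool pushes onto that
worker's own queue. A small caller-side sketch; the class name is illustrative.

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    public class SubmitPathsSketch {
        public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool(4);
            pool.submit(() -> {                       // external thread: externalPush path
                RecursiveAction sub = new RecursiveAction() {
                    protected void compute() { }      // trivial subtask
                };
                sub.fork();                           // worker thread: own workQueue.push path
                sub.join();
            }).join();
            pool.shutdown();
        }
    }
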
+
+ /**
+ * Returns common pool queue for an external thread.
+ */
+ static WorkQueue commonSubmitterQueue() {
+ ForkJoinPool p = common;
+ int r = ThreadLocalRandom.getProbe();
+ WorkQueue[] ws; int n;
+ return (p != null && (ws = p.workQueues) != null &&
+ (n = ws.length) > 0) ?
+ ws[(n - 1) & r & SQMASK] : null;
+ }
+
+ /**
+ * Performs tryUnpush for an external submitter.
+ */
+ final boolean tryExternalUnpush(ForkJoinTask<?> task) {
+ int r = ThreadLocalRandom.getProbe();
+ WorkQueue[] ws; WorkQueue w; int n;
+ return ((ws = workQueues) != null &&
+ (n = ws.length) > 0 &&
+ (w = ws[(n - 1) & r & SQMASK]) != null &&
+ w.trySharedUnpush(task));
+ }
+
+ /**
+ * Performs helpComplete for an external submitter.
+ */
+ final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
+ int r = ThreadLocalRandom.getProbe();
+ WorkQueue[] ws; WorkQueue w; int n;
+ return ((ws = workQueues) != null && (n = ws.length) > 0 &&
+ (w = ws[(n - 1) & r & SQMASK]) != null) ?
+ w.sharedHelpCC(task, maxTasks) : 0;
+ }
+
+ /**
+ * Tries to steal and run tasks within the target's computation.
+ * The maxTasks argument supports external usages; internal calls
+ * use zero, allowing unbounded steps (external calls trap
+ * non-positive values).
+ *
+ * @param w caller
+ * @param maxTasks if non-zero, the maximum number of other tasks to run
+ * @return task status on exit
+ */
+ final int helpComplete(WorkQueue w, CountedCompleter<?> task,
+ int maxTasks) {
+ return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
}
/**
@@ -2383,10 +2030,12 @@
*/
static int getSurplusQueuedTaskCount() {
Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
- int n = (q = wt.workQueue).top - q.base;
- int a = (int)(pool.ctl >> AC_SHIFT) + p;
+ if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
+ (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
+ (q = wt.workQueue) != null) {
+ int p = pool.mode & SMASK;
+ int a = p + (int)(pool.ctl >> RC_SHIFT);
+ int n = q.top - q.base;
return n - (a > (p >>>= 1) ? 0 :
a > (p >>>= 1) ? 1 :
a > (p >>>= 1) ? 2 :
@@ -2396,7 +2045,7 @@
return 0;
}
- // Termination
+ // Termination
/**
* Possibly initiates and/or completes termination.
@@ -2404,198 +2053,86 @@
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
* @param enable if true, terminate when next possible
- * @return -1: terminating/terminated, 0: retry if internal caller, else 1
+ * @return true if terminating or terminated
*/
- private int tryTerminate(boolean now, boolean enable) {
- int rs; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
+ private boolean tryTerminate(boolean now, boolean enable) {
+ int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
- while ((rs = runState) >= 0) {
+ while (((md = mode) & SHUTDOWN) == 0) {
if (!enable || this == common) // cannot shutdown
- return 1;
- else if (rs == 0)
- tryInitialize(false); // ensure initialized
+ return false;
else
- U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN);
+ MODE.compareAndSet(this, md, md | SHUTDOWN);
}
- if ((rs & STOP) == 0) { // try to initiate termination
- if (!now) { // check quiescence
+ while (((md = mode) & STOP) == 0) { // try to initiate termination
+ if (!now) { // check if quiescent & empty
for (long oldSum = 0L;;) { // repeat until stable
- WorkQueue[] ws; WorkQueue w; int b;
+ boolean running = false;
long checkSum = ctl;
- if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
- return 0; // still active workers
- if ((ws = workQueues) != null) {
+ WorkQueue[] ws = workQueues;
+ if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
+ running = true;
+ else if (ws != null) {
+ WorkQueue w; int b;
for (int i = 0; i < ws.length; ++i) {
if ((w = ws[i]) != null) {
- checkSum += (b = w.base);
- if (w.currentSteal != null || b != w.top)
- return 0; // retry if internal caller
+ checkSum += (b = w.base) + w.id;
+ if (b != w.top ||
+ ((i & 1) == 1 && w.source >= 0)) {
+ running = true;
+ break;
+ }
}
}
}
- if (oldSum == (oldSum = checkSum))
+ if (((md = mode) & STOP) != 0)
+ break; // already triggered
+ else if (running)
+ return false;
+ else if (workQueues == ws && oldSum == (oldSum = checkSum))
break;
}
}
- do {} while (!U.compareAndSwapInt(this, RUNSTATE,
- rs = runState, rs | STOP));
+ if ((md & STOP) == 0)
+ MODE.compareAndSet(this, md, md | STOP);
}
- for (long oldSum = 0L;;) { // repeat until stable
- WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt;
- long checkSum = ctl;
- if ((ws = workQueues) != null) { // help terminate others
- for (int i = 0; i < ws.length; ++i) {
- if ((w = ws[i]) != null) {
- w.cancelAll(); // clear queues
- checkSum += w.base;
- if (w.qlock >= 0) {
- w.qlock = -1; // racy set OK
- if ((wt = w.owner) != null) {
+ while (((md = mode) & TERMINATED) == 0) { // help terminate others
+ for (long oldSum = 0L;;) { // repeat until stable
+ WorkQueue[] ws; WorkQueue w;
+ long checkSum = ctl;
+ if ((ws = workQueues) != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ if ((w = ws[i]) != null) {
+ ForkJoinWorkerThread wt = w.owner;
+ w.cancelAll(); // clear queues
+ if (wt != null) {
try { // unblock join or park
wt.interrupt();
} catch (Throwable ignore) {
}
}
+ checkSum += w.base + w.id;
}
}
}
- }
- if (oldSum == (oldSum = checkSum))
- break;
- }
-
- if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) {
- runState = (STARTED | SHUTDOWN | STOP | TERMINATED); // final write
- synchronized (this) {
- notifyAll(); // for awaitTermination
+ if (((md = mode) & TERMINATED) != 0 ||
+ (workQueues == ws && oldSum == (oldSum = checkSum)))
+ break;
}
- }
-
- return -1;
- }
-
- // External operations
-
- /**
- * Constructs and tries to install a new external queue,
- * failing if the workQueues array already has a queue at
- * the given index.
- *
- * @param index the index of the new queue
- */
- private void tryCreateExternalQueue(int index) {
- AuxState aux;
- if ((aux = auxState) != null && index >= 0) {
- WorkQueue q = new WorkQueue(this, null);
- q.config = index;
- q.scanState = ~UNSIGNALLED;
- q.qlock = 1; // lock queue
- boolean installed = false;
- aux.lock();
- try { // lock pool to install
- WorkQueue[] ws;
- if ((ws = workQueues) != null && index < ws.length &&
- ws[index] == null) {
- ws[index] = q; // else throw away
- installed = true;
+ if ((md & TERMINATED) != 0)
+ break;
+ else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
+ break;
+ else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
+ synchronized (this) {
+ notifyAll(); // for awaitTermination
}
- } finally {
- aux.unlock();
- }
- if (installed) {
- try {
- q.growArray();
- } finally {
- q.qlock = 0;
- }
+ break;
}
}
- }
-
- /**
- * Adds the given task to a submission queue at submitter's
- * current queue. Also performs secondary initialization upon the
- * first submission of the first task to the pool, and detects
- * first submission by an external thread and creates a new shared
- * queue if the one at index if empty or contended.
- *
- * @param task the task. Caller must ensure non-null.
- */
- final void externalPush(ForkJoinTask<?> task) {
- int r; // initialize caller's probe
- if ((r = ThreadLocalRandom.getProbe()) == 0) {
- ThreadLocalRandom.localInit();
- r = ThreadLocalRandom.getProbe();
- }
- for (;;) {
- WorkQueue q; int wl, k, stat;
- int rs = runState;
- WorkQueue[] ws = workQueues;
- if (rs <= 0 || ws == null || (wl = ws.length) <= 0)
- tryInitialize(true);
- else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null)
- tryCreateExternalQueue(k);
- else if ((stat = q.sharedPush(task)) < 0)
- break;
- else if (stat == 0) {
- signalWork();
- break;
- }
- else // move if busy
- r = ThreadLocalRandom.advanceProbe(r);
- }
- }
-
- /**
- * Pushes a possibly-external submission.
- */
- private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
- Thread t; ForkJoinWorkerThread w; WorkQueue q;
- if (task == null)
- throw new NullPointerException();
- if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
- (w = (ForkJoinWorkerThread)t).pool == this &&
- (q = w.workQueue) != null)
- q.push(task);
- else
- externalPush(task);
- return task;
- }
-
- /**
- * Returns common pool queue for an external thread.
- */
- static WorkQueue commonSubmitterQueue() {
- ForkJoinPool p = common;
- int r = ThreadLocalRandom.getProbe();
- WorkQueue[] ws; int wl;
- return (p != null && (ws = p.workQueues) != null &&
- (wl = ws.length) > 0) ?
- ws[(wl - 1) & r & SQMASK] : null;
- }
-
- /**
- * Performs tryUnpush for an external submitter.
- */
- final boolean tryExternalUnpush(ForkJoinTask<?> task) {
- int r = ThreadLocalRandom.getProbe();
- WorkQueue[] ws; WorkQueue w; int wl;
- return ((ws = workQueues) != null &&
- (wl = ws.length) > 0 &&
- (w = ws[(wl - 1) & r & SQMASK]) != null &&
- w.trySharedUnpush(task));
- }
-
- /**
- * Performs helpComplete for an external submitter.
- */
- final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
- WorkQueue[] ws; int wl;
- int r = ThreadLocalRandom.getProbe();
- return ((ws = workQueues) != null && (wl = ws.length) > 0) ?
- helpComplete(ws[(wl - 1) & r & SQMASK], task, maxTasks) : 0;
+ return true;
}
// Exported methods
@@ -2604,9 +2141,10 @@
/**
* Creates a {@code ForkJoinPool} with parallelism equal to {@link
- * java.lang.Runtime#availableProcessors}, using the {@linkplain
- * #defaultForkJoinWorkerThreadFactory default thread factory},
- * no UncaughtExceptionHandler, and non-async LIFO processing mode.
+ * java.lang.Runtime#availableProcessors}, using defaults for all
+ * other parameters (see {@link #ForkJoinPool(int,
+ * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
+ * int, int, int, Predicate, long, TimeUnit)}).
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
@@ -2615,14 +2153,16 @@
*/
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
- defaultForkJoinWorkerThreadFactory, null, false);
+ defaultForkJoinWorkerThreadFactory, null, false,
+ 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
/**
* Creates a {@code ForkJoinPool} with the indicated parallelism
- * level, the {@linkplain
- * #defaultForkJoinWorkerThreadFactory default thread factory},
- * no UncaughtExceptionHandler, and non-async LIFO processing mode.
+ * level, using defaults for all other parameters (see {@link
+ * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
+ * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
+ * long, TimeUnit)}).
*
* @param parallelism the parallelism level
* @throws IllegalArgumentException if parallelism less than or
@@ -2633,11 +2173,15 @@
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism) {
- this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
+ this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
+ 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
/**
- * Creates a {@code ForkJoinPool} with the given parameters.
+ * Creates a {@code ForkJoinPool} with the given parameters (using
+ * defaults for others -- see {@link #ForkJoinPool(int,
+ * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
+ * int, int, int, Predicate, long, TimeUnit)}).
*
* @param parallelism the parallelism level. For default value,
* use {@link java.lang.Runtime#availableProcessors}.
@@ -2664,43 +2208,185 @@
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
- this(checkParallelism(parallelism),
- checkFactory(factory),
- handler,
- asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
- "ForkJoinPool-" + nextPoolId() + "-worker-");
+ this(parallelism, factory, handler, asyncMode,
+ 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a {@code ForkJoinPool} with the given parameters.
+ *
+ * @param parallelism the parallelism level. For default value,
+ * use {@link java.lang.Runtime#availableProcessors}.
+ *
+ * @param factory the factory for creating new threads. For
+ * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
+ *
+ * @param handler the handler for internal worker threads that
+ * terminate due to unrecoverable errors encountered while
+ * executing tasks. For default value, use {@code null}.
+ *
+ * @param asyncMode if true, establishes local first-in-first-out
+ * scheduling mode for forked tasks that are never joined. This
+ * mode may be more appropriate than default locally stack-based
+ * mode in applications in which worker threads only process
+ * event-style asynchronous tasks. For default value, use {@code
+ * false}.
+ *
+ * @param corePoolSize the number of threads to keep in the pool
+ * (unless timed out after an elapsed keep-alive). Normally (and
+ * by default) this is the same value as the parallelism level,
+ * but may be set to a larger value to reduce dynamic overhead if
+ * tasks regularly block. Using a smaller value (for example
+ * {@code 0}) has the same effect as the default.
+ *
+ * @param maximumPoolSize the maximum number of threads allowed.
+ * When the maximum is reached, attempts to replace blocked
+ * threads fail. (However, because creation and termination of
+ * different threads may overlap, and may be managed by the given
+ * thread factory, this value may be transiently exceeded.) To
+ * arrange the same value as is used by default for the common
+ * pool, use {@code 256} plus the {@code parallelism} level. (By
+ * default, the common pool allows a maximum of 256 spare
+ * threads.) Using a value (for example {@code
+ * Integer.MAX_VALUE}) larger than the implementation's total
+ * thread limit has the same effect as using this limit (which is
+ * the default).
+ *
+ * @param minimumRunnable the minimum allowed number of core
+ * threads not blocked by a join or {@link ManagedBlocker}. To
+ * ensure progress, when too few unblocked threads exist and
+ * unexecuted tasks may exist, new threads are constructed, up to
+ * the given maximumPoolSize. For the default value, use {@code
+     * 1}, which ensures liveness. A larger value might improve
+ * throughput in the presence of blocked activities, but might
+ * not, due to increased overhead. A value of zero may be
+ * acceptable when submitted tasks cannot have dependencies
+ * requiring additional threads.
+ *
+ * @param saturate if non-null, a predicate invoked upon attempts
+ * to create more than the maximum total allowed threads. By
+ * default, when a thread is about to block on a join or {@link
+ * ManagedBlocker}, but cannot be replaced because the
+ * maximumPoolSize would be exceeded, a {@link
+ * RejectedExecutionException} is thrown. But if this predicate
+ * returns {@code true}, then no exception is thrown, so the pool
+ * continues to operate with fewer than the target number of
+ * runnable threads, which might not ensure progress.
+ *
+ * @param keepAliveTime the elapsed time since last use before
+ * a thread is terminated (and then later replaced if needed).
+ * For the default value, use {@code 60, TimeUnit.SECONDS}.
+ *
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ *
+ * @throws IllegalArgumentException if parallelism is less than or
+     * equal to zero, or is greater than the implementation limit,
+     * or if maximumPoolSize is less than parallelism,
+     * or if the keepAliveTime is less than or equal to zero.
+ * @throws NullPointerException if the factory is null
+ * @throws SecurityException if a security manager exists and
+ * the caller is not permitted to modify threads
+ * because it does not hold {@link
+ * java.lang.RuntimePermission}{@code ("modifyThread")}
+ * @since 9
+ */
+ public ForkJoinPool(int parallelism,
+ ForkJoinWorkerThreadFactory factory,
+ UncaughtExceptionHandler handler,
+ boolean asyncMode,
+ int corePoolSize,
+ int maximumPoolSize,
+ int minimumRunnable,
+ Predicate<? super ForkJoinPool> saturate,
+ long keepAliveTime,
+ TimeUnit unit) {
+ // check, encode, pack parameters
+ if (parallelism <= 0 || parallelism > MAX_CAP ||
+ maximumPoolSize < parallelism || keepAliveTime <= 0L)
+ throw new IllegalArgumentException();
+ if (factory == null)
+ throw new NullPointerException();
+ long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
+
+ String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
+ int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
+ long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
+ (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
+ int m = parallelism | (asyncMode ? FIFO : 0);
+ int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
+ int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
+ int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
+ int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
+ n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
+ n = (n + 1) << 1; // power of two, including space for submission queues
+
+ this.workQueues = new WorkQueue[n];
+ this.workerNamePrefix = prefix;
+ this.factory = factory;
+ this.ueh = handler;
+ this.saturate = saturate;
+ this.keepAlive = ms;
+ this.bounds = b;
+ this.mode = m;
+ this.ctl = c;
checkPermission();
}
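
A usage sketch for the constructor above. The argument values are illustrative
only; the saturate predicate here simply opts into continuing with fewer than
the target number of runnable threads instead of throwing
RejectedExecutionException, as described in the parameter documentation.

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;

    public class PoolConfigSketch {
        public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool(
                8,                                                // parallelism
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,  // factory
                null,                                             // no UncaughtExceptionHandler
                false,                                            // asyncMode: LIFO
                8,                                                // corePoolSize
                64,                                               // maximumPoolSize
                1,                                                // minimumRunnable
                p -> true,                                        // saturate: keep going
                60, TimeUnit.SECONDS);                            // keepAliveTime, unit
            pool.shutdown();
        }
    }
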
- private static int checkParallelism(int parallelism) {
- if (parallelism <= 0 || parallelism > MAX_CAP)
- throw new IllegalArgumentException();
- return parallelism;
- }
-
- private static ForkJoinWorkerThreadFactory checkFactory
- (ForkJoinWorkerThreadFactory factory) {
- if (factory == null)
- throw new NullPointerException();
- return factory;
- }
+ /**
+ * Constructor for common pool using parameters possibly
+ * overridden by system properties
+ */
+ @SuppressWarnings("deprecation") // Class.newInstance
+ private ForkJoinPool(byte forCommonPoolOnly) {
+ int parallelism = -1;
+ ForkJoinWorkerThreadFactory fac = null;
+ UncaughtExceptionHandler handler = null;
+ try { // ignore exceptions in accessing/parsing properties
+ String pp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.parallelism");
+ String fp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.threadFactory");
+ String hp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
+ if (pp != null)
+ parallelism = Integer.parseInt(pp);
+ if (fp != null)
+ fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
+ getSystemClassLoader().loadClass(fp).newInstance());
+ if (hp != null)
+ handler = ((UncaughtExceptionHandler)ClassLoader.
+ getSystemClassLoader().loadClass(hp).newInstance());
+ } catch (Exception ignore) {
+ }
- /**
- * Creates a {@code ForkJoinPool} with the given parameters, without
- * any security checks or parameter validation. Invoked directly by
- * makeCommonPool.
- */
- private ForkJoinPool(int parallelism,
- ForkJoinWorkerThreadFactory factory,
- UncaughtExceptionHandler handler,
- int mode,
- String workerNamePrefix) {
- this.workerNamePrefix = workerNamePrefix;
- this.factory = factory;
+ if (fac == null) {
+ if (System.getSecurityManager() == null)
+ fac = defaultForkJoinWorkerThreadFactory;
+ else // use security-managed default
+ fac = new InnocuousForkJoinWorkerThreadFactory();
+ }
+ if (parallelism < 0 && // default 1 less than #cores
+ (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
+ parallelism = 1;
+ if (parallelism > MAX_CAP)
+ parallelism = MAX_CAP;
+
+ long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
+ (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
+ int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
+ int n = (parallelism > 1) ? parallelism - 1 : 1;
+ n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
+ n = (n + 1) << 1;
+
+ this.workQueues = new WorkQueue[n];
+ this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
+ this.factory = fac;
this.ueh = handler;
- this.config = (parallelism & SMASK) | mode;
- long np = (long)(-parallelism); // offset ctl counts
- this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
+ this.saturate = null;
+ this.keepAlive = DEFAULT_KEEPALIVE;
+ this.bounds = b;
+ this.mode = parallelism;
+ this.ctl = c;
}
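
The three system properties read by this common-pool constructor can be set on
the command line (for example
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4) or programmatically,
but they only take effect if set before the common pool is first used. A
minimal sketch; the class name is illustrative.

    import java.util.concurrent.ForkJoinPool;

    public class CommonPoolPropsSketch {
        public static void main(String[] args) {
            // Must run before anything touches the common pool.
            System.setProperty(
                "java.util.concurrent.ForkJoinPool.common.parallelism", "4");
            System.out.println(ForkJoinPool.commonPool().getParallelism()); // prints 4
        }
    }
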
/**
@@ -2876,8 +2562,8 @@
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
- int par;
- return ((par = config & SMASK) > 0) ? par : 1;
+ int par = mode & SMASK;
+ return (par > 0) ? par : 1;
}
/**
@@ -2899,7 +2585,7 @@
* @return the number of worker threads
*/
public int getPoolSize() {
- return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
+ return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
}
/**
@@ -2909,7 +2595,7 @@
* @return {@code true} if this pool uses async mode
*/
public boolean getAsyncMode() {
- return (config & FIFO_QUEUE) != 0;
+ return (mode & FIFO) != 0;
}
/**
@@ -2940,7 +2626,7 @@
* @return the number of active threads
*/
public int getActiveThreadCount() {
- int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
+ int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
@@ -2956,7 +2642,30 @@
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
+ for (;;) {
+ long c = ctl;
+ int md = mode, pc = md & SMASK;
+ int tc = pc + (short)(c >>> TC_SHIFT);
+ int rc = pc + (int)(c >> RC_SHIFT);
+ if ((md & (STOP | TERMINATED)) != 0)
+ return true;
+ else if (rc > 0)
+ return false;
+ else {
+ WorkQueue[] ws; WorkQueue v;
+ if ((ws = workQueues) != null) {
+ for (int i = 1; i < ws.length; i += 2) {
+ if ((v = ws[i]) != null) {
+ if ((v.source & QUIET) == 0)
+ return false;
+ --tc;
+ }
+ }
+ }
+ if (tc == 0 && ctl == c)
+ return true;
+ }
+ }
}
/**
@@ -2971,13 +2680,12 @@
* @return the number of steals
*/
public long getStealCount() {
- AuxState sc = auxState;
- long count = (sc == null) ? 0L : sc.stealCount;
+ long count = stealCount;
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 1; i < ws.length; i += 2) {
if ((w = ws[i]) != null)
- count += w.nsteals;
+ count += (long)w.nsteals & 0xffffffffL;
}
}
return count;
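
The switch from count += w.nsteals to count += (long)w.nsteals & 0xffffffffL reads the per-queue steal counter as an unsigned 32-bit value, so a counter that has wrapped past Integer.MAX_VALUE still contributes a non-negative amount to the total. A minimal sketch of the difference, using only standard Java integer semantics:

    // Illustrative sketch: widening an int counter that has wrapped negative.
    public class UnsignedWidenDemo {
        public static void main(String[] args) {
            int nsteals = Integer.MIN_VALUE + 5;          // a wrapped counter value
            long signed   = nsteals;                      // -2147483643
            long unsigned = (long) nsteals & 0xffffffffL; //  2147483653
            System.out.println(signed + " vs " + unsigned);
            // Equivalent library form:
            System.out.println(Integer.toUnsignedLong(nsteals));
        }
    }
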
@@ -3049,15 +2757,7 @@
* @return the next submission, or {@code null} if none
*/
protected ForkJoinTask<?> pollSubmission() {
- WorkQueue[] ws; int wl; WorkQueue w; ForkJoinTask<?> t;
- int r = ThreadLocalRandom.nextSecondarySeed();
- if ((ws = workQueues) != null && (wl = ws.length) > 0) {
- for (int m = wl - 1, i = 0; i < wl; ++i) {
- if ((w = ws[(i << 1) & m]) != null && (t = w.poll()) != null)
- return t;
- }
- }
- return null;
+ return pollScan(true);
}
/**
@@ -3103,9 +2803,7 @@
public String toString() {
// Use a single pass through workQueues to collect counts
long qt = 0L, qs = 0L; int rc = 0;
- AuxState sc = auxState;
- long st = (sc == null) ? 0L : sc.stealCount;
- long c = ctl;
+ long st = stealCount;
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 0; i < ws.length; ++i) {
@@ -3115,22 +2813,24 @@
qs += size;
else {
qt += size;
- st += w.nsteals;
+ st += (long)w.nsteals & 0xffffffffL;
if (w.isApparentlyUnblocked())
++rc;
}
}
}
}
- int pc = (config & SMASK);
+
+ int md = mode;
+ int pc = (md & SMASK);
+ long c = ctl;
int tc = pc + (short)(c >>> TC_SHIFT);
- int ac = pc + (int)(c >> AC_SHIFT);
+ int ac = pc + (int)(c >> RC_SHIFT);
if (ac < 0) // ignore transient negative
ac = 0;
- int rs = runState;
- String level = ((rs & TERMINATED) != 0 ? "Terminated" :
- (rs & STOP) != 0 ? "Terminating" :
- (rs & SHUTDOWN) != 0 ? "Shutting down" :
+ String level = ((md & TERMINATED) != 0 ? "Terminated" :
+ (md & STOP) != 0 ? "Terminating" :
+ (md & SHUTDOWN) != 0 ? "Shutting down" :
"Running");
return super.toString() +
"[" + level +
@@ -3193,7 +2893,7 @@
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
- return (runState & TERMINATED) != 0;
+ return (mode & TERMINATED) != 0;
}
/**
@@ -3210,8 +2910,8 @@
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
- int rs = runState;
- return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
+ int md = mode;
+ return (md & STOP) != 0 && (md & TERMINATED) == 0;
}
/**
@@ -3220,7 +2920,7 @@
* @return {@code true} if this pool has been shut down
*/
public boolean isShutdown() {
- return (runState & SHUTDOWN) != 0;
+ return (mode & SHUTDOWN) != 0;
}
/**
@@ -3284,30 +2984,19 @@
helpQuiescePool(wt.workQueue);
return true;
}
- long startTime = System.nanoTime();
- WorkQueue[] ws;
- int r = 0, wl;
- boolean found = true;
- while (!isQuiescent() && (ws = workQueues) != null &&
- (wl = ws.length) > 0) {
- if (!found) {
- if ((System.nanoTime() - startTime) > nanos)
+ else {
+ for (long startTime = System.nanoTime();;) {
+ ForkJoinTask<?> t;
+ if ((t = pollScan(false)) != null)
+ t.doExec();
+ else if (isQuiescent())
+ return true;
+ else if ((System.nanoTime() - startTime) > nanos)
return false;
- Thread.yield(); // cannot block
- }
- found = false;
- for (int m = wl - 1, j = (m + 1) << 2; j >= 0; --j) {
- ForkJoinTask<?> t; WorkQueue q; int b, k;
- if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
- (b = q.base) - q.top < 0) {
- found = true;
- if ((t = q.pollAt(b)) != null)
- t.doExec();
- break;
- }
+ else
+ Thread.yield(); // cannot block
}
}
- return true;
}
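
The rewritten loop above repeatedly runs tasks obtained from pollScan while waiting for quiescence, rather than scanning queues one slot at a time. Assuming the enclosing method is the public awaitQuiescence(long, TimeUnit) (its signature is not shown in this hunk), a small usage sketch:

    // Illustrative sketch, assuming the surrounding method is
    // ForkJoinPool.awaitQuiescence(long, TimeUnit): submit fire-and-forget
    // work, then wait (helping to run tasks) until the pool goes quiet.
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;

    public class QuiescenceDemo {
        public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool(2);
            for (int i = 0; i < 100; i++) {
                final int n = i;
                pool.execute(() -> Math.sqrt(n));   // fire-and-forget work
            }
            boolean quiet = pool.awaitQuiescence(5, TimeUnit.SECONDS);
            System.out.println("quiescent: " + quiet);
            pool.shutdown();
        }
    }
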
/**
@@ -3422,17 +3111,19 @@
throws InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
+ WorkQueue w;
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
- (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
- WorkQueue w = wt.workQueue;
+ (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
+ (w = wt.workQueue) != null) {
+ int block;
while (!blocker.isReleasable()) {
- if (p.tryCompensate(w)) {
+ if ((block = p.tryCompensate(w)) != 0) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
- U.getAndAddLong(p, CTL, AC_UNIT);
+ CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
}
break;
}
@@ -3444,6 +3135,55 @@
}
}
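
In the managedBlock path above, tryCompensate may activate a spare worker before the caller parks inside blocker.block(). A minimal sketch of a ManagedBlocker wrapping a CountDownLatch, which is the usual shape of client code for this API:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ForkJoinPool;

    // Illustrative sketch: a ManagedBlocker around a latch, so a worker that
    // must wait lets the pool possibly compensate with a spare thread.
    public class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        private final CountDownLatch latch;
        public LatchBlocker(CountDownLatch latch) { this.latch = latch; }

        public boolean isReleasable() {
            return latch.getCount() == 0;          // already released?
        }
        public boolean block() throws InterruptedException {
            latch.await();                         // park until released
            return true;                           // no further blocking needed
        }

        // Usage, from any thread:
        //   ForkJoinPool.managedBlock(new LatchBlocker(latch));
    }
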
+ /**
+ * If the given executor is a ForkJoinPool, poll and execute
+ * AsynchronousCompletionTasks from worker's queue until none are
+ * available or blocker is released.
+ */
+ static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
+ if (blocker != null && (e instanceof ForkJoinPool)) {
+ WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
+ ForkJoinPool p = (ForkJoinPool)e;
+ Thread thread = Thread.currentThread();
+ if (thread instanceof ForkJoinWorkerThread &&
+ (wt = (ForkJoinWorkerThread)thread).pool == p)
+ w = wt.workQueue;
+ else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
+ (ws = p.workQueues) != null && (n = ws.length) > 0)
+ w = ws[(n - 1) & r & SQMASK];
+ else
+ w = null;
+ if (w != null) {
+ for (;;) {
+ int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
+ if ((a = w.array) != null && (d = b - s) < 0 &&
+ (al = a.length) > 0) {
+ int index = (al - 1) & b;
+ ForkJoinTask<?> t = (ForkJoinTask<?>)
+ QA.getAcquire(a, index);
+ if (blocker.isReleasable())
+ break;
+ else if (b++ == w.base) {
+ if (t == null) {
+ if (d == -1)
+ break;
+ }
+ else if (!(t instanceof CompletableFuture.
+ AsynchronousCompletionTask))
+ break;
+ else if (QA.compareAndSet(a, index, t, null)) {
+ w.base = b;
+ t.doExec();
+ }
+ }
+ }
+ else
+ break;
+ }
+ }
+ }
+ }
+
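
helpAsyncBlocker only executes tasks that are instances of CompletableFuture.AsynchronousCompletionTask, the marker interface that ConsumerTask acquires in the SubmissionPublisher hunk further below. Default async CompletableFuture stages carry the same marker and run in the common pool, so they are the typical tasks this method can drain on behalf of a blocked caller; a small illustrative sketch of such tasks:

    import java.util.concurrent.CompletableFuture;

    // Illustrative sketch: these async stages are submitted to the common pool
    // as AsynchronousCompletionTasks by default.
    public class AsyncStageDemo {
        public static void main(String[] args) {
            String result = CompletableFuture
                .supplyAsync(() -> "hello")           // runs in commonPool()
                .thenApplyAsync(s -> s + " world")    // also an async completion task
                .join();
            System.out.println(result);
        }
    }
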
// AbstractExecutorService overrides. These rely on undocumented
// fact that ForkJoinTask.adapt returns ForkJoinTasks that also
// implement RunnableFuture.
@@ -3456,24 +3196,17 @@
return new ForkJoinTask.AdaptedCallable<T>(callable);
}
- // Unsafe mechanics
- private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
- private static final long CTL;
- private static final long RUNSTATE;
- private static final int ABASE;
- private static final int ASHIFT;
+ // VarHandle mechanics
+ private static final VarHandle CTL;
+ private static final VarHandle MODE;
+ private static final VarHandle QA;
static {
try {
- CTL = U.objectFieldOffset
- (ForkJoinPool.class.getDeclaredField("ctl"));
- RUNSTATE = U.objectFieldOffset
- (ForkJoinPool.class.getDeclaredField("runState"));
- ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
- int scale = U.arrayIndexScale(ForkJoinTask[].class);
- if ((scale & (scale - 1)) != 0)
- throw new Error("array index scale not a power of two");
- ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
+ MethodHandles.Lookup l = MethodHandles.lookup();
+ CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
+ MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
+ QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
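
The static initializer above replaces Unsafe field offsets with VarHandles obtained from MethodHandles.lookup(). A standalone sketch of the same pattern on a hypothetical Counter class (not part of the JDK sources), showing the lookup, a CAS retry loop, and an acquire-mode array read:

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;

    // Illustrative sketch of the VarHandle pattern used above, applied to a
    // hypothetical class rather than the JDK internals.
    public class Counter {
        private volatile long value;
        private final Object[] slots = new Object[8];

        private static final VarHandle VALUE;
        private static final VarHandle SLOTS;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                VALUE = l.findVarHandle(Counter.class, "value", long.class);
                SLOTS = MethodHandles.arrayElementVarHandle(Object[].class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        long incrementAndGet() {
            for (;;) {                               // classic CAS retry loop
                long v = value;
                if (VALUE.compareAndSet(this, v, v + 1))
                    return v + 1;
            }
        }

        Object readSlot(int i) {
            return SLOTS.getAcquire(slots, i);       // acquire-mode array read
        }
    }
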
@@ -3497,51 +3230,10 @@
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
- public ForkJoinPool run() { return makeCommonPool(); }});
-
- // report 1 even if threads disabled
- COMMON_PARALLELISM = Math.max(common.config & SMASK, 1);
- }
+ public ForkJoinPool run() {
+ return new ForkJoinPool((byte)0); }});
- /**
- * Creates and returns the common pool, respecting user settings
- * specified via system properties.
- */
- @SuppressWarnings("deprecation") // Class.newInstance
- static ForkJoinPool makeCommonPool() {
- int parallelism = -1;
- ForkJoinWorkerThreadFactory factory = null;
- UncaughtExceptionHandler handler = null;
- try { // ignore exceptions in accessing/parsing properties
- String pp = System.getProperty
- ("java.util.concurrent.ForkJoinPool.common.parallelism");
- String fp = System.getProperty
- ("java.util.concurrent.ForkJoinPool.common.threadFactory");
- String hp = System.getProperty
- ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
- if (pp != null)
- parallelism = Integer.parseInt(pp);
- if (fp != null)
- factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
- getSystemClassLoader().loadClass(fp).newInstance());
- if (hp != null)
- handler = ((UncaughtExceptionHandler)ClassLoader.
- getSystemClassLoader().loadClass(hp).newInstance());
- } catch (Exception ignore) {
- }
- if (factory == null) {
- if (System.getSecurityManager() == null)
- factory = defaultForkJoinWorkerThreadFactory;
- else // use security-managed default
- factory = new InnocuousForkJoinWorkerThreadFactory();
- }
- if (parallelism < 0 && // default 1 less than #cores
- (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
- parallelism = 1;
- if (parallelism > MAX_CAP)
- parallelism = MAX_CAP;
- return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
- "ForkJoinPool.commonPool-worker-");
+ COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
}
/**
--- a/jdk/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java Fri Jul 15 13:51:43 2016 -0700
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java Fri Jul 15 13:55:51 2016 -0700
@@ -35,6 +35,8 @@
package java.util.concurrent;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
@@ -866,7 +868,7 @@
/** Subscriber for method consume */
private static final class ConsumerSubscriber<T>
- implements Flow.Subscriber<T> {
+ implements Flow.Subscriber<T> {
final CompletableFuture<Void> status;
final Consumer<? super T> consumer;
Flow.Subscription subscription;
@@ -906,7 +908,7 @@
*/
@SuppressWarnings("serial")
static final class ConsumerTask<T> extends ForkJoinTask<Void>
- implements Runnable {
+ implements Runnable, CompletableFuture.AsynchronousCompletionTask {
final BufferedSubscription<T> consumer;
ConsumerTask(BufferedSubscription<T> consumer) {
this.consumer = consumer;
@@ -959,11 +961,9 @@
* Blocking control relies on the "waiter" field. Producers set
* the field before trying to block, but must then recheck (via
* offer) before parking. Signalling then just unparks and clears
- * waiter field. If the producer and consumer are both in the same
- * ForkJoinPool, or consumers are running in commonPool, the
- * producer attempts to help run consumer tasks that it forked
- * before blocking. To avoid potential cycles, only one level of
- * helping is currently supported.
+ * waiter field. If the producer and/or consumer are using a
+ * ForkJoinPool, the producer attempts to help run consumer tasks
+ * via ForkJoinPool.helpAsyncBlocker before blocking.
*
* This class uses @Contended and heuristic field declaration
* ordering to reduce false-sharing-based memory contention among
@@ -983,7 +983,6 @@
volatile long demand; // # unfilled requests
int maxCapacity; // reduced on OOME
int putStat; // offer result for ManagedBlocker
- int helpDepth; // nested helping depth (at most 1)
volatile int ctl; // atomic run state flags
volatile int head; // next position to take
int tail; // next position to put
@@ -1077,7 +1076,7 @@
alloc = true;
}
else {
- U.fullFence(); // recheck
+ VarHandle.fullFence(); // recheck
int h = head, t = tail, size = t + 1 - h;
if (cap >= size) {
a[(cap - 1) & t] = item;
@@ -1116,10 +1115,10 @@
if (a != null && cap > 0) {
int mask = cap - 1;
for (int j = head; j != t; ++j) {
- long k = ((long)(j & mask) << ASHIFT) + ABASE;
- Object x = U.getObjectVolatile(a, k);
+ int k = j & mask;
+ Object x = QA.getAcquire(a, k);
if (x != null && // races with consumer
- U.compareAndSwapObject(a, k, x, null))
+ QA.compareAndSet(a, k, x, null))
newArray[j & newMask] = x;
}
}
@@ -1136,28 +1135,20 @@
* initial offer return 0.
*/
final int submit(T item) {
- int stat; Executor e; ForkJoinWorkerThread w;
- if ((stat = offer(item)) == 0 && helpDepth == 0 &&
- ((e = executor) instanceof ForkJoinPool)) {
- helpDepth = 1;
- Thread thread = Thread.currentThread();
- if ((thread instanceof ForkJoinWorkerThread) &&
- ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
- stat = internalHelpConsume(w.workQueue, item);
- else if (e == ForkJoinPool.commonPool())
- stat = externalHelpConsume
- (ForkJoinPool.commonSubmitterQueue(), item);
- helpDepth = 0;
- }
- if (stat == 0 && (stat = offer(item)) == 0) {
+ int stat;
+ if ((stat = offer(item)) == 0) {
putItem = item;
timeout = 0L;
- try {
- ForkJoinPool.managedBlock(this);
- } catch (InterruptedException ie) {
- timeout = INTERRUPTED;
+ putStat = 0;
+ ForkJoinPool.helpAsyncBlocker(executor, this);
+ if ((stat = putStat) == 0) {
+ try {
+ ForkJoinPool.managedBlock(this);
+ } catch (InterruptedException ie) {
+ timeout = INTERRUPTED;
+ }
+ stat = putStat;
}
- stat = putStat;
if (timeout < 0L)
Thread.currentThread().interrupt();
}
@@ -1165,71 +1156,22 @@
}
/**
- * Tries helping for FJ submitter.
- */
- private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
- int stat = 0;
- if (w != null) {
- ForkJoinTask<?> t;
- while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
- if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
- break;
- ((ConsumerTask<?>)t).consumer.consume();
- }
- }
- return stat;
- }
-
- /**
- * Tries helping for non-FJ submitter.
- */
- private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
- int stat = 0;
- if (w != null) {
- ForkJoinTask<?> t;
- while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
- if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
- break;
- ((ConsumerTask<?>)t).consumer.consume();
- }
- }
- return stat;
- }
-
- /**
* Timeout version; similar to submit.
*/
final int timedOffer(T item, long nanos) {
- int stat; Executor e;
- if ((stat = offer(item)) == 0 && helpDepth == 0 &&
- ((e = executor) instanceof ForkJoinPool)) {
- Thread thread = Thread.currentThread();
- if (((thread instanceof ForkJoinWorkerThread) &&
- ((ForkJoinWorkerThread)thread).getPool() == e) ||
- e == ForkJoinPool.commonPool()) {
- helpDepth = 1;
- ForkJoinTask<?> t;
- long deadline = System.nanoTime() + nanos;
- while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
- (t instanceof ConsumerTask)) {
- if ((stat = offer(item)) != 0 ||
- (nanos = deadline - System.nanoTime()) <= 0L ||
- !t.tryUnfork())
- break;
- ((ConsumerTask<?>)t).consumer.consume();
+ int stat;
+ if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
+ putItem = item;
+ putStat = 0;
+ ForkJoinPool.helpAsyncBlocker(executor, this);
+ if ((stat = putStat) == 0) {
+ try {
+ ForkJoinPool.managedBlock(this);
+ } catch (InterruptedException ie) {
+ timeout = INTERRUPTED;
}
- helpDepth = 0;
+ stat = putStat;
}
- }
- if (stat == 0 && (stat = offer(item)) == 0 &&
- (timeout = nanos) > 0L) {
- putItem = item;
- try {
- ForkJoinPool.managedBlock(this);
- } catch (InterruptedException ie) {
- timeout = INTERRUPTED;
- }
- stat = putStat;
if (timeout < 0L)
Thread.currentThread().interrupt();
}
@@ -1249,22 +1191,20 @@
}
else if ((c & ACTIVE) != 0) { // ensure keep-alive
if ((c & CONSUME) != 0 ||
- U.compareAndSwapInt(this, CTL, c,
- c | CONSUME))
+ CTL.compareAndSet(this, c, c | CONSUME))
break;
}
else if (demand == 0L || tail == head)
break;
- else if (U.compareAndSwapInt(this, CTL, c,
- c | (ACTIVE | CONSUME))) {
+ else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
try {
e.execute(new ConsumerTask<T>(this));
break;
} catch (RuntimeException | Error ex) { // back out
do {} while (((c = ctl) & DISABLED) == 0 &&
(c & ACTIVE) != 0 &&
- !U.compareAndSwapInt(this, CTL, c,
- c & ~ACTIVE));
+ !CTL.weakCompareAndSetVolatile
+ (this, c, c & ~ACTIVE));
throw ex;
}
}
@@ -1300,10 +1240,10 @@
break;
else if ((c & ACTIVE) != 0) {
pendingError = ex;
- if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
+ if (CTL.compareAndSet(this, c, c | ERROR))
break; // cause consumer task to exit
}
- else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
+ else if (CTL.compareAndSet(this, c, DISABLED)) {
Flow.Subscriber<? super T> s = subscriber;
if (s != null && ex != null) {
try {
@@ -1330,7 +1270,7 @@
for (int c;;) {
if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
break;
- if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
+ if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
onError(ex);
break;
}
@@ -1343,8 +1283,8 @@
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
- if (U.compareAndSwapInt(this, CTL, c,
- c | (ACTIVE | CONSUME | COMPLETE))) {
+ if (CTL.compareAndSet(this, c,
+ c | (ACTIVE | CONSUME | COMPLETE))) {
if ((c & ACTIVE) == 0)
startOrDisable();
break;
@@ -1356,8 +1296,8 @@
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
- if (U.compareAndSwapInt(this, CTL, c,
- c | (ACTIVE | CONSUME | SUBSCRIBE))) {
+ if (CTL.compareAndSet(this, c,
+ c | (ACTIVE | CONSUME | SUBSCRIBE))) {
if ((c & ACTIVE) == 0)
startOrDisable();
break;
@@ -1375,11 +1315,11 @@
if ((c = ctl) == DISABLED)
break;
else if ((c & ACTIVE) != 0) {
- if (U.compareAndSwapInt(this, CTL, c,
- c | (CONSUME | ERROR)))
+ if (CTL.compareAndSet(this, c,
+ c | (CONSUME | ERROR)))
break;
}
- else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
+ else if (CTL.compareAndSet(this, c, DISABLED)) {
detach();
break;
}
@@ -1395,19 +1335,18 @@
long prev = demand, d;
if ((d = prev + n) < prev) // saturate
d = Long.MAX_VALUE;
- if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
+ if (DEMAND.compareAndSet(this, prev, d)) {
for (int c, h;;) {
if ((c = ctl) == DISABLED)
break;
else if ((c & ACTIVE) != 0) {
if ((c & CONSUME) != 0 ||
- U.compareAndSwapInt(this, CTL, c,
- c | CONSUME))
+ CTL.compareAndSet(this, c, c | CONSUME))
break;
}
else if ((h = head) != tail) {
- if (U.compareAndSwapInt(this, CTL, c,
- c | (ACTIVE|CONSUME))) {
+ if (CTL.compareAndSet(this, c,
+ c | (ACTIVE|CONSUME))) {
startOrDisable();
break;
}
@@ -1476,16 +1415,14 @@
if ((s = subscriber) != null) { // else disabled
for (;;) {
long d = demand;
- int c; Object[] a; int n; long i; Object x; Thread w;
+ int c; Object[] a; int n, i; Object x; Thread w;
if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
if (!checkControl(s, c))
break;
}
else if ((a = array) == null || h == tail ||
(n = a.length) == 0 ||
- (x = U.getObjectVolatile
- (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
- == null) {
+ (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
if (!checkEmpty(s, c))
break;
}
@@ -1494,10 +1431,10 @@
break;
}
else if (((c & CONSUME) != 0 ||
- U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
- U.compareAndSwapObject(a, i, x, null)) {
- U.putIntRelease(this, HEAD, ++h);
- U.getAndAddLong(this, DEMAND, -1L);
+ CTL.compareAndSet(this, c, c | CONSUME)) &&
+ QA.compareAndSet(a, i, x, null)) {
+ HEAD.setRelease(this, ++h);
+ DEMAND.getAndAdd(this, -1L);
if ((w = waiter) != null)
signalWaiter(w);
try {
@@ -1528,7 +1465,7 @@
}
}
else if ((c & SUBSCRIBE) != 0) {
- if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
+ if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
try {
if (s != null)
s.onSubscribe(this);
@@ -1551,9 +1488,9 @@
boolean stat = true;
if (head == tail) {
if ((c & CONSUME) != 0)
- U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
+ CTL.compareAndSet(this, c, c & ~CONSUME);
else if ((c & COMPLETE) != 0) {
- if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
+ if (CTL.compareAndSet(this, c, DISABLED)) {
try {
if (s != null)
s.onComplete();
@@ -1561,7 +1498,7 @@
}
}
}
- else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
+ else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
stat = false;
}
return stat;
@@ -1574,8 +1511,8 @@
boolean stat = true;
if (demand == 0L) {
if ((c & CONSUME) != 0)
- U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
- else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
+ CTL.compareAndSet(this, c, c & ~CONSUME);
+ else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
stat = false;
}
return stat;
@@ -1595,31 +1532,25 @@
onError(ex);
}
- // Unsafe mechanics
- private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
- private static final long CTL;
- private static final long TAIL;
- private static final long HEAD;
- private static final long DEMAND;
- private static final int ABASE;
- private static final int ASHIFT;
+ // VarHandle mechanics
+ private static final VarHandle CTL;
+ private static final VarHandle TAIL;
+ private static final VarHandle HEAD;
+ private static final VarHandle DEMAND;
+ private static final VarHandle QA;
static {
try {
- CTL = U.objectFieldOffset
- (BufferedSubscription.class.getDeclaredField("ctl"));
- TAIL = U.objectFieldOffset
- (BufferedSubscription.class.getDeclaredField("tail"));
- HEAD = U.objectFieldOffset
- (BufferedSubscription.class.getDeclaredField("head"));
- DEMAND = U.objectFieldOffset
- (BufferedSubscription.class.getDeclaredField("demand"));
-
- ABASE = U.arrayBaseOffset(Object[].class);
- int scale = U.arrayIndexScale(Object[].class);
- if ((scale & (scale - 1)) != 0)
- throw new Error("data type scale not a power of two");
- ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
+ MethodHandles.Lookup l = MethodHandles.lookup();
+ CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
+ int.class);
+ TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
+ int.class);
+ HEAD = l.findVarHandle(BufferedSubscription.class, "head",
+ int.class);
+ DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
+ long.class);
+ QA = MethodHandles.arrayElementVarHandle(Object[].class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
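
The BufferedSubscription changes above swap its internal atomics from Unsafe to VarHandles without touching the public API, so typical SubmissionPublisher usage is unchanged. A small sketch in which consume(...) installs the ConsumerTask machinery modified by this patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.SubmissionPublisher;

    // Illustrative sketch: ordinary SubmissionPublisher usage; the consumer
    // runs inside ConsumerTasks like those shown above.
    public class PublishDemo {
        public static void main(String[] args) {
            SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
            CompletableFuture<Void> done =
                pub.consume(i -> System.out.println("got " + i));
            for (int i = 0; i < 5; i++)
                pub.submit(i);                       // may block if the buffer saturates
            pub.close();                             // triggers onComplete
            done.join();                             // wait for the consumer to finish
        }
    }
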