6978087: jsr166y Updates
Summary: Simplify the ForkJoinPool API, reworking some of the internals
Reviewed-by: martin, dholmes, chegar
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Mon Sep 13 09:32:36 2010 +0800
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Mon Sep 13 09:55:03 2010 +0100
@@ -40,16 +40,23 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.locks.Condition;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
- * from non-{@code ForkJoinTask}s, as well as management and
+ * from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
*
* <p>A {@code ForkJoinPool} differs from other kinds of {@link
@@ -58,29 +65,19 @@
* execute subtasks created by other active tasks (eventually blocking
* waiting for work if none exist). This enables efficient processing
* when most tasks spawn other subtasks (as do most {@code
- * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
- * execution of some plain {@code Runnable}- or {@code Callable}-
- * based activities along with {@code ForkJoinTask}s. When setting
- * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
- * also be appropriate for use with fine-grained tasks of any form
- * that are never joined. Otherwise, other {@code ExecutorService}
- * implementations are typically more appropriate choices.
+ * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
+ * constructors, {@code ForkJoinPool}s may also be appropriate for use
+ * with event-style tasks that are never joined.
*
* <p>A {@code ForkJoinPool} is constructed with a given target
* parallelism level; by default, equal to the number of available
- * processors. Unless configured otherwise via {@link
- * #setMaintainsParallelism}, the pool attempts to maintain this
- * number of active (or available) threads by dynamically adding,
- * suspending, or resuming internal worker threads, even if some tasks
- * are stalled waiting to join others. However, no such adjustments
- * are performed in the face of blocked IO or other unmanaged
- * synchronization. The nested {@link ManagedBlocker} interface
- * enables extension of the kinds of synchronization accommodated.
- * The target parallelism level may also be changed dynamically
- * ({@link #setParallelism}). The total number of threads may be
- * limited using method {@link #setMaximumPoolSize}, in which case it
- * may become possible for the activities of a pool to stall due to
- * the lack of available threads to process new tasks.
+ * processors. The pool attempts to maintain enough active (or
+ * available) threads by dynamically adding, suspending, or resuming
+ * internal worker threads, even if some tasks are stalled waiting to
+ * join others. However, no such adjustments are guaranteed in the
+ * face of blocked IO or other unmanaged synchronization. The nested
+ * {@link ManagedBlocker} interface enables extension of the kinds of
+ * synchronization accommodated.
*
* <p>In addition to execution and lifecycle control methods, this
* class provides status check methods (for example
@@ -89,6 +86,40 @@
* {@link #toString} returns indications of pool state in a
* convenient form for informal monitoring.
*
+ * <p> As is the case with other ExecutorServices, there are three
+ * main task execution methods summarized in the following
+ * table. These are designed to be used by clients not already engaged
+ * in fork/join computations in the current pool. The main forms of
+ * these methods accept instances of {@code ForkJoinTask}, but
+ * overloaded forms also allow mixed execution of plain {@code
+ * Runnable}- or {@code Callable}- based activities as well. However,
+ * tasks that are already executing in a pool should normally
+ * <em>NOT</em> use these pool execution methods, but instead use the
+ * within-computation forms listed in the table.
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
+ * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
+ * </tr>
+ * <tr>
+ * <td> <b>Arrange async execution</td>
+ * <td> {@link #execute(ForkJoinTask)}</td>
+ * <td> {@link ForkJoinTask#fork}</td>
+ * </tr>
+ * <tr>
+ * <td> <b>Await and obtain result</td>
+ * <td> {@link #invoke(ForkJoinTask)}</td>
+ * <td> {@link ForkJoinTask#invoke}</td>
+ * </tr>
+ * <tr>
+ * <td> <b>Arrange exec and obtain Future</td>
+ * <td> {@link #submit(ForkJoinTask)}</td>
+ * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
+ * </tr>
+ * </table>
+ *
* <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
* used for all parallel task execution in a program or subsystem.
* Otherwise, use would not usually outweigh the construction and
@@ -113,7 +144,8 @@
* {@code IllegalArgumentException}.
*
* <p>This implementation rejects submitted tasks (that is, by throwing
- * {@link RejectedExecutionException}) only when the pool is shut down.
+ * {@link RejectedExecutionException}) only when the pool is shut down
+ * or internal resources have been exhausted.
*
* @since 1.7
* @author Doug Lea
@@ -121,16 +153,247 @@
public class ForkJoinPool extends AbstractExecutorService {
/*
- * See the extended comments interspersed below for design,
- * rationale, and walkthroughs.
+ * Implementation Overview
+ *
+ * This class provides the central bookkeeping and control for a
+ * set of worker threads: Submissions from non-FJ threads enter
+ * into a submission queue. Workers take these tasks and typically
+ * split them into subtasks that may be stolen by other workers.
+ * The main work-stealing mechanics implemented in class
+ * ForkJoinWorkerThread give first priority to processing tasks
+ * from their own queues (LIFO or FIFO, depending on mode), then
+ * to randomized FIFO steals of tasks in other worker queues, and
+ * lastly to new submissions. These mechanics do not consider
+ * affinities, loads, cache localities, etc, so rarely provide the
+ * best possible performance on a given machine, but portably
+ * provide good throughput by averaging over these factors.
+ * (Further, even if we did try to use such information, we do not
+ * usually have a basis for exploiting it. For example, some sets
+ * of tasks profit from cache affinities, but others are harmed by
+ * cache pollution effects.)
+ *
+ * Beyond work-stealing support and essential bookkeeping, the
+ * main responsibility of this framework is to take actions when
+ * one worker is waiting to join a task stolen (or always held by)
+ * another. Because we are multiplexing many tasks on to a pool
+ * of workers, we can't just let them block (as in Thread.join).
+ * We also cannot just reassign the joiner's run-time stack with
+ * another and replace it later, which would be a form of
+ * "continuation", that even if possible is not necessarily a good
+ * idea. Given that the creation costs of most threads on most
+ * systems mainly surrounds setting up runtime stacks, thread
+ * creation and switching is usually not much more expensive than
+ * stack creation and switching, and is more flexible). Instead we
+ * combine two tactics:
+ *
+ * Helping: Arranging for the joiner to execute some task that it
+ * would be running if the steal had not occurred. Method
+ * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
+ * links to try to find such a task.
+ *
+ * Compensating: Unless there are already enough live threads,
+ * method helpMaintainParallelism() may create or
+ * re-activate a spare thread to compensate for blocked
+ * joiners until they unblock.
+ *
+ * It is impossible to keep exactly the target (parallelism)
+ * number of threads running at any given time. Determining
+ * existence of conservatively safe helping targets, the
+ * availability of already-created spares, and the apparent need
+ * to create new spares are all racy and require heuristic
+ * guidance, so we rely on multiple retries of each. Compensation
+ * occurs in slow-motion. It is triggered only upon timeouts of
+ * Object.wait used for joins. This reduces poor decisions that
+ * would otherwise be made when threads are waiting for others
+ * that are stalled because of unrelated activities such as
+ * garbage collection.
+ *
+ * The ManagedBlocker extension API can't use helping so relies
+ * only on compensation in method awaitBlocker.
+ *
+ * The main throughput advantages of work-stealing stem from
+ * decentralized control -- workers mostly steal tasks from each
+ * other. We do not want to negate this by creating bottlenecks
+ * implementing other management responsibilities. So we use a
+ * collection of techniques that avoid, reduce, or cope well with
+ * contention. These entail several instances of bit-packing into
+ * CASable fields to maintain only the minimally required
+ * atomicity. To enable such packing, we restrict maximum
+ * parallelism to (1<<15)-1 (enabling twice this (to accommodate
+ * unbalanced increments and decrements) to fit into a 16 bit
+ * field, which is far in excess of normal operating range. Even
+ * though updates to some of these bookkeeping fields do sometimes
+ * contend with each other, they don't normally cache-contend with
+ * updates to others enough to warrant memory padding or
+ * isolation. So they are all held as fields of ForkJoinPool
+ * objects. The main capabilities are as follows:
+ *
+ * 1. Creating and removing workers. Workers are recorded in the
+ * "workers" array. This is an array as opposed to some other data
+ * structure to support index-based random steals by workers.
+ * Updates to the array recording new workers and unrecording
+ * terminated ones are protected from each other by a lock
+ * (workerLock) but the array is otherwise concurrently readable,
+ * and accessed directly by workers. To simplify index-based
+ * operations, the array size is always a power of two, and all
+ * readers must tolerate null slots. Currently, all worker thread
+ * creation is on-demand, triggered by task submissions,
+ * replacement of terminated workers, and/or compensation for
+ * blocked workers. However, all other support code is set up to
+ * work with other policies.
+ *
+ * To ensure that we do not hold on to worker references that
+ * would prevent GC, ALL accesses to workers are via indices into
+ * the workers array (which is one source of some of the unusual
+ * code constructions here). In essence, the workers array serves
+ * as a WeakReference mechanism. Thus for example the event queue
+ * stores worker indices, not worker references. Access to the
+ * workers in associated methods (for example releaseEventWaiters)
+ * must both index-check and null-check the IDs. All such accesses
+ * ignore bad IDs by returning out early from what they are doing,
+ * since this can only be associated with shutdown, in which case
+ * it is OK to give up. On termination, we just clobber these
+ * data structures without trying to use them.
+ *
+ * 2. Bookkeeping for dynamically adding and removing workers. We
+ * aim to approximately maintain the given level of parallelism.
+ * When some workers are known to be blocked (on joins or via
+ * ManagedBlocker), we may create or resume others to take their
+ * place until they unblock (see below). Implementing this
+ * requires counts of the number of "running" threads (i.e., those
+ * that are neither blocked nor artificially suspended) as well as
+ * the total number. These two values are packed into one field,
+ * "workerCounts" because we need accurate snapshots when deciding
+ * to create, resume or suspend. Note however that the
+ * correspondence of these counts to reality is not guaranteed. In
+ * particular updates for unblocked threads may lag until they
+ * actually wake up.
+ *
+ * 3. Maintaining global run state. The run state of the pool
+ * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
+ * those in other Executor implementations, as well as a count of
+ * "active" workers -- those that are, or soon will be, or
+ * recently were executing tasks. The runLevel and active count
+ * are packed together in order to correctly trigger shutdown and
+ * termination. Without care, active counts can be subject to very
+ * high contention. We substantially reduce this contention by
+ * relaxing update rules. A worker must claim active status
+ * prospectively, by activating if it sees that a submitted or
+ * stealable task exists (it may find after activating that the
+ * task no longer exists). It stays active while processing this
+ * task (if it exists) and any other local subtasks it produces,
+ * until it cannot find any other tasks. It then tries
+ * inactivating (see method preStep), but upon update contention
+ * instead scans for more tasks, later retrying inactivation if it
+ * doesn't find any.
+ *
+ * 4. Managing idle workers waiting for tasks. We cannot let
+ * workers spin indefinitely scanning for tasks when none are
+ * available. On the other hand, we must quickly prod them into
+ * action when new tasks are submitted or generated. We
+ * park/unpark these idle workers using an event-count scheme.
+ * Field eventCount is incremented upon events that may enable
+ * workers that previously could not find a task to now find one:
+ * Submission of a new task to the pool, or another worker pushing
+ * a task onto a previously empty queue. (We also use this
+ * mechanism for configuration and termination actions that
+ * require wakeups of idle workers). Each worker maintains its
+ * last known event count, and blocks when a scan for work did not
+ * find a task AND its lastEventCount matches the current
+ * eventCount. Waiting idle workers are recorded in a variant of
+ * Treiber stack headed by field eventWaiters which, when nonzero,
+ * encodes the thread index and count awaited for by the worker
+ * thread most recently calling eventSync. This thread in turn has
+ * a record (field nextEventWaiter) for the next waiting worker.
+ * In addition to allowing simpler decisions about need for
+ * wakeup, the event count bits in eventWaiters serve the role of
+ * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
+ * released threads also try to release at most two others. The
+ * net effect is a tree-like diffusion of signals, where released
+ * threads (and possibly others) help with unparks. To further
+ * reduce contention effects a bit, failed CASes to increment
+ * field eventCount are tolerated without retries in signalWork.
+ * Conceptually they are merged into the same event, which is OK
+ * when their only purpose is to enable workers to scan for work.
+ *
+ * 5. Managing suspension of extra workers. When a worker notices
+ * (usually upon timeout of a wait()) that there are too few
+ * running threads, we may create a new thread to maintain
+ * parallelism level, or at least avoid starvation. Usually, extra
+ * threads are needed for only very short periods, yet join
+ * dependencies are such that we sometimes need them in
+ * bursts. Rather than create new threads each time this happens,
+ * we suspend no-longer-needed extra ones as "spares". For most
+ * purposes, we don't distinguish "extra" spare threads from
+ * normal "core" threads: On each call to preStep (the only point
+ * at which we can do this) a worker checks to see if there are
+ * now too many running workers, and if so, suspends itself.
+ * Method helpMaintainParallelism looks for suspended threads to
+ * resume before considering creating a new replacement. The
+ * spares themselves are encoded on another variant of a Treiber
+ * Stack, headed at field "spareWaiters". Note that the use of
+ * spares is intrinsically racy. One thread may become a spare at
+ * about the same time as another is needlessly being created. We
+ * counteract this and related slop in part by requiring resumed
+ * spares to immediately recheck (in preStep) to see whether they
+ * should re-suspend.
+ *
+ * 6. Killing off unneeded workers. A timeout mechanism is used to
+ * shed unused workers: The oldest (first) event queue waiter uses
+ * a timed rather than hard wait. When this wait times out without
+ * a normal wakeup, it tries to shutdown any one (for convenience
+ * the newest) other spare or event waiter via
+ * tryShutdownUnusedWorker. This eventually reduces the number of
+ * worker threads to a minimum of one after a long enough period
+ * without use.
+ *
+ * 7. Deciding when to create new workers. The main dynamic
+ * control in this class is deciding when to create extra threads
+ * in method helpMaintainParallelism. We would like to keep
+ * exactly #parallelism threads running, which is an impossible
+ * task. We always need to create one when the number of running
+ * threads would become zero and all workers are busy. Beyond
+ * this, we must rely on heuristics that work well in the
+ * presence of transient phenomena such as GC stalls, dynamic
+ * compilation, and wake-up lags. These transients are extremely
+ * common -- we are normally trying to fully saturate the CPUs on
+ * a machine, so almost any activity other than running tasks
+ * impedes accuracy. Our main defense is to allow parallelism to
+ * lapse for a while during joins, and use a timeout to see if,
+ * after the resulting settling, there is still a need for
+ * additional workers. This also better copes with the fact that
+ * some of the methods in this class tend to never become compiled
+ * (but are interpreted), so some components of the entire set of
+ * controls might execute 100 times faster than others. And
+ * similarly for cases where the apparent lack of work is just due
+ * to GC stalls and other transient system activity.
+ *
+ * Beware that there is a lot of representation-level coupling
+ * among classes ForkJoinPool, ForkJoinWorkerThread, and
+ * ForkJoinTask. For example, direct access to "workers" array by
+ * workers, and direct access to ForkJoinTask.status by both
+ * ForkJoinPool and ForkJoinWorkerThread. There is little point
+ * trying to reduce this, since any associated future changes in
+ * representations will need to be accompanied by algorithmic
+ * changes anyway.
+ *
+ * Style notes: There are lots of inline assignments (of form
+ * "while ((local = field) != 0)") which are usually the simplest
+ * way to ensure the required read orderings (which are sometimes
+ * critical). Also several occurrences of the unusual "do {}
+ * while (!cas...)" which is the simplest way to force an update of
+ * a CAS'ed variable. There are also other coding oddities that
+ * help some methods perform reasonably even when interpreted (not
+ * compiled), at the expense of some messy constructions that
+ * reduce byte code counts.
+ *
+ * The order of declarations in this file is: (1) statics (2)
+ * fields (along with constants used when unpacking some of them)
+ * (3) internal control methods (4) callbacks and other support
+ * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
+ * methods (plus a few little helpers).
*/
- /** Mask for packing and unpacking shorts */
- private static final int shortMask = 0xffff;
-
- /** Max pool size -- must be a power of two minus 1 */
- private static final int MAX_THREADS = 0x7FFF;
-
/**
* Factory for creating new {@link ForkJoinWorkerThread}s.
* A {@code ForkJoinWorkerThreadFactory} must be defined and used
@@ -151,14 +414,10 @@
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
- static class DefaultForkJoinWorkerThreadFactory
+ static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
- try {
- return new ForkJoinWorkerThread(pool);
- } catch (OutOfMemoryError oom) {
- return null;
- }
+ return new ForkJoinWorkerThread(pool);
}
}
@@ -194,47 +453,44 @@
new AtomicInteger();
/**
- * Array holding all worker threads in the pool. Initialized upon
- * first use. Array size must be a power of two. Updates and
- * replacements are protected by workerLock, but it is always kept
- * in a consistent enough state to be randomly accessed without
- * locking by workers performing work-stealing.
+ * The time to block in a join (see awaitJoin) before checking if
+ * a new worker should be (re)started to maintain parallelism
+ * level. The value should be short enough to maintain global
+ * responsiveness and progress but long enough to avoid
+ * counterproductive firings during GC stalls or unrelated system
+ * activity, and to not bog down systems with continual re-firings
+ * on GCs or legitimately long waits.
*/
- volatile ForkJoinWorkerThread[] workers;
-
- /**
- * Lock protecting access to workers.
- */
- private final ReentrantLock workerLock;
-
- /**
- * Condition for awaitTermination.
- */
- private final Condition termination;
+ private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second
/**
- * The uncaught exception handler used when any worker
- * abruptly terminates
+ * The wakeup interval (in nanoseconds) for the oldest worker
+ * waiting for an event to invoke tryShutdownUnusedWorker to
+ * shrink the number of workers. The exact value does not matter
+ * too much. It must be short enough to release resources during
+ * sustained periods of idleness, but not so short that threads
+ * are continually re-created.
*/
- private Thread.UncaughtExceptionHandler ueh;
-
- /**
- * Creation factory for worker threads.
- */
- private final ForkJoinWorkerThreadFactory factory;
+ private static final long SHRINK_RATE_NANOS =
+ 30L * 1000L * 1000L * 1000L; // 2 per minute
/**
- * Head of stack of threads that were created to maintain
- * parallelism when other threads blocked, but have since
- * suspended when the parallelism level rose.
+ * Absolute bound for parallelism level. Twice this number plus
+ * one (i.e., 0xfff) must fit into a 16bit field to enable
+ * word-packing for some counts and indices.
*/
- private volatile WaitQueueNode spareStack;
+ private static final int MAX_WORKERS = 0x7fff;
/**
- * Sum of per-thread steal counts, updated only when threads are
- * idle or terminating.
+ * Array holding all worker threads in the pool. Array size must
+ * be a power of two. Updates and replacements are protected by
+ * workerLock, but the array is always kept in a consistent enough
+ * state to be randomly accessed without locking by workers
+ * performing work-stealing, as well as other traversal-based
+ * methods in this class. All readers must tolerate that some
+ * array slots may be null.
*/
- private final AtomicLong stealCount;
+ volatile ForkJoinWorkerThread[] workers;
/**
* Queue for external submissions.
@@ -242,160 +498,732 @@
private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
/**
- * Head of Treiber stack for barrier sync. See below for explanation.
+ * Lock protecting updates to workers array.
+ */
+ private final ReentrantLock workerLock;
+
+ /**
+ * Latch released upon termination.
+ */
+ private final Phaser termination;
+
+ /**
+ * Creation factory for worker threads.
+ */
+ private final ForkJoinWorkerThreadFactory factory;
+
+ /**
+ * Sum of per-thread steal counts, updated only when threads are
+ * idle or terminating.
*/
- private volatile WaitQueueNode syncStack;
+ private volatile long stealCount;
+
+ /**
+ * Encoded record of top of Treiber stack of threads waiting for
+ * events. The top 32 bits contain the count being waited for. The
+ * bottom 16 bits contains one plus the pool index of waiting
+ * worker thread. (Bits 16-31 are unused.)
+ */
+ private volatile long eventWaiters;
+
+ private static final int EVENT_COUNT_SHIFT = 32;
+ private static final long WAITER_ID_MASK = (1L << 16) - 1L;
+
+ /**
+ * A counter for events that may wake up worker threads:
+ * - Submission of a new task to the pool
+ * - A worker pushing a task on an empty queue
+ * - termination
+ */
+ private volatile int eventCount;
+
+ /**
+ * Encoded record of top of Treiber stack of spare threads waiting
+ * for resumption. The top 16 bits contain an arbitrary count to
+ * avoid ABA effects. The bottom 16bits contains one plus the pool
+ * index of waiting worker thread.
+ */
+ private volatile int spareWaiters;
+
+ private static final int SPARE_COUNT_SHIFT = 16;
+ private static final int SPARE_ID_MASK = (1 << 16) - 1;
/**
- * The count for event barrier
+ * Lifecycle control. The low word contains the number of workers
+ * that are (probably) executing tasks. This value is atomically
+ * incremented before a worker gets a task to run, and decremented
+ * when a worker has no tasks and cannot find any. Bits 16-18
+ * contain runLevel value. When all are zero, the pool is
+ * running. Level transitions are monotonic (running -> shutdown
+ * -> terminating -> terminated) so each transition adds a bit.
+ * These are bundled together to ensure consistent read for
+ * termination checks (i.e., that runLevel is at least SHUTDOWN
+ * and active threads is zero).
+ *
+ * Notes: Most direct CASes are dependent on these bitfield
+ * positions. Also, this field is non-private to enable direct
+ * performance-sensitive CASes in ForkJoinWorkerThread.
*/
- private volatile long eventCount;
+ volatile int runState;
+
+ // Note: The order among run level values matters.
+ private static final int RUNLEVEL_SHIFT = 16;
+ private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
+ private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
+ private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
+ private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
+
+ /**
+ * Holds number of total (i.e., created and not yet terminated)
+ * and running (i.e., not blocked on joins or other managed sync)
+ * threads, packed together to ensure consistent snapshot when
+ * making decisions about creating and suspending spare
+ * threads. Updated only by CAS. Note that adding a new worker
+ * requires incrementing both counts, since workers start off in
+ * running state.
+ */
+ private volatile int workerCounts;
+
+ private static final int TOTAL_COUNT_SHIFT = 16;
+ private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
+ private static final int ONE_RUNNING = 1;
+ private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
+
+ /**
+ * The target parallelism level.
+ * Accessed directly by ForkJoinWorkerThreads.
+ */
+ final int parallelism;
+
+ /**
+ * True if use local fifo, not default lifo, for local polling
+ * Read by, and replicated by ForkJoinWorkerThreads
+ */
+ final boolean locallyFifo;
+
+ /**
+ * The uncaught exception handler used when any worker abruptly
+ * terminates.
+ */
+ private final Thread.UncaughtExceptionHandler ueh;
/**
* Pool number, just for assigning useful names to worker threads
*/
private final int poolNumber;
- /**
- * The maximum allowed pool size
- */
- private volatile int maxPoolSize;
-
- /**
- * The desired parallelism level, updated only under workerLock.
- */
- private volatile int parallelism;
-
- /**
- * True if use local fifo, not default lifo, for local polling
- */
- private volatile boolean locallyFifo;
+ // Utilities for CASing fields. Note that most of these
+ // are usually manually inlined by callers
/**
- * Holds number of total (i.e., created and not yet terminated)
- * and running (i.e., not blocked on joins or other managed sync)
- * threads, packed into one int to ensure consistent snapshot when
- * making decisions about creating and suspending spare
- * threads. Updated only by CAS. Note: CASes in
- * updateRunningCount and preJoin assume that running active count
- * is in low word, so need to be modified if this changes.
+ * Increments running count part of workerCounts
*/
- private volatile int workerCounts;
-
- private static int totalCountOf(int s) { return s >>> 16; }
- private static int runningCountOf(int s) { return s & shortMask; }
- private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
-
- /**
- * Adds delta (which may be negative) to running count. This must
- * be called before (with negative arg) and after (with positive)
- * any managed synchronization (i.e., mainly, joins).
- *
- * @param delta the number to add
- */
- final void updateRunningCount(int delta) {
- int s;
- do {} while (!casWorkerCounts(s = workerCounts, s + delta));
+ final void incrementRunningCount() {
+ int c;
+ do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ c = workerCounts,
+ c + ONE_RUNNING));
}
/**
- * Adds delta (which may be negative) to both total and running
- * count. This must be called upon creation and termination of
- * worker threads.
- *
- * @param delta the number to add
+ * Tries to decrement running count unless already zero
*/
- private void updateWorkerCount(int delta) {
- int d = delta + (delta << 16); // add to both lo and hi parts
- int s;
- do {} while (!casWorkerCounts(s = workerCounts, s + d));
+ final boolean tryDecrementRunningCount() {
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) == 0)
+ return false;
+ return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING);
}
/**
- * Lifecycle control. High word contains runState, low word
- * contains the number of workers that are (probably) executing
- * tasks. This value is atomically incremented before a worker
- * gets a task to run, and decremented when worker has no tasks
- * and cannot find any. These two fields are bundled together to
- * support correct termination triggering. Note: activeCount
- * CAS'es cheat by assuming active count is in low word, so need
- * to be modified if this changes
+ * Forces decrement of encoded workerCounts, awaiting nonzero if
+ * (rarely) necessary when other count updates lag.
+ *
+ * @param dr -- either zero or ONE_RUNNING
+ * @param dt -- either zero or ONE_TOTAL
*/
- private volatile int runControl;
-
- // RunState values. Order among values matters
- private static final int RUNNING = 0;
- private static final int SHUTDOWN = 1;
- private static final int TERMINATING = 2;
- private static final int TERMINATED = 3;
-
- private static int runStateOf(int c) { return c >>> 16; }
- private static int activeCountOf(int c) { return c & shortMask; }
- private static int runControlFor(int r, int a) { return (r << 16) + a; }
-
- /**
- * Tries incrementing active count; fails on contention.
- * Called by workers before/during executing tasks.
- *
- * @return true on success
- */
- final boolean tryIncrementActiveCount() {
- int c = runControl;
- return casRunControl(c, c+1);
+ private void decrementWorkerCounts(int dr, int dt) {
+ for (;;) {
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
+ (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
+ if ((runState & TERMINATED) != 0)
+ return; // lagging termination on a backout
+ Thread.yield();
+ }
+ if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - (dr + dt)))
+ return;
+ }
}
/**
* Tries decrementing active count; fails on contention.
- * Possibly triggers termination on success.
- * Called by workers when they can't find tasks.
- *
- * @return true on success
+ * Called when workers cannot find tasks to run.
*/
final boolean tryDecrementActiveCount() {
- int c = runControl;
- int nextc = c - 1;
- if (!casRunControl(c, nextc))
+ int c;
+ return UNSAFE.compareAndSwapInt(this, runStateOffset,
+ c = runState, c - 1);
+ }
+
+ /**
+ * Advances to at least the given level. Returns true if not
+ * already in at least the given level.
+ */
+ private boolean advanceRunLevel(int level) {
+ for (;;) {
+ int s = runState;
+ if ((s & level) != 0)
+ return false;
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
+ return true;
+ }
+ }
+
+ // workers array maintenance
+
+ /**
+ * Records and returns a workers array index for new worker.
+ */
+ private int recordWorker(ForkJoinWorkerThread w) {
+ // Try using slot totalCount-1. If not available, scan and/or resize
+ int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ForkJoinWorkerThread[] ws = workers;
+ int n = ws.length;
+ if (k < 0 || k >= n || ws[k] != null) {
+ for (k = 0; k < n && ws[k] != null; ++k)
+ ;
+ if (k == n)
+ ws = Arrays.copyOf(ws, n << 1);
+ }
+ ws[k] = w;
+ workers = ws; // volatile array write ensures slot visibility
+ } finally {
+ lock.unlock();
+ }
+ return k;
+ }
+
+ /**
+ * Nulls out record of worker in workers array.
+ */
+ private void forgetWorker(ForkJoinWorkerThread w) {
+ int idx = w.poolIndex;
+ // Locking helps method recordWorker avoid unnecessary expansion
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ForkJoinWorkerThread[] ws = workers;
+ if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
+ ws[idx] = null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Final callback from terminating worker. Removes record of
+ * worker from array, and adjusts counts. If pool is shutting
+ * down, tries to complete termination.
+ *
+ * @param w the worker
+ */
+ final void workerTerminated(ForkJoinWorkerThread w) {
+ forgetWorker(w);
+ decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
+ while (w.stealCount != 0) // collect final count
+ tryAccumulateStealCount(w);
+ tryTerminate(false);
+ }
+
+ // Waiting for and signalling events
+
+ /**
+ * Releases workers blocked on a count not equal to current count.
+ * Normally called after precheck that eventWaiters isn't zero to
+ * avoid wasted array checks. Gives up upon a change in count or
+ * upon releasing two workers, letting others take over.
+ */
+ private void releaseEventWaiters() {
+ ForkJoinWorkerThread[] ws = workers;
+ int n = ws.length;
+ long h = eventWaiters;
+ int ec = eventCount;
+ boolean releasedOne = false;
+ ForkJoinWorkerThread w; int id;
+ while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+ (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
+ id < n && (w = ws[id]) != null) {
+ if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
+ h, w.nextWaiter)) {
+ LockSupport.unpark(w);
+ if (releasedOne) // exit on second release
+ break;
+ releasedOne = true;
+ }
+ if (eventCount != ec)
+ break;
+ h = eventWaiters;
+ }
+ }
+
+ /**
+ * Tries to advance eventCount and releases waiters. Called only
+ * from workers.
+ */
+ final void signalWork() {
+ int c; // try to increment event count -- CAS failure OK
+ UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
+ if (eventWaiters != 0L)
+ releaseEventWaiters();
+ }
+
+ /**
+ * Adds the given worker to event queue and blocks until
+ * terminating or event count advances from the given value
+ *
+ * @param w the calling worker thread
+ * @param ec the count
+ */
+ private void eventSync(ForkJoinWorkerThread w, int ec) {
+ long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
+ long h;
+ while ((runState < SHUTDOWN || !tryTerminate(false)) &&
+ (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
+ (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
+ eventCount == ec) {
+ if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
+ w.nextWaiter = h, nh)) {
+ awaitEvent(w, ec);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Blocks the given worker (that has already been entered as an
+ * event waiter) until terminating or event count advances from
+ * the given value. The oldest (first) waiter uses a timed wait to
+ * occasionally one-by-one shrink the number of workers (to a
+ * minimum of one) if the pool has not been used for extended
+ * periods.
+ *
+ * @param w the calling worker thread
+ * @param ec the count
+ */
+ private void awaitEvent(ForkJoinWorkerThread w, int ec) {
+ while (eventCount == ec) {
+ if (tryAccumulateStealCount(w)) { // transfer while idle
+ boolean untimed = (w.nextWaiter != 0L ||
+ (workerCounts & RUNNING_COUNT_MASK) <= 1);
+ long startTime = untimed? 0 : System.nanoTime();
+ Thread.interrupted(); // clear/ignore interrupt
+ if (eventCount != ec || w.runState != 0 ||
+ runState >= TERMINATING) // recheck after clear
+ break;
+ if (untimed)
+ LockSupport.park(w);
+ else {
+ LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
+ if (eventCount != ec || w.runState != 0 ||
+ runState >= TERMINATING)
+ break;
+ if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
+ tryShutdownUnusedWorker(ec);
+ }
+ }
+ }
+ }
+
+ // Maintaining parallelism
+
+ /**
+ * Pushes worker onto the spare stack.
+ */
+ final void pushSpare(ForkJoinWorkerThread w) {
+ int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
+ do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
+ w.nextSpare = spareWaiters,ns));
+ }
+
+ /**
+ * Tries (once) to resume a spare if the number of running
+ * threads is less than target.
+ */
+ private void tryResumeSpare() {
+ int sw, id;
+ ForkJoinWorkerThread[] ws = workers;
+ int n = ws.length;
+ ForkJoinWorkerThread w;
+ if ((sw = spareWaiters) != 0 &&
+ (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
+ id < n && (w = ws[id]) != null &&
+ (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
+ spareWaiters == sw &&
+ UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
+ sw, w.nextSpare)) {
+ int c; // increment running count before resume
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
+ if (w.tryUnsuspend())
+ LockSupport.unpark(w);
+ else // back out if w was shutdown
+ decrementWorkerCounts(ONE_RUNNING, 0);
+ }
+ }
+
+ /**
+ * Tries to increase the number of running workers if below target
+ * parallelism: If a spare exists tries to resume it via
+ * tryResumeSpare. Otherwise, if not enough total workers or all
+ * existing workers are busy, adds a new worker. In all cases also
+ * helps wake up releasable workers waiting for work.
+ */
+ private void helpMaintainParallelism() {
+ int pc = parallelism;
+ int wc, rs, tc;
+ while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
+ (rs = runState) < TERMINATING) {
+ if (spareWaiters != 0)
+ tryResumeSpare();
+ else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
+ (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
+ break; // enough total
+ else if (runState == rs && workerCounts == wc &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
+ wc + (ONE_RUNNING|ONE_TOTAL))) {
+ ForkJoinWorkerThread w = null;
+ try {
+ w = factory.newThread(this);
+ } finally { // adjust on null or exceptional factory return
+ if (w == null) {
+ decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
+ tryTerminate(false); // handle failure during shutdown
+ }
+ }
+ if (w == null)
+ break;
+ w.start(recordWorker(w), ueh);
+ if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
+ int c; // advance event count
+ UNSAFE.compareAndSwapInt(this, eventCountOffset,
+ c = eventCount, c+1);
+ break; // add at most one unless total below target
+ }
+ }
+ }
+ if (eventWaiters != 0L)
+ releaseEventWaiters();
+ }
+
+ /**
+ * Callback from the oldest waiter in awaitEvent waking up after a
+ * period of non-use. If all workers are idle, tries (once) to
+ * shutdown an event waiter or a spare, if one exists. Note that
+ * we don't need CAS or locks here because the method is called
+ * only from one thread occasionally waking (and even misfires are
+ * OK). Note that until the shutdown worker fully terminates,
+ * workerCounts will overestimate total count, which is tolerable.
+ *
+ * @param ec the event count waited on by caller (to abort
+ * attempt if count has since changed).
+ */
+ private void tryShutdownUnusedWorker(int ec) {
+ if (runState == 0 && eventCount == ec) { // only trigger if all idle
+ ForkJoinWorkerThread[] ws = workers;
+ int n = ws.length;
+ ForkJoinWorkerThread w = null;
+ boolean shutdown = false;
+ int sw;
+ long h;
+ if ((sw = spareWaiters) != 0) { // prefer killing spares
+ int id = (sw & SPARE_ID_MASK) - 1;
+ if (id >= 0 && id < n && (w = ws[id]) != null &&
+ UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
+ sw, w.nextSpare))
+ shutdown = true;
+ }
+ else if ((h = eventWaiters) != 0L) {
+ long nh;
+ int id = ((int)(h & WAITER_ID_MASK)) - 1;
+ if (id >= 0 && id < n && (w = ws[id]) != null &&
+ (nh = w.nextWaiter) != 0L && // keep at least one worker
+ UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
+ shutdown = true;
+ }
+ if (w != null && shutdown) {
+ w.shutdown();
+ LockSupport.unpark(w);
+ }
+ }
+ releaseEventWaiters(); // in case of interference
+ }
+
+ /**
+ * Callback from workers invoked upon each top-level action (i.e.,
+ * stealing a task or taking a submission and running it).
+ * Performs one or more of the following:
+ *
+ * 1. If the worker is active and either did not run a task
+ * or there are too many workers, try to set its active status
+ * to inactive and update activeCount. On contention, we may
+ * try again in this or a subsequent call.
+ *
+ * 2. If not enough total workers, help create some.
+ *
+ * 3. If there are too many running workers, suspend this worker
+ * (first forcing inactive if necessary). If it is not needed,
+ * it may be shutdown while suspended (via
+ * tryShutdownUnusedWorker). Otherwise, upon resume it
+ * rechecks running thread count and need for event sync.
+ *
+ * 4. If worker did not run a task, await the next task event via
+ * eventSync if necessary (first forcing inactivation), upon
+ * which the worker may be shutdown via
+ * tryShutdownUnusedWorker. Otherwise, help release any
+ * existing event waiters that are now releasable,
+ *
+ * @param w the worker
+ * @param ran true if worker ran a task since last call to this method
+ */
+ final void preStep(ForkJoinWorkerThread w, boolean ran) {
+ int wec = w.lastEventCount;
+ boolean active = w.active;
+ boolean inactivate = false;
+ int pc = parallelism;
+ int rs;
+ while (w.runState == 0 && (rs = runState) < TERMINATING) {
+ if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
+ UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
+ inactivate = active = w.active = false;
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) > pc) {
+ if (!(inactivate |= active) && // must inactivate to suspend
+ workerCounts == wc && // try to suspend as spare
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING))
+ w.suspendAsSpare();
+ }
+ else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
+ helpMaintainParallelism(); // not enough workers
+ else if (!ran) {
+ long h = eventWaiters;
+ int ec = eventCount;
+ if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
+ releaseEventWaiters(); // release others before waiting
+ else if (ec != wec) {
+ w.lastEventCount = ec; // no need to wait
+ break;
+ }
+ else if (!(inactivate |= active))
+ eventSync(w, wec); // must inactivate before sync
+ }
+ else
+ break;
+ }
+ }
+
+ /**
+ * Helps and/or blocks awaiting join of the given task.
+ * See above for explanation.
+ *
+ * @param joinMe the task to join
+ * @param worker the current worker thread
+ */
+ final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
+ int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
+ while (joinMe.status >= 0) {
+ int wc;
+ worker.helpJoinTask(joinMe);
+ if (joinMe.status < 0)
+ break;
+ else if (retries > 0)
+ --retries;
+ else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING)) {
+ int stat, c; long h;
+ while ((stat = joinMe.status) >= 0 &&
+ (h = eventWaiters) != 0L && // help release others
+ (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+ releaseEventWaiters();
+ if (stat >= 0 &&
+ ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
+ (stat =
+ joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
+ helpMaintainParallelism(); // timeout or no running workers
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
+ if (stat < 0)
+ break; // else restart
+ }
+ }
+ }
+
+ /**
+ * Same idea as awaitJoin, but no helping, retries, or timeouts.
+ */
+ final void awaitBlocker(ManagedBlocker blocker)
+ throws InterruptedException {
+ while (!blocker.isReleasable()) {
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) != 0 &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING)) {
+ try {
+ while (!blocker.isReleasable()) {
+ long h = eventWaiters;
+ if (h != 0L &&
+ (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+ releaseEventWaiters();
+ else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
+ runState < TERMINATING)
+ helpMaintainParallelism();
+ else if (blocker.block())
+ break;
+ }
+ } finally {
+ int c;
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
+ }
+ break;
+ }
+ }
+ }
+
+ /**
+ * Possibly initiates and/or completes termination.
+ *
+ * @param now if true, unconditionally terminate, else only
+ * if shutdown and empty queue and no active workers
+ * @return true if now terminating or terminated
+ */
+ private boolean tryTerminate(boolean now) {
+ if (now)
+ advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
+ else if (runState < SHUTDOWN ||
+ !submissionQueue.isEmpty() ||
+ (runState & ACTIVE_COUNT_MASK) != 0)
return false;
- if (canTerminateOnShutdown(nextc))
- terminateOnShutdown();
+
+ if (advanceRunLevel(TERMINATING))
+ startTerminating();
+
+ // Finish now if all threads terminated; else in some subsequent call
+ if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
+ advanceRunLevel(TERMINATED);
+ termination.arrive();
+ }
return true;
}
/**
- * Returns {@code true} if argument represents zero active count
- * and nonzero runstate, which is the triggering condition for
- * terminating on shutdown.
+ * Actions on transition to TERMINATING
+ *
+ * Runs up to four passes through workers: (0) shutting down each
+ * (without waking up if parked) to quickly spread notifications
+ * without unnecessary bouncing around event queues etc (1) wake
+ * up and help cancel tasks (2) interrupt (3) mop up races with
+ * interrupted workers
*/
- private static boolean canTerminateOnShutdown(int c) {
- // i.e. least bit is nonzero runState bit
- return ((c & -c) >>> 16) != 0;
- }
-
- /**
- * Transition run state to at least the given state. Return true
- * if not already at least given state.
- */
- private boolean transitionRunStateTo(int state) {
- for (;;) {
- int c = runControl;
- if (runStateOf(c) >= state)
- return false;
- if (casRunControl(c, runControlFor(state, activeCountOf(c))))
- return true;
+ private void startTerminating() {
+ cancelSubmissions();
+ for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
+ int c; // advance event count
+ UNSAFE.compareAndSwapInt(this, eventCountOffset,
+ c = eventCount, c+1);
+ eventWaiters = 0L; // clobber lists
+ spareWaiters = 0;
+ for (ForkJoinWorkerThread w : workers) {
+ if (w != null) {
+ w.shutdown();
+ if (passes > 0 && !w.isTerminated()) {
+ w.cancelTasks();
+ LockSupport.unpark(w);
+ if (passes > 1) {
+ try {
+ w.interrupt();
+ } catch (SecurityException ignore) {
+ }
+ }
+ }
+ }
+ }
}
}
/**
- * Controls whether to add spares to maintain parallelism
+ * Clears out and cancels submissions, ignoring exceptions.
+ */
+ private void cancelSubmissions() {
+ ForkJoinTask<?> task;
+ while ((task = submissionQueue.poll()) != null) {
+ try {
+ task.cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+
+ // misc support for ForkJoinWorkerThread
+
+ /**
+ * Returns pool number.
+ */
+ final int getPoolNumber() {
+ return poolNumber;
+ }
+
+ /**
+ * Tries to accumulate steal count from a worker, clearing
+ * the worker's value if successful.
+ *
+ * @return true if worker steal count now zero
*/
- private volatile boolean maintainsParallelism;
+ final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) {
+ int sc = w.stealCount;
+ long c = stealCount;
+ // CAS even if zero, for fence effects
+ if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) {
+ if (sc != 0)
+ w.stealCount = 0;
+ return true;
+ }
+ return sc == 0;
+ }
+
+ /**
+ * Returns the approximate (non-atomic) number of idle threads per
+ * active thread.
+ */
+ final int idlePerActive() {
+ int pc = parallelism; // use parallelism, not rc
+ int ac = runState; // no mask -- artificially boosts during shutdown
+ // Use exact results for small values, saturate past 4
+ return ((pc <= ac) ? 0 :
+ (pc >>> 1 <= ac) ? 1 :
+ (pc >>> 2 <= ac) ? 3 :
+ pc >>> 3);
+ }
+
+ // Public and protected methods
// Constructors
/**
* Creates a {@code ForkJoinPool} with parallelism equal to {@link
- * java.lang.Runtime#availableProcessors}, and using the {@linkplain
- * #defaultForkJoinWorkerThreadFactory default thread factory}.
+ * java.lang.Runtime#availableProcessors}, using the {@linkplain
+ * #defaultForkJoinWorkerThreadFactory default thread factory},
+ * no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
@@ -404,13 +1232,14 @@
*/
public ForkJoinPool() {
this(Runtime.getRuntime().availableProcessors(),
- defaultForkJoinWorkerThreadFactory);
+ defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the indicated parallelism
- * level and using the {@linkplain
- * #defaultForkJoinWorkerThreadFactory default thread factory}.
+ * level, the {@linkplain
+ * #defaultForkJoinWorkerThreadFactory default thread factory},
+ * no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @param parallelism the parallelism level
* @throws IllegalArgumentException if parallelism less than or
@@ -421,31 +1250,25 @@
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism) {
- this(parallelism, defaultForkJoinWorkerThreadFactory);
+ this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
/**
- * Creates a {@code ForkJoinPool} with parallelism equal to {@link
- * java.lang.Runtime#availableProcessors}, and using the given
- * thread factory.
+ * Creates a {@code ForkJoinPool} with the given parameters.
*
- * @param factory the factory for creating new threads
- * @throws NullPointerException if the factory is null
- * @throws SecurityException if a security manager exists and
- * the caller is not permitted to modify threads
- * because it does not hold {@link
- * java.lang.RuntimePermission}{@code ("modifyThread")}
- */
- public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
- this(Runtime.getRuntime().availableProcessors(), factory);
- }
-
- /**
- * Creates a {@code ForkJoinPool} with the given parallelism and
- * thread factory.
- *
- * @param parallelism the parallelism level
- * @param factory the factory for creating new threads
+ * @param parallelism the parallelism level. For default value,
+ * use {@link java.lang.Runtime#availableProcessors}.
+ * @param factory the factory for creating new threads. For default value,
+ * use {@link #defaultForkJoinWorkerThreadFactory}.
+ * @param handler the handler for internal worker threads that
+ * terminate due to unrecoverable errors encountered while executing
+ * tasks. For default value, use {@code null}.
+ * @param asyncMode if true,
+ * establishes local first-in-first-out scheduling mode for forked
+ * tasks that are never joined. This mode may be more appropriate
+ * than default locally stack-based mode in applications in which
+ * worker threads only process event-style asynchronous tasks.
+ * For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
@@ -454,153 +1277,40 @@
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
- public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
- if (parallelism <= 0 || parallelism > MAX_THREADS)
- throw new IllegalArgumentException();
+ public ForkJoinPool(int parallelism,
+ ForkJoinWorkerThreadFactory factory,
+ Thread.UncaughtExceptionHandler handler,
+ boolean asyncMode) {
+ checkPermission();
if (factory == null)
throw new NullPointerException();
- checkPermission();
- this.factory = factory;
+ if (parallelism <= 0 || parallelism > MAX_WORKERS)
+ throw new IllegalArgumentException();
this.parallelism = parallelism;
- this.maxPoolSize = MAX_THREADS;
- this.maintainsParallelism = true;
- this.poolNumber = poolNumberGenerator.incrementAndGet();
- this.workerLock = new ReentrantLock();
- this.termination = workerLock.newCondition();
- this.stealCount = new AtomicLong();
+ this.factory = factory;
+ this.ueh = handler;
+ this.locallyFifo = asyncMode;
+ int arraySize = initialArraySizeFor(parallelism);
+ this.workers = new ForkJoinWorkerThread[arraySize];
this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
- // worker array and workers are lazily constructed
- }
-
- /**
- * Creates a new worker thread using factory.
- *
- * @param index the index to assign worker
- * @return new worker, or null if factory failed
- */
- private ForkJoinWorkerThread createWorker(int index) {
- Thread.UncaughtExceptionHandler h = ueh;
- ForkJoinWorkerThread w = factory.newThread(this);
- if (w != null) {
- w.poolIndex = index;
- w.setDaemon(true);
- w.setAsyncMode(locallyFifo);
- w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
- if (h != null)
- w.setUncaughtExceptionHandler(h);
- }
- return w;
- }
-
- /**
- * Returns a good size for worker array given pool size.
- * Currently requires size to be a power of two.
- */
- private static int arraySizeFor(int poolSize) {
- if (poolSize <= 1)
- return 1;
- // See Hackers Delight, sec 3.2
- int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
- c |= c >>> 1;
- c |= c >>> 2;
- c |= c >>> 4;
- c |= c >>> 8;
- c |= c >>> 16;
- return c + 1;
- }
-
- /**
- * Creates or resizes array if necessary to hold newLength.
- * Call only under exclusion.
- *
- * @return the array
- */
- private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
- ForkJoinWorkerThread[] ws = workers;
- if (ws == null)
- return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
- else if (newLength > ws.length)
- return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
- else
- return ws;
+ this.workerLock = new ReentrantLock();
+ this.termination = new Phaser(1);
+ this.poolNumber = poolNumberGenerator.incrementAndGet();
}
/**
- * Tries to shrink workers into smaller array after one or more terminate.
- */
- private void tryShrinkWorkerArray() {
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- int len = ws.length;
- int last = len - 1;
- while (last >= 0 && ws[last] == null)
- --last;
- int newLength = arraySizeFor(last+1);
- if (newLength < len)
- workers = Arrays.copyOf(ws, newLength);
- }
- }
-
- /**
- * Initializes workers if necessary.
+ * Returns initial power of two size for workers array.
+ * @param pc the initial parallelism level
*/
- final void ensureWorkerInitialization() {
- ForkJoinWorkerThread[] ws = workers;
- if (ws == null) {
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- ws = workers;
- if (ws == null) {
- int ps = parallelism;
- ws = ensureWorkerArrayCapacity(ps);
- for (int i = 0; i < ps; ++i) {
- ForkJoinWorkerThread w = createWorker(i);
- if (w != null) {
- ws[i] = w;
- w.start();
- updateWorkerCount(1);
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
- }
-
- /**
- * Worker creation and startup for threads added via setParallelism.
- */
- private void createAndStartAddedWorkers() {
- resumeAllSpares(); // Allow spares to convert to nonspare
- int ps = parallelism;
- ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
- int len = ws.length;
- // Sweep through slots, to keep lowest indices most populated
- int k = 0;
- while (k < len) {
- if (ws[k] != null) {
- ++k;
- continue;
- }
- int s = workerCounts;
- int tc = totalCountOf(s);
- int rc = runningCountOf(s);
- if (rc >= ps || tc >= ps)
- break;
- if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
- ForkJoinWorkerThread w = createWorker(k);
- if (w != null) {
- ws[k++] = w;
- w.start();
- }
- else {
- updateWorkerCount(-1); // back out on failed creation
- break;
- }
- }
- }
+ private static int initialArraySizeFor(int pc) {
+ // If possible, initially allocate enough space for one spare
+ int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
+ // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
+ size |= size >>> 1;
+ size |= size >>> 2;
+ size |= size >>> 4;
+ size |= size >>> 8;
+ return size + 1;
}
// Execution methods
@@ -611,12 +1321,12 @@
private <T> void doSubmit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
- if (isShutdown())
+ if (runState >= SHUTDOWN)
throw new RejectedExecutionException();
- if (workers == null)
- ensureWorkerInitialization();
submissionQueue.offer(task);
- signalIdleWorkers();
+ int c; // try to increment event count -- CAS failure OK
+ UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
+ helpMaintainParallelism(); // create, start, or resume some workers
}
/**
@@ -662,6 +1372,20 @@
}
/**
+ * Submits a ForkJoinTask for execution.
+ *
+ * @param task the task to submit
+ * @return the task
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
+ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
+ doSubmit(task);
+ return task;
+ }
+
+ /**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
@@ -699,21 +1423,6 @@
}
/**
- * Submits a ForkJoinTask for execution.
- *
- * @param task the task to submit
- * @return the task
- * @throws NullPointerException if the task is null
- * @throws RejectedExecutionException if the task cannot be
- * scheduled for execution
- */
- public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
- doSubmit(task);
- return task;
- }
-
-
- /**
* @throws NullPointerException {@inheritDoc}
* @throws RejectedExecutionException {@inheritDoc}
*/
@@ -725,7 +1434,7 @@
invoke(new InvokeAll<T>(forkJoinTasks));
@SuppressWarnings({"unchecked", "rawtypes"})
- List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
+ List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
return futures;
}
@@ -739,8 +1448,6 @@
private static final long serialVersionUID = -7914297376763021607L;
}
- // Configuration and status settings and queries
-
/**
* Returns the factory used for constructing new workers.
*
@@ -757,84 +1464,7 @@
* @return the handler, or {@code null} if none
*/
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
- Thread.UncaughtExceptionHandler h;
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- h = ueh;
- } finally {
- lock.unlock();
- }
- return h;
- }
-
- /**
- * Sets the handler for internal worker threads that terminate due
- * to unrecoverable errors encountered while executing tasks.
- * Unless set, the current default or ThreadGroup handler is used
- * as handler.
- *
- * @param h the new handler
- * @return the old handler, or {@code null} if none
- * @throws SecurityException if a security manager exists and
- * the caller is not permitted to modify threads
- * because it does not hold {@link
- * java.lang.RuntimePermission}{@code ("modifyThread")}
- */
- public Thread.UncaughtExceptionHandler
- setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
- checkPermission();
- Thread.UncaughtExceptionHandler old = null;
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- old = ueh;
- ueh = h;
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread w = ws[i];
- if (w != null)
- w.setUncaughtExceptionHandler(h);
- }
- }
- } finally {
- lock.unlock();
- }
- return old;
- }
-
-
- /**
- * Sets the target parallelism level of this pool.
- *
- * @param parallelism the target parallelism
- * @throws IllegalArgumentException if parallelism less than or
- * equal to zero or greater than maximum size bounds
- * @throws SecurityException if a security manager exists and
- * the caller is not permitted to modify threads
- * because it does not hold {@link
- * java.lang.RuntimePermission}{@code ("modifyThread")}
- */
- public void setParallelism(int parallelism) {
- checkPermission();
- if (parallelism <= 0 || parallelism > maxPoolSize)
- throw new IllegalArgumentException();
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- if (isProcessingTasks()) {
- int p = this.parallelism;
- this.parallelism = parallelism;
- if (parallelism > p)
- createAndStartAddedWorkers();
- else
- trimSpares();
- }
- } finally {
- lock.unlock();
- }
- signalIdleWorkers();
+ return ueh;
}
/**
@@ -848,92 +1478,14 @@
/**
* Returns the number of worker threads that have started but not
- * yet terminated. This result returned by this method may differ
+ * yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
* @return the number of worker threads
*/
public int getPoolSize() {
- return totalCountOf(workerCounts);
- }
-
- /**
- * Returns the maximum number of threads allowed to exist in the
- * pool. Unless set using {@link #setMaximumPoolSize}, the
- * maximum is an implementation-defined value designed only to
- * prevent runaway growth.
- *
- * @return the maximum
- */
- public int getMaximumPoolSize() {
- return maxPoolSize;
- }
-
- /**
- * Sets the maximum number of threads allowed to exist in the
- * pool. The given value should normally be greater than or equal
- * to the {@link #getParallelism parallelism} level. Setting this
- * value has no effect on current pool size. It controls
- * construction of new threads.
- *
- * @throws IllegalArgumentException if negative or greater than
- * internal implementation limit
- */
- public void setMaximumPoolSize(int newMax) {
- if (newMax < 0 || newMax > MAX_THREADS)
- throw new IllegalArgumentException();
- maxPoolSize = newMax;
- }
-
-
- /**
- * Returns {@code true} if this pool dynamically maintains its
- * target parallelism level. If false, new threads are added only
- * to avoid possible starvation. This setting is by default true.
- *
- * @return {@code true} if maintains parallelism
- */
- public boolean getMaintainsParallelism() {
- return maintainsParallelism;
- }
-
- /**
- * Sets whether this pool dynamically maintains its target
- * parallelism level. If false, new threads are added only to
- * avoid possible starvation.
- *
- * @param enable {@code true} to maintain parallelism
- */
- public void setMaintainsParallelism(boolean enable) {
- maintainsParallelism = enable;
- }
-
- /**
- * Establishes local first-in-first-out scheduling mode for forked
- * tasks that are never joined. This mode may be more appropriate
- * than default locally stack-based mode in applications in which
- * worker threads only process asynchronous tasks. This method is
- * designed to be invoked only when the pool is quiescent, and
- * typically only before any tasks are submitted. The effects of
- * invocations at other times may be unpredictable.
- *
- * @param async if {@code true}, use locally FIFO scheduling
- * @return the previous mode
- * @see #getAsyncMode
- */
- public boolean setAsyncMode(boolean async) {
- boolean oldMode = locallyFifo;
- locallyFifo = async;
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null)
- t.setAsyncMode(async);
- }
- }
- return oldMode;
+ return workerCounts >>> TOTAL_COUNT_SHIFT;
}
/**
@@ -941,7 +1493,6 @@
* scheduling mode for forked tasks that are never joined.
*
* @return {@code true} if this pool uses async mode
- * @see #setAsyncMode
*/
public boolean getAsyncMode() {
return locallyFifo;
@@ -950,12 +1501,13 @@
/**
* Returns an estimate of the number of worker threads that are
* not blocked waiting to join tasks or for other managed
- * synchronization.
+ * synchronization. This method may overestimate the
+ * number of running threads.
*
* @return the number of worker threads
*/
public int getRunningThreadCount() {
- return runningCountOf(workerCounts);
+ return workerCounts & RUNNING_COUNT_MASK;
}
/**
@@ -966,19 +1518,7 @@
* @return the number of active threads
*/
public int getActiveThreadCount() {
- return activeCountOf(runControl);
- }
-
- /**
- * Returns an estimate of the number of threads that are currently
- * idle waiting for tasks. This method may underestimate the
- * number of idle threads.
- *
- * @return the number of idle threads
- */
- final int getIdleThreadCount() {
- int c = runningCountOf(workerCounts) - activeCountOf(runControl);
- return (c <= 0) ? 0 : c;
+ return runState & ACTIVE_COUNT_MASK;
}
/**
@@ -993,7 +1533,7 @@
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return activeCountOf(runControl) == 0;
+ return (runState & ACTIVE_COUNT_MASK) == 0;
}
/**
@@ -1008,17 +1548,7 @@
* @return the number of steals
*/
public long getStealCount() {
- return stealCount.get();
- }
-
- /**
- * Accumulates steal count from a worker.
- * Call only when worker known to be idle.
- */
- private void updateStealCount(ForkJoinWorkerThread w) {
- int sc = w.getAndClearStealCount();
- if (sc != 0)
- stealCount.addAndGet(sc);
+ return stealCount;
}
/**
@@ -1033,14 +1563,9 @@
*/
public long getQueuedTaskCount() {
long count = 0;
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null)
- count += t.getQueueSize();
- }
- }
+ for (ForkJoinWorkerThread w : workers)
+ if (w != null)
+ count += w.getQueueSize();
return count;
}
@@ -1094,16 +1619,11 @@
* @return the number of elements transferred
*/
protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
- int n = submissionQueue.drainTo(c);
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread w = ws[i];
- if (w != null)
- n += w.drainTasksTo(c);
- }
- }
- return n;
+ int count = submissionQueue.drainTo(c);
+ for (ForkJoinWorkerThread w : workers)
+ if (w != null)
+ count += w.drainTasksTo(c);
+ return count;
}
/**
@@ -1114,36 +1634,34 @@
* @return a string identifying this pool, as well as its state
*/
public String toString() {
- int ps = parallelism;
- int wc = workerCounts;
- int rc = runControl;
long st = getStealCount();
long qt = getQueuedTaskCount();
long qs = getQueuedSubmissionCount();
+ int wc = workerCounts;
+ int tc = wc >>> TOTAL_COUNT_SHIFT;
+ int rc = wc & RUNNING_COUNT_MASK;
+ int pc = parallelism;
+ int rs = runState;
+ int ac = rs & ACTIVE_COUNT_MASK;
return super.toString() +
- "[" + runStateToString(runStateOf(rc)) +
- ", parallelism = " + ps +
- ", size = " + totalCountOf(wc) +
- ", active = " + activeCountOf(rc) +
- ", running = " + runningCountOf(wc) +
+ "[" + runLevelToString(rs) +
+ ", parallelism = " + pc +
+ ", size = " + tc +
+ ", active = " + ac +
+ ", running = " + rc +
", steals = " + st +
", tasks = " + qt +
", submissions = " + qs +
"]";
}
- private static String runStateToString(int rs) {
- switch(rs) {
- case RUNNING: return "Running";
- case SHUTDOWN: return "Shutting down";
- case TERMINATING: return "Terminating";
- case TERMINATED: return "Terminated";
- default: throw new Error("Unknown run state");
- }
+ private static String runLevelToString(int s) {
+ return ((s & TERMINATED) != 0 ? "Terminated" :
+ ((s & TERMINATING) != 0 ? "Terminating" :
+ ((s & SHUTDOWN) != 0 ? "Shutting down" :
+ "Running")));
}
- // lifecycle control
-
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
@@ -1158,23 +1676,8 @@
*/
public void shutdown() {
checkPermission();
- transitionRunStateTo(SHUTDOWN);
- if (canTerminateOnShutdown(runControl)) {
- if (workers == null) { // shutting down before workers created
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- if (workers == null) {
- terminate();
- transitionRunStateTo(TERMINATED);
- termination.signalAll();
- }
- } finally {
- lock.unlock();
- }
- }
- terminateOnShutdown();
- }
+ advanceRunLevel(SHUTDOWN);
+ tryTerminate(false);
}
/**
@@ -1195,7 +1698,7 @@
*/
public List<Runnable> shutdownNow() {
checkPermission();
- terminate();
+ tryTerminate(true);
return Collections.emptyList();
}
@@ -1205,7 +1708,7 @@
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
- return runStateOf(runControl) == TERMINATED;
+ return runState >= TERMINATED;
}
/**
@@ -1219,7 +1722,7 @@
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
- return runStateOf(runControl) == TERMINATING;
+ return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
}
/**
@@ -1228,15 +1731,7 @@
* @return {@code true} if this pool has been shut down
*/
public boolean isShutdown() {
- return runStateOf(runControl) >= SHUTDOWN;
- }
-
- /**
- * Returns true if pool is not terminating or terminated.
- * Used internally to suppress execution when terminating.
- */
- final boolean isProcessingTasks() {
- return runStateOf(runControl) < TERMINATING;
+ return runState >= SHUTDOWN;
}
/**
@@ -1252,585 +1747,10 @@
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.workerLock;
- lock.lock();
try {
- for (;;) {
- if (isTerminated())
- return true;
- if (nanos <= 0)
- return false;
- nanos = termination.awaitNanos(nanos);
- }
- } finally {
- lock.unlock();
- }
- }
-
- // Shutdown and termination support
-
- /**
- * Callback from terminating worker. Nulls out the corresponding
- * workers slot, and if terminating, tries to terminate; else
- * tries to shrink workers array.
- *
- * @param w the worker
- */
- final void workerTerminated(ForkJoinWorkerThread w) {
- updateStealCount(w);
- updateWorkerCount(-1);
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- int idx = w.poolIndex;
- if (idx >= 0 && idx < ws.length && ws[idx] == w)
- ws[idx] = null;
- if (totalCountOf(workerCounts) == 0) {
- terminate(); // no-op if already terminating
- transitionRunStateTo(TERMINATED);
- termination.signalAll();
- }
- else if (isProcessingTasks()) {
- tryShrinkWorkerArray();
- tryResumeSpare(true); // allow replacement
- }
- }
- } finally {
- lock.unlock();
- }
- signalIdleWorkers();
- }
-
- /**
- * Initiates termination.
- */
- private void terminate() {
- if (transitionRunStateTo(TERMINATING)) {
- stopAllWorkers();
- resumeAllSpares();
- signalIdleWorkers();
- cancelQueuedSubmissions();
- cancelQueuedWorkerTasks();
- interruptUnterminatedWorkers();
- signalIdleWorkers(); // resignal after interrupt
- }
- }
-
- /**
- * Possibly terminates when on shutdown state.
- */
- private void terminateOnShutdown() {
- if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
- terminate();
- }
-
- /**
- * Clears out and cancels submissions.
- */
- private void cancelQueuedSubmissions() {
- ForkJoinTask<?> task;
- while ((task = pollSubmission()) != null)
- task.cancel(false);
- }
-
- /**
- * Cleans out worker queues.
- */
- private void cancelQueuedWorkerTasks() {
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null)
- t.cancelTasks();
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Sets each worker's status to terminating. Requires lock to avoid
- * conflicts with add/remove.
- */
- private void stopAllWorkers() {
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null)
- t.shutdownNow();
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Interrupts all unterminated workers. This is not required for
- * sake of internal control, but may help unstick user code during
- * shutdown.
- */
- private void interruptUnterminatedWorkers() {
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- ForkJoinWorkerThread[] ws = workers;
- if (ws != null) {
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null && !t.isTerminated()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- /*
- * Nodes for event barrier to manage idle threads. Queue nodes
- * are basic Treiber stack nodes, also used for spare stack.
- *
- * The event barrier has an event count and a wait queue (actually
- * a Treiber stack). Workers are enabled to look for work when
- * the eventCount is incremented. If they fail to find work, they
- * may wait for next count. Upon release, threads help others wake
- * up.
- *
- * Synchronization events occur only in enough contexts to
- * maintain overall liveness:
- *
- * - Submission of a new task to the pool
- * - Resizes or other changes to the workers array
- * - pool termination
- * - A worker pushing a task on an empty queue
- *
- * The case of pushing a task occurs often enough, and is heavy
- * enough compared to simple stack pushes, to require special
- * handling: Method signalWork returns without advancing count if
- * the queue appears to be empty. This would ordinarily result in
- * races causing some queued waiters not to be woken up. To avoid
- * this, the first worker enqueued in method sync (see
- * syncIsReleasable) rescans for tasks after being enqueued, and
- * helps signal if any are found. This works well because the
- * worker has nothing better to do, and so might as well help
- * alleviate the overhead and contention on the threads actually
- * doing work. Also, since event counts increments on task
- * availability exist to maintain liveness (rather than to force
- * refreshes etc), it is OK for callers to exit early if
- * contending with another signaller.
- */
- static final class WaitQueueNode {
- WaitQueueNode next; // only written before enqueued
- volatile ForkJoinWorkerThread thread; // nulled to cancel wait
- final long count; // unused for spare stack
-
- WaitQueueNode(long c, ForkJoinWorkerThread w) {
- count = c;
- thread = w;
- }
-
- /**
- * Wakes up waiter, returning false if known to already
- */
- boolean signal() {
- ForkJoinWorkerThread t = thread;
- if (t == null)
- return false;
- thread = null;
- LockSupport.unpark(t);
- return true;
- }
-
- /**
- * Awaits release on sync.
- */
- void awaitSyncRelease(ForkJoinPool p) {
- while (thread != null && !p.syncIsReleasable(this))
- LockSupport.park(this);
- }
-
- /**
- * Awaits resumption as spare.
- */
- void awaitSpareRelease() {
- while (thread != null) {
- if (!Thread.interrupted())
- LockSupport.park(this);
- }
- }
- }
-
- /**
- * Ensures that no thread is waiting for count to advance from the
- * current value of eventCount read on entry to this method, by
- * releasing waiting threads if necessary.
- *
- * @return the count
- */
- final long ensureSync() {
- long c = eventCount;
- WaitQueueNode q;
- while ((q = syncStack) != null && q.count < c) {
- if (casBarrierStack(q, null)) {
- do {
- q.signal();
- } while ((q = q.next) != null);
- break;
- }
- }
- return c;
- }
-
- /**
- * Increments event count and releases waiting threads.
- */
- private void signalIdleWorkers() {
- long c;
- do {} while (!casEventCount(c = eventCount, c+1));
- ensureSync();
- }
-
- /**
- * Signals threads waiting to poll a task. Because method sync
- * rechecks availability, it is OK to only proceed if queue
- * appears to be non-empty, and OK to skip under contention to
- * increment count (since some other thread succeeded).
- */
- final void signalWork() {
- long c;
- WaitQueueNode q;
- if (syncStack != null &&
- casEventCount(c = eventCount, c+1) &&
- (((q = syncStack) != null && q.count <= c) &&
- (!casBarrierStack(q, q.next) || !q.signal())))
- ensureSync();
- }
-
- /**
- * Waits until event count advances from last value held by
- * caller, or if excess threads, caller is resumed as spare, or
- * caller or pool is terminating. Updates caller's event on exit.
- *
- * @param w the calling worker thread
- */
- final void sync(ForkJoinWorkerThread w) {
- updateStealCount(w); // Transfer w's count while it is idle
-
- while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
- long prev = w.lastEventCount;
- WaitQueueNode node = null;
- WaitQueueNode h;
- while (eventCount == prev &&
- ((h = syncStack) == null || h.count == prev)) {
- if (node == null)
- node = new WaitQueueNode(prev, w);
- if (casBarrierStack(node.next = h, node)) {
- node.awaitSyncRelease(this);
- break;
- }
- }
- long ec = ensureSync();
- if (ec != prev) {
- w.lastEventCount = ec;
- break;
- }
- }
- }
-
- /**
- * Returns {@code true} if worker waiting on sync can proceed:
- * - on signal (thread == null)
- * - on event count advance (winning race to notify vs signaller)
- * - on interrupt
- * - if the first queued node, we find work available
- * If node was not signalled and event count not advanced on exit,
- * then we also help advance event count.
- *
- * @return {@code true} if node can be released
- */
- final boolean syncIsReleasable(WaitQueueNode node) {
- long prev = node.count;
- if (!Thread.interrupted() && node.thread != null &&
- (node.next != null ||
- !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
- eventCount == prev)
+ return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
+ } catch (TimeoutException ex) {
return false;
- if (node.thread != null) {
- node.thread = null;
- long ec = eventCount;
- if (prev <= ec) // help signal
- casEventCount(ec, ec+1);
- }
- return true;
- }
-
- /**
- * Returns {@code true} if a new sync event occurred since last
- * call to sync or this method, if so, updating caller's count.
- */
- final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
- long lc = w.lastEventCount;
- long ec = ensureSync();
- if (ec == lc)
- return false;
- w.lastEventCount = ec;
- return true;
- }
-
- // Parallelism maintenance
-
- /**
- * Decrements running count; if too low, adds spare.
- *
- * Conceptually, all we need to do here is add or resume a
- * spare thread when one is about to block (and remove or
- * suspend it later when unblocked -- see suspendIfSpare).
- * However, implementing this idea requires coping with
- * several problems: we have imperfect information about the
- * states of threads. Some count updates can and usually do
- * lag run state changes, despite arrangements to keep them
- * accurate (for example, when possible, updating counts
- * before signalling or resuming), especially when running on
- * dynamic JVMs that don't optimize the infrequent paths that
- * update counts. Generating too many threads can make these
- * problems become worse, because excess threads are more
- * likely to be context-switched with others, slowing them all
- * down, especially if there is no work available, so all are
- * busy scanning or idling. Also, excess spare threads can
- * only be suspended or removed when they are idle, not
- * immediately when they aren't needed. So adding threads will
- * raise parallelism level for longer than necessary. Also,
- * FJ applications often encounter highly transient peaks when
- * many threads are blocked joining, but for less time than it
- * takes to create or resume spares.
- *
- * @param joinMe if non-null, return early if done
- * @param maintainParallelism if true, try to stay within
- * target counts, else create only to avoid starvation
- * @return true if joinMe known to be done
- */
- final boolean preJoin(ForkJoinTask<?> joinMe,
- boolean maintainParallelism) {
- maintainParallelism &= maintainsParallelism; // overrride
- boolean dec = false; // true when running count decremented
- while (spareStack == null || !tryResumeSpare(dec)) {
- int counts = workerCounts;
- if (dec || (dec = casWorkerCounts(counts, --counts))) {
- if (!needSpare(counts, maintainParallelism))
- break;
- if (joinMe.status < 0)
- return true;
- if (tryAddSpare(counts))
- break;
- }
- }
- return false;
- }
-
- /**
- * Same idea as preJoin
- */
- final boolean preBlock(ManagedBlocker blocker,
- boolean maintainParallelism) {
- maintainParallelism &= maintainsParallelism;
- boolean dec = false;
- while (spareStack == null || !tryResumeSpare(dec)) {
- int counts = workerCounts;
- if (dec || (dec = casWorkerCounts(counts, --counts))) {
- if (!needSpare(counts, maintainParallelism))
- break;
- if (blocker.isReleasable())
- return true;
- if (tryAddSpare(counts))
- break;
- }
- }
- return false;
- }
-
- /**
- * Returns {@code true} if a spare thread appears to be needed.
- * If maintaining parallelism, returns true when the deficit in
- * running threads is more than the surplus of total threads, and
- * there is apparently some work to do. This self-limiting rule
- * means that the more threads that have already been added, the
- * less parallelism we will tolerate before adding another.
- *
- * @param counts current worker counts
- * @param maintainParallelism try to maintain parallelism
- */
- private boolean needSpare(int counts, boolean maintainParallelism) {
- int ps = parallelism;
- int rc = runningCountOf(counts);
- int tc = totalCountOf(counts);
- int runningDeficit = ps - rc;
- int totalSurplus = tc - ps;
- return (tc < maxPoolSize &&
- (rc == 0 || totalSurplus < 0 ||
- (maintainParallelism &&
- runningDeficit > totalSurplus &&
- ForkJoinWorkerThread.hasQueuedTasks(workers))));
- }
-
- /**
- * Adds a spare worker if lock available and no more than the
- * expected numbers of threads exist.
- *
- * @return true if successful
- */
- private boolean tryAddSpare(int expectedCounts) {
- final ReentrantLock lock = this.workerLock;
- int expectedRunning = runningCountOf(expectedCounts);
- int expectedTotal = totalCountOf(expectedCounts);
- boolean success = false;
- boolean locked = false;
- // confirm counts while locking; CAS after obtaining lock
- try {
- for (;;) {
- int s = workerCounts;
- int tc = totalCountOf(s);
- int rc = runningCountOf(s);
- if (rc > expectedRunning || tc > expectedTotal)
- break;
- if (!locked && !(locked = lock.tryLock()))
- break;
- if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
- createAndStartSpare(tc);
- success = true;
- break;
- }
- }
- } finally {
- if (locked)
- lock.unlock();
- }
- return success;
- }
-
- /**
- * Adds the kth spare worker. On entry, pool counts are already
- * adjusted to reflect addition.
- */
- private void createAndStartSpare(int k) {
- ForkJoinWorkerThread w = null;
- ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
- int len = ws.length;
- // Probably, we can place at slot k. If not, find empty slot
- if (k < len && ws[k] != null) {
- for (k = 0; k < len && ws[k] != null; ++k)
- ;
- }
- if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
- ws[k] = w;
- w.start();
- }
- else
- updateWorkerCount(-1); // adjust on failure
- signalIdleWorkers();
- }
-
- /**
- * Suspends calling thread w if there are excess threads. Called
- * only from sync. Spares are enqueued in a Treiber stack using
- * the same WaitQueueNodes as barriers. They are resumed mainly
- * in preJoin, but are also woken on pool events that require all
- * threads to check run state.
- *
- * @param w the caller
- */
- private boolean suspendIfSpare(ForkJoinWorkerThread w) {
- WaitQueueNode node = null;
- int s;
- while (parallelism < runningCountOf(s = workerCounts)) {
- if (node == null)
- node = new WaitQueueNode(0, w);
- if (casWorkerCounts(s, s-1)) { // representation-dependent
- // push onto stack
- do {} while (!casSpareStack(node.next = spareStack, node));
- // block until released by resumeSpare
- node.awaitSpareRelease();
- return true;
- }
- }
- return false;
- }
-
- /**
- * Tries to pop and resume a spare thread.
- *
- * @param updateCount if true, increment running count on success
- * @return true if successful
- */
- private boolean tryResumeSpare(boolean updateCount) {
- WaitQueueNode q;
- while ((q = spareStack) != null) {
- if (casSpareStack(q, q.next)) {
- if (updateCount)
- updateRunningCount(1);
- q.signal();
- return true;
- }
- }
- return false;
- }
-
- /**
- * Pops and resumes all spare threads. Same idea as ensureSync.
- *
- * @return true if any spares released
- */
- private boolean resumeAllSpares() {
- WaitQueueNode q;
- while ( (q = spareStack) != null) {
- if (casSpareStack(q, null)) {
- do {
- updateRunningCount(1);
- q.signal();
- } while ((q = q.next) != null);
- return true;
- }
- }
- return false;
- }
-
- /**
- * Pops and shuts down excessive spare threads. Call only while
- * holding lock. This is not guaranteed to eliminate all excess
- * threads, only those suspended as spares, which are the ones
- * unlikely to be needed in the future.
- */
- private void trimSpares() {
- int surplus = totalCountOf(workerCounts) - parallelism;
- WaitQueueNode q;
- while (surplus > 0 && (q = spareStack) != null) {
- if (casSpareStack(q, null)) {
- do {
- updateRunningCount(1);
- ForkJoinWorkerThread w = q.thread;
- if (w != null && surplus > 0 &&
- runningCountOf(workerCounts) > 0 && w.shutdown())
- --surplus;
- q.signal();
- } while ((q = q.next) != null);
- }
}
}
@@ -1838,11 +1758,17 @@
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*
- * <p>A {@code ManagedBlocker} provides two methods.
- * Method {@code isReleasable} must return {@code true} if
- * blocking is not necessary. Method {@code block} blocks the
- * current thread if necessary (perhaps internally invoking
- * {@code isReleasable} before actually blocking).
+ * <p>A {@code ManagedBlocker} provides two methods. Method
+ * {@code isReleasable} must return {@code true} if blocking is
+ * not necessary. Method {@code block} blocks the current thread
+ * if necessary (perhaps internally invoking {@code isReleasable}
+ * before actually blocking). The unusual methods in this API
+ * accommodate synchronizers that may, but don't usually, block
+ * for long periods. Similarly, they allow more efficient internal
+ * handling of cases in which additional workers may be, but
+ * usually are not, needed to ensure sufficient parallelism.
+ * Toward this end, implementations of method {@code isReleasable}
+ * must be amenable to repeated invocation.
*
* <p>For example, here is a ManagedBlocker based on a
* ReentrantLock:
@@ -1860,6 +1786,26 @@
* return hasLock || (hasLock = lock.tryLock());
* }
* }}</pre>
+ *
+ * <p>Here is a class that possibly blocks waiting for an
+ * item on a given queue:
+ * <pre> {@code
+ * class QueueTaker<E> implements ManagedBlocker {
+ * final BlockingQueue<E> queue;
+ * volatile E item = null;
+ * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
+ * public boolean block() throws InterruptedException {
+ * if (item == null)
+ * item = queue.take();
+ * return true;
+ * }
+ * public boolean isReleasable() {
+ * return item != null || (item = queue.poll()) != null;
+ * }
+ * public E getItem() { // call after pool.managedBlock completes
+ * return item;
+ * }
+ * }}</pre>
*/
public static interface ManagedBlocker {
/**
@@ -1883,14 +1829,7 @@
* Blocks in accord with the given blocker. If the current thread
* is a {@link ForkJoinWorkerThread}, this method possibly
* arranges for a spare thread to be activated if necessary to
- * ensure parallelism while the current thread is blocked.
- *
- * <p>If {@code maintainParallelism} is {@code true} and the pool
- * supports it ({@link #getMaintainsParallelism}), this method
- * attempts to maintain the pool's nominal parallelism. Otherwise
- * it activates a thread only if necessary to avoid complete
- * starvation. This option may be preferable when blockages use
- * timeouts, or are almost always brief.
+ * ensure sufficient parallelism while the current thread is blocked.
*
* <p>If the caller is not a {@link ForkJoinTask}, this method is
* behaviorally equivalent to
@@ -1904,33 +1843,18 @@
* first be expanded to ensure parallelism, and later adjusted.
*
* @param blocker the blocker
- * @param maintainParallelism if {@code true} and supported by
- * this pool, attempt to maintain the pool's nominal parallelism;
- * otherwise activate a thread only if necessary to avoid
- * complete starvation.
* @throws InterruptedException if blocker.block did so
*/
- public static void managedBlock(ManagedBlocker blocker,
- boolean maintainParallelism)
+ public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
Thread t = Thread.currentThread();
- ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread) t).pool : null);
- if (!blocker.isReleasable()) {
- try {
- if (pool == null ||
- !pool.preBlock(blocker, maintainParallelism))
- awaitBlocker(blocker);
- } finally {
- if (pool != null)
- pool.updateRunningCount(1);
- }
+ if (t instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
+ w.pool.awaitBlocker(blocker);
}
- }
-
- private static void awaitBlocker(ManagedBlocker blocker)
- throws InterruptedException {
- do {} while (!blocker.isReleasable() && !blocker.block());
+ else {
+ do {} while (!blocker.isReleasable() && !blocker.block());
+ }
}
// AbstractExecutorService overrides. These rely on undocumented
@@ -1948,32 +1872,18 @@
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
- private static final long eventCountOffset =
- objectFieldOffset("eventCount", ForkJoinPool.class);
private static final long workerCountsOffset =
objectFieldOffset("workerCounts", ForkJoinPool.class);
- private static final long runControlOffset =
- objectFieldOffset("runControl", ForkJoinPool.class);
- private static final long syncStackOffset =
- objectFieldOffset("syncStack",ForkJoinPool.class);
- private static final long spareStackOffset =
- objectFieldOffset("spareStack", ForkJoinPool.class);
-
- private boolean casEventCount(long cmp, long val) {
- return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
- }
- private boolean casWorkerCounts(int cmp, int val) {
- return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
- }
- private boolean casRunControl(int cmp, int val) {
- return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
- }
- private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
- return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
- }
- private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
- return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
- }
+ private static final long runStateOffset =
+ objectFieldOffset("runState", ForkJoinPool.class);
+ private static final long eventCountOffset =
+ objectFieldOffset("eventCount", ForkJoinPool.class);
+ private static final long eventWaitersOffset =
+ objectFieldOffset("eventWaiters", ForkJoinPool.class);
+ private static final long stealCountOffset =
+ objectFieldOffset("stealCount", ForkJoinPool.class);
+ private static final long spareWaitersOffset =
+ objectFieldOffset("spareWaiters", ForkJoinPool.class);
private static long objectFieldOffset(String field, Class<?> klazz) {
try {
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java Mon Sep 13 09:32:36 2010 +0800
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java Mon Sep 13 09:55:03 2010 +0100
@@ -91,10 +91,7 @@
* results of a task is {@link #join}, but there are several variants:
* The {@link Future#get} methods support interruptible and/or timed
* waits for completion and report results using {@code Future}
- * conventions. Method {@link #helpJoin} enables callers to actively
- * execute other tasks while awaiting joins, which is sometimes more
- * efficient but only applies when all subtasks are known to be
- * strictly tree-structured. Method {@link #invoke} is semantically
+ * conventions. Method {@link #invoke} is semantically
* equivalent to {@code fork(); join()} but always attempts to begin
* execution in the current thread. The "<em>quiet</em>" forms of
* these methods do not extract results or report exceptions. These
@@ -130,7 +127,7 @@
* ForkJoinTasks (as may be determined using method {@link
* #inForkJoinPool}). Attempts to invoke them in other contexts
* result in exceptions or errors, possibly including
- * ClassCastException.
+ * {@code ClassCastException}.
*
* <p>Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -152,9 +149,8 @@
*
* <p>This class provides {@code adapt} methods for {@link Runnable}
* and {@link Callable}, that may be of use when mixing execution of
- * {@code ForkJoinTasks} with other kinds of tasks. When all tasks
- * are of this form, consider using a pool in
- * {@linkplain ForkJoinPool#setAsyncMode async mode}.
+ * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
+ * of this form, consider using a pool constructed in <em>asyncMode</em>.
*
* <p>ForkJoinTasks are {@code Serializable}, which enables them to be
* used in extensions such as remote execution frameworks. It is
@@ -166,33 +162,43 @@
*/
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
- /**
- * Run control status bits packed into a single int to minimize
- * footprint and to ensure atomicity (via CAS). Status is
- * initially zero, and takes on nonnegative values until
- * completed, upon which status holds COMPLETED. CANCELLED, or
- * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
- * blocking waits by other threads have SIGNAL_MASK bits set --
- * bit 15 for external (nonFJ) waits, and the rest a count of
- * waiting FJ threads. (This representation relies on
- * ForkJoinPool max thread limits). Completion of a stolen task
- * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
- * though suboptimal for some purposes, we use basic builtin
- * wait/notify to take advantage of "monitor inflation" in JVMs
- * that we would otherwise need to emulate to avoid adding further
- * per-task bookkeeping overhead. Note that bits 16-28 are
- * currently unused. Also value 0x80000000 is available as spare
- * completion value.
+ /*
+ * See the internal documentation of class ForkJoinPool for a
+ * general implementation overview. ForkJoinTasks are mainly
+ * responsible for maintaining their "status" field amidst relays
+ * to methods in ForkJoinWorkerThread and ForkJoinPool. The
+ * methods of this class are more-or-less layered into (1) basic
+ * status maintenance (2) execution and awaiting completion (3)
+ * user-level methods that additionally report results. This is
+ * sometimes hard to see because this file orders exported methods
+ * in a way that flows well in javadocs. In particular, most
+ * join mechanics are in method quietlyJoin, below.
*/
+
+ /*
+ * The status field holds run control status bits packed into a
+ * single int to minimize footprint and to ensure atomicity (via
+ * CAS). Status is initially zero, and takes on nonnegative
+ * values until completed, upon which status holds value
+ * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
+ * waits by other threads have the SIGNAL bit set. Completion of
+ * a stolen task with SIGNAL set awakens any waiters via
+ * notifyAll. Even though suboptimal for some purposes, we use
+ * basic builtin wait/notify to take advantage of "monitor
+ * inflation" in JVMs that we would otherwise need to emulate to
+ * avoid adding further per-task bookkeeping overhead. We want
+ * these monitors to be "fat", i.e., not use biasing or thin-lock
+ * techniques, so use some odd coding idioms that tend to avoid
+ * them.
+ */
+
+ /** The run status of this task */
volatile int status; // accessed directly by pool and workers
- static final int COMPLETION_MASK = 0xe0000000;
- static final int NORMAL = 0xe0000000; // == mask
- static final int CANCELLED = 0xc0000000;
- static final int EXCEPTIONAL = 0xa0000000;
- static final int SIGNAL_MASK = 0x0000ffff;
- static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
- static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
+ private static final int NORMAL = -1;
+ private static final int CANCELLED = -2;
+ private static final int EXCEPTIONAL = -3;
+ private static final int SIGNAL = 1;
/**
* Table of exceptions thrown by tasks, to enable reporting by
@@ -206,176 +212,94 @@
Collections.synchronizedMap
(new WeakHashMap<ForkJoinTask<?>, Throwable>());
- // within-package utilities
+ // Maintaining completion status
/**
- * Gets current worker thread, or null if not a worker thread.
- */
- static ForkJoinWorkerThread getWorker() {
- Thread t = Thread.currentThread();
- return ((t instanceof ForkJoinWorkerThread) ?
- (ForkJoinWorkerThread) t : null);
- }
-
- final boolean casStatus(int cmp, int val) {
- return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
- }
-
- /**
- * Workaround for not being able to rethrow unchecked exceptions.
- */
- static void rethrowException(Throwable ex) {
- if (ex != null)
- UNSAFE.throwException(ex);
- }
-
- // Setting completion status
-
- /**
- * Marks completion and wakes up threads waiting to join this task.
+ * Marks completion and wakes up threads waiting to join this task,
+ * also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
*/
- final void setCompletion(int completion) {
- ForkJoinPool pool = getPool();
- if (pool != null) {
- int s; // Clear signal bits while setting completion status
- do {} while ((s = status) >= 0 && !casStatus(s, completion));
-
- if ((s & SIGNAL_MASK) != 0) {
- if ((s &= INTERNAL_SIGNAL_MASK) != 0)
- pool.updateRunningCount(s);
- synchronized (this) { notifyAll(); }
- }
- }
- else
- externallySetCompletion(completion);
- }
-
- /**
- * Version of setCompletion for non-FJ threads. Leaves signal
- * bits for unblocked threads to adjust, and always notifies.
- */
- private void externallySetCompletion(int completion) {
+ private void setCompletion(int completion) {
int s;
- do {} while ((s = status) >= 0 &&
- !casStatus(s, (s & SIGNAL_MASK) | completion));
- synchronized (this) { notifyAll(); }
- }
-
- /**
- * Sets status to indicate normal completion.
- */
- final void setNormalCompletion() {
- // Try typical fast case -- single CAS, no signal, not already done.
- // Manually expand casStatus to improve chances of inlining it
- if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
- setCompletion(NORMAL);
- }
-
- // internal waiting and notification
-
- /**
- * Performs the actual monitor wait for awaitDone.
- */
- private void doAwaitDone() {
- // Minimize lock bias and in/de-flation effects by maximizing
- // chances of waiting inside sync
- try {
- while (status >= 0)
- synchronized (this) { if (status >= 0) wait(); }
- } catch (InterruptedException ie) {
- onInterruptedWait();
+ while ((s = status) >= 0) {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
+ if (s != 0)
+ synchronized (this) { notifyAll(); }
+ break;
+ }
}
}
/**
- * Performs the actual timed monitor wait for awaitDone.
+ * Records exception and sets exceptional completion.
+ *
+ * @return status on exit
*/
- private void doAwaitDone(long startTime, long nanos) {
- synchronized (this) {
+ private void setExceptionalCompletion(Throwable rex) {
+ exceptionMap.put(this, rex);
+ setCompletion(EXCEPTIONAL);
+ }
+
+ /**
+ * Blocks a worker thread until completion. Called only by
+ * pool. Currently unused -- pool-based waits use timeout
+ * version below.
+ */
+ final void internalAwaitDone() {
+ int s; // the odd construction reduces lock bias effects
+ while ((s = status) >= 0) {
try {
- while (status >= 0) {
- long nt = nanos - (System.nanoTime() - startTime);
- if (nt <= 0)
- break;
- wait(nt / 1000000, (int) (nt % 1000000));
+ synchronized(this) {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
+ wait();
}
} catch (InterruptedException ie) {
- onInterruptedWait();
+ cancelIfTerminating();
}
}
}
- // Awaiting completion
-
/**
- * Sets status to indicate there is joiner, then waits for join,
- * surrounded with pool notifications.
+ * Blocks a worker thread until completed or timed out. Called
+ * only by pool.
*
- * @return status upon exit
+ * @return status on exit
*/
- private int awaitDone(ForkJoinWorkerThread w,
- boolean maintainParallelism) {
- ForkJoinPool pool = (w == null) ? null : w.pool;
+ final int internalAwaitDone(long millis) {
int s;
- while ((s = status) >= 0) {
- if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
- if (pool == null || !pool.preJoin(this, maintainParallelism))
- doAwaitDone();
- if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
- adjustPoolCountsOnUnblock(pool);
- break;
+ if ((s = status) >= 0) {
+ try {
+ synchronized(this) {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
+ wait(millis, 0);
+ }
+ } catch (InterruptedException ie) {
+ cancelIfTerminating();
}
+ s = status;
}
return s;
}
/**
- * Timed version of awaitDone
- *
- * @return status upon exit
+ * Blocks a non-worker-thread until completion.
*/
- private int awaitDone(ForkJoinWorkerThread w, long nanos) {
- ForkJoinPool pool = (w == null) ? null : w.pool;
+ private void externalAwaitDone() {
int s;
while ((s = status) >= 0) {
- if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
- long startTime = System.nanoTime();
- if (pool == null || !pool.preJoin(this, false))
- doAwaitDone(startTime, nanos);
- if ((s = status) >= 0) {
- adjustPoolCountsOnCancelledWait(pool);
- s = status;
- }
- if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
- adjustPoolCountsOnUnblock(pool);
- break;
- }
- }
- return s;
- }
-
- /**
- * Notifies pool that thread is unblocked. Called by signalled
- * threads when woken by non-FJ threads (which is atypical).
- */
- private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
- int s;
- do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
- if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
- pool.updateRunningCount(s);
- }
-
- /**
- * Notifies pool to adjust counts on cancelled or timed out wait.
- */
- private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
- if (pool != null) {
- int s;
- while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
- if (casStatus(s, s - 1)) {
- pool.updateRunningCount(1);
+ synchronized(this) {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
+ boolean interrupted = false;
+ while (status >= 0) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ }
+ if (interrupted)
+ Thread.currentThread().interrupt();
break;
}
}
@@ -383,153 +307,19 @@
}
/**
- * Handles interruptions during waits.
- */
- private void onInterruptedWait() {
- ForkJoinWorkerThread w = getWorker();
- if (w == null)
- Thread.currentThread().interrupt(); // re-interrupt
- else if (w.isTerminating())
- cancelIgnoringExceptions();
- // else if FJworker, ignore interrupt
- }
-
- // Recording and reporting exceptions
-
- private void setDoneExceptionally(Throwable rex) {
- exceptionMap.put(this, rex);
- setCompletion(EXCEPTIONAL);
- }
-
- /**
- * Throws the exception associated with status s.
- *
- * @throws the exception
- */
- private void reportException(int s) {
- if ((s &= COMPLETION_MASK) < NORMAL) {
- if (s == CANCELLED)
- throw new CancellationException();
- else
- rethrowException(exceptionMap.get(this));
- }
- }
-
- /**
- * Returns result or throws exception using j.u.c.Future conventions.
- * Only call when {@code isDone} known to be true or thread known
- * to be interrupted.
- */
- private V reportFutureResult()
- throws InterruptedException, ExecutionException {
- if (Thread.interrupted())
- throw new InterruptedException();
- int s = status & COMPLETION_MASK;
- if (s < NORMAL) {
- Throwable ex;
- if (s == CANCELLED)
- throw new CancellationException();
- if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
- throw new ExecutionException(ex);
- }
- return getRawResult();
- }
-
- /**
- * Returns result or throws exception using j.u.c.Future conventions
- * with timeouts.
- */
- private V reportTimedFutureResult()
- throws InterruptedException, ExecutionException, TimeoutException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Throwable ex;
- int s = status & COMPLETION_MASK;
- if (s == NORMAL)
- return getRawResult();
- else if (s == CANCELLED)
- throw new CancellationException();
- else if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
- throw new ExecutionException(ex);
- else
- throw new TimeoutException();
- }
-
- // internal execution methods
-
- /**
- * Calls exec, recording completion, and rethrowing exception if
- * encountered. Caller should normally check status before calling.
- *
- * @return true if completed normally
- */
- private boolean tryExec() {
- try { // try block must contain only call to exec
- if (!exec())
- return false;
- } catch (Throwable rex) {
- setDoneExceptionally(rex);
- rethrowException(rex);
- return false; // not reached
- }
- setNormalCompletion();
- return true;
- }
-
- /**
- * Main execution method used by worker threads. Invokes
- * base computation unless already complete.
+ * Unless done, calls exec and records status if completed, but
+ * doesn't wait for completion otherwise. Primary execution method
+ * for ForkJoinWorkerThread.
*/
final void quietlyExec() {
- if (status >= 0) {
- try {
- if (!exec())
- return;
- } catch (Throwable rex) {
- setDoneExceptionally(rex);
+ try {
+ if (status < 0 || !exec())
return;
- }
- setNormalCompletion();
- }
- }
-
- /**
- * Calls exec(), recording but not rethrowing exception.
- * Caller should normally check status before calling.
- *
- * @return true if completed normally
- */
- private boolean tryQuietlyInvoke() {
- try {
- if (!exec())
- return false;
} catch (Throwable rex) {
- setDoneExceptionally(rex);
- return false;
+ setExceptionalCompletion(rex);
+ return;
}
- setNormalCompletion();
- return true;
- }
-
- /**
- * Cancels, ignoring any exceptions it throws.
- */
- final void cancelIgnoringExceptions() {
- try {
- cancel(false);
- } catch (Throwable ignore) {
- }
- }
-
- /**
- * Main implementation of helpJoin
- */
- private int busyJoin(ForkJoinWorkerThread w) {
- int s;
- ForkJoinTask<?> t;
- while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
- t.quietlyExec();
- return (s >= 0) ? awaitDone(w, false) : s; // block if no work
+ setCompletion(NORMAL); // must be outside try block
}
// public methods
@@ -567,34 +357,41 @@
* @return the computed result
*/
public final V join() {
- ForkJoinWorkerThread w = getWorker();
- if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
- reportException(awaitDone(w, true));
+ quietlyJoin();
+ Throwable ex;
+ if (status < NORMAL && (ex = getException()) != null)
+ UNSAFE.throwException(ex);
return getRawResult();
}
/**
* Commences performing this task, awaits its completion if
- * necessary, and return its result, or throws an (unchecked)
- * exception if the underlying computation did so.
+ * necessary, and returns its result, or throws an (unchecked)
+ * {@code RuntimeException} or {@code Error} if the underlying
+ * computation did so.
*
* @return the computed result
*/
public final V invoke() {
- if (status >= 0 && tryExec())
- return getRawResult();
- else
- return join();
+ quietlyInvoke();
+ Throwable ex;
+ if (status < NORMAL && (ex = getException()) != null)
+ UNSAFE.throwException(ex);
+ return getRawResult();
}
/**
* Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which
- * case the exception is rethrown. If either task encounters an
- * exception, the other one may be, but is not guaranteed to be,
- * cancelled. If both tasks throw an exception, then this method
- * throws one of them. The individual status of each task may be
- * checked using {@link #getException()} and related methods.
+ * case the exception is rethrown. If more than one task
+ * encounters an exception, then this method throws any one of
+ * these exceptions. If any task encounters an exception, the
+ * other may be cancelled. However, the execution status of
+ * individual tasks is not guaranteed upon exceptional return. The
+ * status of each task may be obtained using {@link
+ * #getException()} and related methods to check if they have been
+ * cancelled, completed normally or exceptionally, or left
+ * unprocessed.
*
* <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
@@ -615,12 +412,14 @@
/**
* Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which
- * case the exception is rethrown. If any task encounters an
- * exception, others may be, but are not guaranteed to be,
- * cancelled. If more than one task encounters an exception, then
- * this method throws any one of these exceptions. The individual
- * status of each task may be checked using {@link #getException()}
- * and related methods.
+ * case the exception is rethrown. If more than one task
+ * encounters an exception, then this method throws any one of
+ * these exceptions. If any task encounters an exception, others
+ * may be cancelled. However, the execution status of individual
+ * tasks is not guaranteed upon exceptional return. The status of
+ * each task may be obtained using {@link #getException()} and
+ * related methods to check if they have been cancelled, completed
+ * normally or exceptionally, or left unprocessed.
*
* <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
@@ -644,7 +443,7 @@
t.fork();
else {
t.quietlyInvoke();
- if (ex == null)
+ if (ex == null && t.status < NORMAL)
ex = t.getException();
}
}
@@ -655,26 +454,27 @@
t.cancel(false);
else {
t.quietlyJoin();
- if (ex == null)
+ if (ex == null && t.status < NORMAL)
ex = t.getException();
}
}
}
if (ex != null)
- rethrowException(ex);
+ UNSAFE.throwException(ex);
}
/**
* Forks all tasks in the specified collection, returning when
* {@code isDone} holds for each task or an (unchecked) exception
- * is encountered. If any task encounters an exception, others
- * may be, but are not guaranteed to be, cancelled. If more than
- * one task encounters an exception, then this method throws any
- * one of these exceptions. The individual status of each task
- * may be checked using {@link #getException()} and related
- * methods. The behavior of this operation is undefined if the
- * specified collection is modified while the operation is in
- * progress.
+ * is encountered, in which case the exception is rethrown. If
+ * more than one task encounters an exception, then this method
+ * throws any one of these exceptions. If any task encounters an
+ * exception, others may be cancelled. However, the execution
+ * status of individual tasks is not guaranteed upon exceptional
+ * return. The status of each task may be obtained using {@link
+ * #getException()} and related methods to check if they have been
+ * cancelled, completed normally or exceptionally, or left
+ * unprocessed.
*
* <p>This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
@@ -706,7 +506,7 @@
t.fork();
else {
t.quietlyInvoke();
- if (ex == null)
+ if (ex == null && t.status < NORMAL)
ex = t.getException();
}
}
@@ -717,13 +517,13 @@
t.cancel(false);
else {
t.quietlyJoin();
- if (ex == null)
+ if (ex == null && t.status < NORMAL)
ex = t.getException();
}
}
}
if (ex != null)
- rethrowException(ex);
+ UNSAFE.throwException(ex);
return tasks;
}
@@ -753,7 +553,35 @@
*/
public boolean cancel(boolean mayInterruptIfRunning) {
setCompletion(CANCELLED);
- return (status & COMPLETION_MASK) == CANCELLED;
+ return status == CANCELLED;
+ }
+
+ /**
+ * Cancels, ignoring any exceptions thrown by cancel. Used during
+ * worker and pool shutdown. Cancel is spec'ed not to throw any
+ * exceptions, but if it does anyway, we have no recourse during
+ * shutdown, so guard against this case.
+ */
+ final void cancelIgnoringExceptions() {
+ try {
+ cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+
+ /**
+ * Cancels if current thread is a terminating worker thread,
+ * ignoring any exceptions thrown by cancel.
+ */
+ final void cancelIfTerminating() {
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread) t).isTerminating()) {
+ try {
+ cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
}
public final boolean isDone() {
@@ -761,7 +589,7 @@
}
public final boolean isCancelled() {
- return (status & COMPLETION_MASK) == CANCELLED;
+ return status == CANCELLED;
}
/**
@@ -770,7 +598,7 @@
* @return {@code true} if this task threw an exception or was cancelled
*/
public final boolean isCompletedAbnormally() {
- return (status & COMPLETION_MASK) < NORMAL;
+ return status < NORMAL;
}
/**
@@ -781,7 +609,7 @@
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
- return (status & COMPLETION_MASK) == NORMAL;
+ return status == NORMAL;
}
/**
@@ -792,7 +620,7 @@
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
- int s = status & COMPLETION_MASK;
+ int s = status;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
exceptionMap.get(this));
@@ -813,20 +641,21 @@
* thrown will be a {@code RuntimeException} with cause {@code ex}.
*/
public void completeExceptionally(Throwable ex) {
- setDoneExceptionally((ex instanceof RuntimeException) ||
- (ex instanceof Error) ? ex :
- new RuntimeException(ex));
+ setExceptionalCompletion((ex instanceof RuntimeException) ||
+ (ex instanceof Error) ? ex :
+ new RuntimeException(ex));
}
/**
* Completes this task, and if not already aborted or cancelled,
- * returning a {@code null} result upon {@code join} and related
- * operations. This method may be used to provide results for
- * asynchronous tasks, or to provide alternative handling for
- * tasks that would not otherwise complete normally. Its use in
- * other situations is discouraged. This method is
- * overridable, but overridden versions must invoke {@code super}
- * implementation to maintain guarantees.
+ * returning the given value as the result of subsequent
+ * invocations of {@code join} and related operations. This method
+ * may be used to provide results for asynchronous tasks, or to
+ * provide alternative handling for tasks that would not otherwise
+ * complete normally. Its use in other situations is
+ * discouraged. This method is overridable, but overridden
+ * versions must invoke {@code super} implementation to maintain
+ * guarantees.
*
* @param value the result value for this task
*/
@@ -834,97 +663,151 @@
try {
setRawResult(value);
} catch (Throwable rex) {
- setDoneExceptionally(rex);
+ setExceptionalCompletion(rex);
return;
}
- setNormalCompletion();
+ setCompletion(NORMAL);
}
public final V get() throws InterruptedException, ExecutionException {
- ForkJoinWorkerThread w = getWorker();
- if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
- awaitDone(w, true);
- return reportFutureResult();
+ quietlyJoin();
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ int s = status;
+ if (s < NORMAL) {
+ Throwable ex;
+ if (s == CANCELLED)
+ throw new CancellationException();
+ if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ throw new ExecutionException(ex);
+ }
+ return getRawResult();
}
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ Thread t = Thread.currentThread();
+ ForkJoinPool pool;
+ if (t instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
+ if (status >= 0 && w.unpushTask(this))
+ quietlyExec();
+ pool = w.pool;
+ }
+ else
+ pool = null;
+ /*
+ * Timed wait loop intermixes cases for FJ (pool != null) and
+ * non FJ threads. For FJ, decrement pool count but don't try
+ * for replacement; increment count on completion. For non-FJ,
+ * deal with interrupts. This is messy, but a little less so
+ * than is splitting the FJ and nonFJ cases.
+ */
+ boolean interrupted = false;
+ boolean dec = false; // true if pool count decremented
long nanos = unit.toNanos(timeout);
- ForkJoinWorkerThread w = getWorker();
- if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
- awaitDone(w, nanos);
- return reportTimedFutureResult();
- }
-
- /**
- * Possibly executes other tasks until this task {@link #isDone is
- * done}, then returns the result of the computation. This method
- * may be more efficient than {@code join}, but is only applicable
- * when there are no potential dependencies between continuation
- * of the current task and that of any other task that might be
- * executed while helping. (This usually holds for pure
- * divide-and-conquer tasks).
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
- * @return the computed result
- */
- public final V helpJoin() {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
- if (status < 0 || !w.unpushTask(this) || !tryExec())
- reportException(busyJoin(w));
+ for (;;) {
+ if (pool == null && Thread.interrupted()) {
+ interrupted = true;
+ break;
+ }
+ int s = status;
+ if (s < 0)
+ break;
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
+ long startTime = System.nanoTime();
+ long nt; // wait time
+ while (status >= 0 &&
+ (nt = nanos - (System.nanoTime() - startTime)) > 0) {
+ if (pool != null && !dec)
+ dec = pool.tryDecrementRunningCount();
+ else {
+ long ms = nt / 1000000;
+ int ns = (int) (nt % 1000000);
+ try {
+ synchronized(this) {
+ if (status >= 0)
+ wait(ms, ns);
+ }
+ } catch (InterruptedException ie) {
+ if (pool != null)
+ cancelIfTerminating();
+ else {
+ interrupted = true;
+ break;
+ }
+ }
+ }
+ }
+ break;
+ }
+ }
+ if (pool != null && dec)
+ pool.incrementRunningCount();
+ if (interrupted)
+ throw new InterruptedException();
+ int es = status;
+ if (es != NORMAL) {
+ Throwable ex;
+ if (es == CANCELLED)
+ throw new CancellationException();
+ if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ throw new ExecutionException(ex);
+ throw new TimeoutException();
+ }
return getRawResult();
}
/**
- * Possibly executes other tasks until this task {@link #isDone is
- * done}. This method may be useful when processing collections
- * of tasks when some have been cancelled or otherwise known to
- * have aborted.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- */
- public final void quietlyHelpJoin() {
- if (status >= 0) {
- ForkJoinWorkerThread w =
- (ForkJoinWorkerThread) Thread.currentThread();
- if (!w.unpushTask(this) || !tryQuietlyInvoke())
- busyJoin(w);
- }
- }
-
- /**
- * Joins this task, without returning its result or throwing an
+ * Joins this task, without returning its result or throwing its
* exception. This method may be useful when processing
* collections of tasks when some have been cancelled or otherwise
* known to have aborted.
*/
public final void quietlyJoin() {
- if (status >= 0) {
- ForkJoinWorkerThread w = getWorker();
- if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
- awaitDone(w, true);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
+ if (status >= 0) {
+ if (w.unpushTask(this)) {
+ boolean completed;
+ try {
+ completed = exec();
+ } catch (Throwable rex) {
+ setExceptionalCompletion(rex);
+ return;
+ }
+ if (completed) {
+ setCompletion(NORMAL);
+ return;
+ }
+ }
+ w.joinTask(this);
+ }
}
+ else
+ externalAwaitDone();
}
/**
* Commences performing this task and awaits its completion if
- * necessary, without returning its result or throwing an
- * exception. This method may be useful when processing
- * collections of tasks when some have been cancelled or otherwise
- * known to have aborted.
+ * necessary, without returning its result or throwing its
+ * exception.
*/
public final void quietlyInvoke() {
- if (status >= 0 && !tryQuietlyInvoke())
- quietlyJoin();
+ if (status >= 0) {
+ boolean completed;
+ try {
+ completed = exec();
+ } catch (Throwable rex) {
+ setExceptionalCompletion(rex);
+ return;
+ }
+ if (completed)
+ setCompletion(NORMAL);
+ else
+ quietlyJoin();
+ }
}
/**
@@ -956,7 +839,7 @@
* pre-constructed trees of subtasks in loops.
*/
public void reinitialize() {
- if ((status & COMPLETION_MASK) == EXCEPTIONAL)
+ if (status == EXCEPTIONAL)
exceptionMap.remove(this);
status = 0;
}
@@ -1246,7 +1129,7 @@
private static final long serialVersionUID = -7721805057305804111L;
/**
- * Saves the state to a stream.
+ * Saves the state to a stream (that is, serializes it).
*
* @serialData the current run status and the exception thrown
* during execution, or {@code null} if none
@@ -1259,18 +1142,16 @@
}
/**
- * Reconstitutes the instance from a stream.
+ * Reconstitutes the instance from a stream (that is, deserializes it).
*
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
- status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
- status |= EXTERNAL_SIGNAL; // conservatively set external signal
Object ex = s.readObject();
if (ex != null)
- setDoneExceptionally((Throwable) ex);
+ setExceptionalCompletion((Throwable) ex);
}
// Unsafe mechanics
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Mon Sep 13 09:32:36 2010 +0800
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Mon Sep 13 09:55:03 2010 +0100
@@ -35,7 +35,9 @@
package java.util.concurrent;
+import java.util.Random;
import java.util.Collection;
+import java.util.concurrent.locks.LockSupport;
/**
* A thread managed by a {@link ForkJoinPool}. This class is
@@ -52,46 +54,55 @@
*/
public class ForkJoinWorkerThread extends Thread {
/*
- * Algorithm overview:
+ * Overview:
+ *
+ * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
+ * ForkJoinTasks. This class includes bookkeeping in support of
+ * worker activation, suspension, and lifecycle control described
+ * in more detail in the internal documentation of class
+ * ForkJoinPool. And as described further below, this class also
+ * includes special-cased support for some ForkJoinTask
+ * methods. But the main mechanics involve work-stealing:
*
- * 1. Work-Stealing: Work-stealing queues are special forms of
- * Deques that support only three of the four possible
- * end-operations -- push, pop, and deq (aka steal), and only do
- * so under the constraints that push and pop are called only from
- * the owning thread, while deq may be called from other threads.
- * (If you are unfamiliar with them, you probably want to read
- * Herlihy and Shavit's book "The Art of Multiprocessor
- * programming", chapter 16 describing these in more detail before
- * proceeding.) The main work-stealing queue design is roughly
- * similar to "Dynamic Circular Work-Stealing Deque" by David
- * Chase and Yossi Lev, SPAA 2005
- * (http://research.sun.com/scalable/pubs/index.html). The main
- * difference ultimately stems from gc requirements that we null
- * out taken slots as soon as we can, to maintain as small a
- * footprint as possible even in programs generating huge numbers
- * of tasks. To accomplish this, we shift the CAS arbitrating pop
- * vs deq (steal) from being on the indices ("base" and "sp") to
- * the slots themselves (mainly via method "casSlotNull()"). So,
- * both a successful pop and deq mainly entail CAS'ing a non-null
- * slot to null. Because we rely on CASes of references, we do
- * not need tag bits on base or sp. They are simple ints as used
- * in any circular array-based queue (see for example ArrayDeque).
- * Updates to the indices must still be ordered in a way that
- * guarantees that (sp - base) > 0 means the queue is empty, but
- * otherwise may err on the side of possibly making the queue
- * appear nonempty when a push, pop, or deq have not fully
- * committed. Note that this means that the deq operation,
- * considered individually, is not wait-free. One thief cannot
- * successfully continue until another in-progress one (or, if
- * previously empty, a push) completes. However, in the
- * aggregate, we ensure at least probabilistic
- * non-blockingness. If an attempted steal fails, a thief always
- * chooses a different random victim target to try next. So, in
- * order for one thief to progress, it suffices for any
- * in-progress deq or new push on any empty queue to complete. One
- * reason this works well here is that apparently-nonempty often
- * means soon-to-be-stealable, which gives threads a chance to
- * activate if necessary before stealing (see below).
+ * Work-stealing queues are special forms of Deques that support
+ * only three of the four possible end-operations -- push, pop,
+ * and deq (aka steal), under the further constraints that push
+ * and pop are called only from the owning thread, while deq may
+ * be called from other threads. (If you are unfamiliar with
+ * them, you probably want to read Herlihy and Shavit's book "The
+ * Art of Multiprocessor programming", chapter 16 describing these
+ * in more detail before proceeding.) The main work-stealing
+ * queue design is roughly similar to those in the papers "Dynamic
+ * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
+ * (http://research.sun.com/scalable/pubs/index.html) and
+ * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
+ * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
+ * The main differences ultimately stem from gc requirements that
+ * we null out taken slots as soon as we can, to maintain as small
+ * a footprint as possible even in programs generating huge
+ * numbers of tasks. To accomplish this, we shift the CAS
+ * arbitrating pop vs deq (steal) from being on the indices
+ * ("base" and "sp") to the slots themselves (mainly via method
+ * "casSlotNull()"). So, both a successful pop and deq mainly
+ * entail a CAS of a slot from non-null to null. Because we rely
+ * on CASes of references, we do not need tag bits on base or sp.
+ * They are simple ints as used in any circular array-based queue
+ * (see for example ArrayDeque). Updates to the indices must
+ * still be ordered in a way that guarantees that sp == base means
+ * the queue is empty, but otherwise may err on the side of
+ * possibly making the queue appear nonempty when a push, pop, or
+ * deq have not fully committed. Note that this means that the deq
+ * operation, considered individually, is not wait-free. One thief
+ * cannot successfully continue until another in-progress one (or,
+ * if previously empty, a push) completes. However, in the
+ * aggregate, we ensure at least probabilistic non-blockingness.
+ * If an attempted steal fails, a thief always chooses a different
+ * random victim target to try next. So, in order for one thief to
+ * progress, it suffices for any in-progress deq or new push on
+ * any empty queue to complete. One reason this works well here is
+ * that apparently-nonempty often means soon-to-be-stealable,
+ * which gives threads a chance to set activation status if
+ * necessary before stealing.
*
* This approach also enables support for "async mode" where local
* task processing is in FIFO, not LIFO order; simply by using a
@@ -99,24 +110,54 @@
* by the ForkJoinPool). This allows use in message-passing
* frameworks in which tasks are never joined.
*
- * Efficient implementation of this approach currently relies on
- * an uncomfortable amount of "Unsafe" mechanics. To maintain
+ * When a worker would otherwise be blocked waiting to join a
+ * task, it first tries a form of linear helping: Each worker
+ * records (in field currentSteal) the most recent task it stole
+ * from some other worker. Plus, it records (in field currentJoin)
+ * the task it is currently actively joining. Method joinTask uses
+ * these markers to try to find a worker to help (i.e., steal back
+ * a task from and execute it) that could hasten completion of the
+ * actively joined task. In essence, the joiner executes a task
+ * that would be on its own local deque had the to-be-joined task
+ * not been stolen. This may be seen as a conservative variant of
+ * the approach in Wagner & Calder "Leapfrogging: a portable
+ * technique for implementing efficient futures" SIGPLAN Notices,
+ * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
+ * in that: (1) We only maintain dependency links across workers
+ * upon steals, rather than use per-task bookkeeping. This may
+ * require a linear scan of workers array to locate stealers, but
+ * usually doesn't because stealers leave hints (that may become
+ * stale/wrong) of where to locate them. This isolates cost to
+ * when it is needed, rather than adding to per-task overhead.
+ * (2) It is "shallow", ignoring nesting and potentially cyclic
+ * mutual steals. (3) It is intentionally racy: field currentJoin
+ * is updated only while actively joining, which means that we
+ * miss links in the chain during long-lived tasks, GC stalls etc
+ * (which is OK since blocking in such cases is usually a good
+ * idea). (4) We bound the number of attempts to find work (see
+ * MAX_HELP_DEPTH) and fall back to suspending the worker and if
+ * necessary replacing it with a spare (see
+ * ForkJoinPool.awaitJoin).
+ *
+ * Efficient implementation of these algorithms currently relies
+ * on an uncomfortable amount of "Unsafe" mechanics. To maintain
* correct orderings, reads and writes of variable base require
- * volatile ordering. Variable sp does not require volatile write
- * but needs cheaper store-ordering on writes. Because they are
- * protected by volatile base reads, reads of the queue array and
- * its slots do not need volatile load semantics, but writes (in
- * push) require store order and CASes (in pop and deq) require
- * (volatile) CAS semantics. (See "Idempotent work stealing" by
- * Michael, Saraswat, and Vechev, PPoPP 2009
- * http://portal.acm.org/citation.cfm?id=1504186 for an algorithm
- * with similar properties, but without support for nulling
- * slots.) Since these combinations aren't supported using
- * ordinary volatiles, the only way to accomplish these
- * efficiently is to use direct Unsafe calls. (Using external
- * AtomicIntegers and AtomicReferenceArrays for the indices and
- * array is significantly slower because of memory locality and
- * indirection effects.)
+ * volatile ordering. Variable sp does not require volatile
+ * writes but still needs store-ordering, which we accomplish by
+ * pre-incrementing sp before filling the slot with an ordered
+ * store. (Pre-incrementing also enables backouts used in
+ * joinTask.) Because they are protected by volatile base reads,
+ * reads of the queue array and its slots by other threads do not
+ * need volatile load semantics, but writes (in push) require
+ * store order and CASes (in pop and deq) require (volatile) CAS
+ * semantics. (Michael, Saraswat, and Vechev's algorithm has
+ * similar properties, but without support for nulling slots.)
+ * Since these combinations aren't supported using ordinary
+ * volatiles, the only way to accomplish these efficiently is to
+ * use direct Unsafe calls. (Using external AtomicIntegers and
+ * AtomicReferenceArrays for the indices and array is
+ * significantly slower because of memory locality and indirection
+ * effects.)
*
* Further, performance on most platforms is very sensitive to
* placement and sizing of the (resizable) queue array. Even
@@ -124,56 +165,45 @@
* initial size must be large enough to counteract cache
* contention effects across multiple queues (especially in the
* presence of GC cardmarking). Also, to improve thread-locality,
- * queues are currently initialized immediately after the thread
- * gets the initial signal to start processing tasks. However,
- * all queue-related methods except pushTask are written in a way
- * that allows them to instead be lazily allocated and/or disposed
- * of when empty. All together, these low-level implementation
- * choices produce as much as a factor of 4 performance
- * improvement compared to naive implementations, and enable the
- * processing of billions of tasks per second, sometimes at the
- * expense of ugliness.
- *
- * 2. Run control: The primary run control is based on a global
- * counter (activeCount) held by the pool. It uses an algorithm
- * similar to that in Herlihy and Shavit section 17.6 to cause
- * threads to eventually block when all threads declare they are
- * inactive. For this to work, threads must be declared active
- * when executing tasks, and before stealing a task. They must be
- * inactive before blocking on the Pool Barrier (awaiting a new
- * submission or other Pool event). In between, there is some free
- * play which we take advantage of to avoid contention and rapid
- * flickering of the global activeCount: If inactive, we activate
- * only if a victim queue appears to be nonempty (see above).
- * Similarly, a thread tries to inactivate only after a full scan
- * of other threads. The net effect is that contention on
- * activeCount is rarely a measurable performance issue. (There
- * are also a few other cases where we scan for work rather than
- * retry/block upon contention.)
- *
- * 3. Selection control. We maintain policy of always choosing to
- * run local tasks rather than stealing, and always trying to
- * steal tasks before trying to run a new submission. All steals
- * are currently performed in randomly-chosen deq-order. It may be
- * worthwhile to bias these with locality / anti-locality
- * information, but doing this well probably requires more
- * lower-level information from JVMs than currently provided.
+ * queues are initialized after starting. All together, these
+ * low-level implementation choices produce as much as a factor of
+ * 4 performance improvement compared to naive implementations,
+ * and enable the processing of billions of tasks per second,
+ * sometimes at the expense of ugliness.
*/
/**
+ * Generator for initial random seeds for random victim
+ * selection. This is used only to create initial seeds. Random
+ * steals use a cheaper xorshift generator per steal attempt. We
+ * expect only rare contention on seedGenerator, so just use a
+ * plain Random.
+ */
+ private static final Random seedGenerator = new Random();
+
+ /**
+ * The maximum stolen->joining link depth allowed in helpJoinTask.
+ * Depths for legitimate chains are unbounded, but we use a fixed
+ * constant to avoid (otherwise unchecked) cycles and bound
+ * staleness of traversal parameters at the expense of sometimes
+ * blocking when we could be helping.
+ */
+ private static final int MAX_HELP_DEPTH = 8;
+
+ /**
* Capacity of work-stealing queue array upon initialization.
- * Must be a power of two. Initial size must be at least 2, but is
+ * Must be a power of two. Initial size must be at least 4, but is
* padded to minimize cache effects.
*/
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* Maximum work-stealing queue array size. Must be less than or
- * equal to 1 << 28 to ensure lack of index wraparound. (This
- * is less than usual bounds, because we need leftshift by 3
- * to be in int range).
+ * equal to 1 << (31 - width of array entry) to ensure lack of
+ * index wraparound. The value is set in the static block
+ * at the end of this file after obtaining width.
*/
- private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
+ private static final int MAXIMUM_QUEUE_CAPACITY;
/**
* The pool this thread works in. Accessed directly by ForkJoinTask.
@@ -182,65 +212,118 @@
/**
* The work-stealing queue array. Size must be a power of two.
- * Initialized when thread starts, to improve memory locality.
+ * Initialized in onStart, to improve memory locality.
*/
private ForkJoinTask<?>[] queue;
/**
- * Index (mod queue.length) of next queue slot to push to or pop
- * from. It is written only by owner thread, via ordered store.
- * Both sp and base are allowed to wrap around on overflow, but
- * (sp - base) still estimates size.
- */
- private volatile int sp;
-
- /**
* Index (mod queue.length) of least valid queue slot, which is
* always the next position to steal from if nonempty.
*/
private volatile int base;
/**
- * Activity status. When true, this worker is considered active.
- * Must be false upon construction. It must be true when executing
- * tasks, and BEFORE stealing a task. It must be false before
- * calling pool.sync.
+ * Index (mod queue.length) of next queue slot to push to or pop
+ * from. It is written only by owner thread, and accessed by other
+ * threads only after reading (volatile) base. Both sp and base
+ * are allowed to wrap around on overflow, but (sp - base) still
+ * estimates size.
*/
- private boolean active;
+ private int sp;
+
+ /**
+ * The index of most recent stealer, used as a hint to avoid
+ * traversal in method helpJoinTask. This is only a hint because a
+ * worker might have had multiple steals and this only holds one
+ * of them (usually the most current). Declared non-volatile,
+ * relying on other prevailing sync to keep reasonably current.
+ */
+ private int stealHint;
/**
- * Run state of this worker. Supports simple versions of the usual
- * shutdown/shutdownNow control.
+ * Run state of this worker. In addition to the usual run levels,
+ * tracks if this worker is suspended as a spare, and if it was
+ * killed (trimmed) while suspended. However, "active" status is
+ * maintained separately and modified only in conjunction with
+ * CASes of the pool's runState (which are currently sadly
+ * manually inlined for performance.) Accessed directly by pool
+ * to simplify checks for normal (zero) status.
*/
- private volatile int runState;
+ volatile int runState;
+
+ private static final int TERMINATING = 0x01;
+ private static final int TERMINATED = 0x02;
+ private static final int SUSPENDED = 0x04; // inactive spare
+ private static final int TRIMMED = 0x08; // killed while suspended
+
+ /**
+ * Number of steals. Directly accessed (and reset) by
+ * pool.tryAccumulateStealCount when idle.
+ */
+ int stealCount;
/**
* Seed for random number generator for choosing steal victims.
- * Uses Marsaglia xorshift. Must be nonzero upon initialization.
+ * Uses Marsaglia xorshift. Must be initialized as nonzero.
*/
private int seed;
/**
- * Number of steals, transferred to pool when idle
+ * Activity status. When true, this worker is considered active.
+ * Accessed directly by pool. Must be false upon construction.
*/
- private int stealCount;
+ boolean active;
+
+ /**
+ * True if use local fifo, not default lifo, for local polling.
+ * Shadows value from ForkJoinPool.
+ */
+ private final boolean locallyFifo;
/**
* Index of this worker in pool array. Set once by pool before
- * running, and accessed directly by pool during cleanup etc.
+ * running, and accessed directly by pool to locate this worker in
+ * its workers array.
*/
int poolIndex;
/**
- * The last barrier event waited for. Accessed in pool callback
- * methods, but only by current thread.
+ * The last pool event waited for. Accessed only by pool in
+ * callback methods invoked within this thread.
*/
- long lastEventCount;
+ int lastEventCount;
+
+ /**
+ * Encoded index and event count of next event waiter. Accessed
+ * only by ForkJoinPool for managing event waiters.
+ */
+ volatile long nextWaiter;
+
+ /**
+ * Number of times this thread suspended as spare. Accessed only
+ * by pool.
+ */
+ int spareCount;
/**
- * True if use local fifo, not default lifo, for local polling
+ * Encoded index and count of next spare waiter. Accessed only
+ * by ForkJoinPool for managing spares.
*/
- private boolean locallyFifo;
+ volatile int nextSpare;
+
+ /**
+ * The task currently being joined, set only when actively trying
+ * to help other stealers in helpJoinTask. Written only by this
+ * thread, but read by others.
+ */
+ private volatile ForkJoinTask<?> currentJoin;
+
+ /**
+ * The task most recently stolen from another worker (or
+ * submission queue). Written only by this thread, but read by
+ * others.
+ */
+ private volatile ForkJoinTask<?> currentSteal;
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
@@ -249,13 +332,24 @@
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
- if (pool == null) throw new NullPointerException();
this.pool = pool;
- // Note: poolIndex is set by pool during construction
- // Remaining initialization is deferred to onStart
+ this.locallyFifo = pool.locallyFifo;
+ setDaemon(true);
+ // To avoid exposing construction details to subclasses,
+ // remaining initialization is in start() and onStart()
}
- // Public access methods
+ /**
+ * Performs additional initialization and starts this thread.
+ */
+ final void start(int poolIndex, UncaughtExceptionHandler ueh) {
+ this.poolIndex = poolIndex;
+ if (ueh != null)
+ setUncaughtExceptionHandler(ueh);
+ start();
+ }
+
+ // Public/protected methods
/**
* Returns the pool hosting this thread.
@@ -280,129 +374,24 @@
}
/**
- * Establishes local first-in-first-out scheduling mode for forked
- * tasks that are never joined.
- *
- * @param async if true, use locally FIFO scheduling
- */
- void setAsyncMode(boolean async) {
- locallyFifo = async;
- }
-
- // Runstate management
-
- // Runstate values. Order matters
- private static final int RUNNING = 0;
- private static final int SHUTDOWN = 1;
- private static final int TERMINATING = 2;
- private static final int TERMINATED = 3;
-
- final boolean isShutdown() { return runState >= SHUTDOWN; }
- final boolean isTerminating() { return runState >= TERMINATING; }
- final boolean isTerminated() { return runState == TERMINATED; }
- final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
- final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
-
- /**
- * Transitions to at least the given state.
- *
- * @return {@code true} if not already at least at given state
- */
- private boolean transitionRunStateTo(int state) {
- for (;;) {
- int s = runState;
- if (s >= state)
- return false;
- if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
- return true;
- }
- }
-
- /**
- * Tries to set status to active; fails on contention.
- */
- private boolean tryActivate() {
- if (!active) {
- if (!pool.tryIncrementActiveCount())
- return false;
- active = true;
- }
- return true;
- }
-
- /**
- * Tries to set status to inactive; fails on contention.
- */
- private boolean tryInactivate() {
- if (active) {
- if (!pool.tryDecrementActiveCount())
- return false;
- active = false;
- }
- return true;
- }
-
- /**
- * Computes next value for random victim probe. Scans don't
- * require a very high quality generator, but also not a crummy
- * one. Marsaglia xor-shift is cheap and works well.
- */
- private static int xorShift(int r) {
- r ^= (r << 13);
- r ^= (r >>> 17);
- return r ^ (r << 5);
- }
-
- // Lifecycle methods
-
- /**
- * This method is required to be public, but should never be
- * called explicitly. It performs the main run loop to execute
- * ForkJoinTasks.
- */
- public void run() {
- Throwable exception = null;
- try {
- onStart();
- pool.sync(this); // await first pool event
- mainLoop();
- } catch (Throwable ex) {
- exception = ex;
- } finally {
- onTermination(exception);
- }
- }
-
- /**
- * Executes tasks until shut down.
- */
- private void mainLoop() {
- while (!isShutdown()) {
- ForkJoinTask<?> t = pollTask();
- if (t != null || (t = pollSubmission()) != null)
- t.quietlyExec();
- else if (tryInactivate())
- pool.sync(this);
- }
- }
-
- /**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
- * invoke super.onStart() at the beginning of the method.
+ * invoke @code{super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
- // Allocate while starting to improve chances of thread-local
- // isolation
+ int rs = seedGenerator.nextInt();
+ seed = rs == 0? 1 : rs; // seed must be nonzero
+
+ // Allocate name string and arrays in this thread
+ String pid = Integer.toString(pool.getPoolNumber());
+ String wid = Integer.toString(poolIndex);
+ setName("ForkJoinPool-" + pid + "-worker-" + wid);
+
queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
- // Initial value of seed need not be especially random but
- // should differ across workers and must be nonzero
- int p = poolIndex + 1;
- seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
}
/**
@@ -414,97 +403,187 @@
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
- // Execute remaining local tasks unless aborting or terminating
- while (exception == null && pool.isProcessingTasks() && base != sp) {
- try {
- ForkJoinTask<?> t = popTask();
- if (t != null)
- t.quietlyExec();
- } catch (Throwable ex) {
- exception = ex;
+ try {
+ ForkJoinPool p = pool;
+ if (active) {
+ int a; // inline p.tryDecrementActiveCount
+ active = false;
+ do {} while (!UNSAFE.compareAndSwapInt
+ (p, poolRunStateOffset, a = p.runState, a - 1));
}
- }
- // Cancel other tasks, transition status, notify pool, and
- // propagate exception to uncaught exception handler
- try {
- do {} while (!tryInactivate()); // ensure inactive
cancelTasks();
- runState = TERMINATED;
- pool.workerTerminated(this);
+ setTerminated();
+ p.workerTerminated(this);
} catch (Throwable ex) { // Shouldn't ever happen
if (exception == null) // but if so, at least rethrown
exception = ex;
} finally {
if (exception != null)
- ForkJoinTask.rethrowException(exception);
+ UNSAFE.throwException(exception);
+ }
+ }
+
+ /**
+ * This method is required to be public, but should never be
+ * called explicitly. It performs the main run loop to execute
+ * ForkJoinTasks.
+ */
+ public void run() {
+ Throwable exception = null;
+ try {
+ onStart();
+ mainLoop();
+ } catch (Throwable ex) {
+ exception = ex;
+ } finally {
+ onTermination(exception);
}
}
- // Intrinsics-based support for queue operations.
+ // helpers for run()
- private static long slotOffset(int i) {
- return ((long) i << qShift) + qBase;
+ /**
+ * Finds and executes tasks, and checks status while running.
+ */
+ private void mainLoop() {
+ boolean ran = false; // true if ran a task on last step
+ ForkJoinPool p = pool;
+ for (;;) {
+ p.preStep(this, ran);
+ if (runState != 0)
+ break;
+ ran = tryExecSteal() || tryExecSubmission();
+ }
}
/**
- * Adds in store-order the given task at given slot of q to null.
- * Caller must ensure q is non-null and index is in range.
+ * Tries to steal a task and execute it.
+ *
+ * @return true if ran a task
*/
- private static void setSlot(ForkJoinTask<?>[] q, int i,
- ForkJoinTask<?> t) {
- UNSAFE.putOrderedObject(q, slotOffset(i), t);
+ private boolean tryExecSteal() {
+ ForkJoinTask<?> t;
+ if ((t = scan()) != null) {
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this, currentStealOffset, null);
+ if (sp != base)
+ execLocalTasks();
+ return true;
+ }
+ return false;
}
/**
- * CAS given slot of q to null. Caller must ensure q is non-null
- * and index is in range.
+ * If a submission exists, try to activate and run it.
+ *
+ * @return true if ran a task
*/
- private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
- ForkJoinTask<?> t) {
- return UNSAFE.compareAndSwapObject(q, slotOffset(i), t, null);
+ private boolean tryExecSubmission() {
+ ForkJoinPool p = pool;
+ // This loop is needed in case attempt to activate fails, in
+ // which case we only retry if there still appears to be a
+ // submission.
+ while (p.hasQueuedSubmissions()) {
+ ForkJoinTask<?> t; int a;
+ if (active || // inline p.tryIncrementActiveCount
+ (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
+ a = p.runState, a + 1))) {
+ if ((t = p.pollSubmission()) != null) {
+ UNSAFE.putOrderedObject(this, currentStealOffset, t);
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this, currentStealOffset, null);
+ if (sp != base)
+ execLocalTasks();
+ return true;
+ }
+ }
+ }
+ return false;
}
/**
- * Sets sp in store-order.
+ * Runs local tasks until queue is empty or shut down. Call only
+ * while active.
*/
- private void storeSp(int s) {
- UNSAFE.putOrderedInt(this, spOffset, s);
+ private void execLocalTasks() {
+ while (runState == 0) {
+ ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
+ if (t != null)
+ t.quietlyExec();
+ else if (sp == base)
+ break;
+ }
}
- // Main queue methods
+ /*
+ * Intrinsics-based atomic writes for queue slots. These are
+ * basically the same as methods in AtomicReferenceArray, but
+ * specialized for (1) ForkJoinTask elements (2) requirement that
+ * nullness and bounds checks have already been performed by
+ * callers and (3) effective offsets are known not to overflow
+ * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
+ * need corresponding version for reads: plain array reads are OK
+ * because they are protected by other volatile reads and are
+ * confirmed by CASes.
+ *
+ * Most uses don't actually call these methods, but instead contain
+ * inlined forms that enable more predictable optimization. We
+ * don't define the version of write used in pushTask at all, but
+ * instead inline there a store-fenced array slot write.
+ */
/**
- * Pushes a task. Called only by current thread.
+ * CASes slot i of array q from t to null. Caller must ensure q is
+ * non-null and index is in range.
+ */
+ private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t) {
+ return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
+ }
+
+ /**
+ * Performs a volatile write of the given task at given slot of
+ * array q. Caller must ensure q is non-null and index is in
+ * range. This method is used only during resets and backouts.
+ */
+ private static final void writeSlot(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t) {
+ UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
+ }
+
+ // queue methods
+
+ /**
+ * Pushes a task. Call only from this thread.
*
* @param t the task. Caller must ensure non-null.
*/
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q = queue;
- int mask = q.length - 1;
- int s = sp;
- setSlot(q, s & mask, t);
- storeSp(++s);
- if ((s -= base) == 1)
- pool.signalWork();
- else if (s >= mask)
- growQueue();
+ int mask = q.length - 1; // implicit assert q != null
+ int s = sp++; // ok to increment sp before slot write
+ UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
+ if ((s -= base) == 0)
+ pool.signalWork(); // was empty
+ else if (s == mask)
+ growQueue(); // is full
}
/**
* Tries to take a task from the base of the queue, failing if
- * either empty or contended.
+ * empty or contended. Note: Specializations of this code appear
+ * in locallyDeqTask and elsewhere.
*
* @return a task, or null if none or contended
*/
final ForkJoinTask<?> deqTask() {
ForkJoinTask<?> t;
ForkJoinTask<?>[] q;
- int i;
- int b;
+ int b, i;
if (sp != (b = base) &&
(q = queue) != null && // must read q after b
- (t = q[i = (q.length - 1) & b]) != null &&
- casSlotNull(q, i, t)) {
+ (t = q[i = (q.length - 1) & b]) != null && base == b &&
+ UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
base = b + 1;
return t;
}
@@ -512,19 +591,20 @@
}
/**
- * Tries to take a task from the base of own queue, activating if
- * necessary, failing only if empty. Called only by current thread.
+ * Tries to take a task from the base of own queue. Assumes active
+ * status. Called only by this thread.
*
* @return a task, or null if none
*/
final ForkJoinTask<?> locallyDeqTask() {
- int b;
- while (sp != (b = base)) {
- if (tryActivate()) {
- ForkJoinTask<?>[] q = queue;
- int i = (q.length - 1) & b;
- ForkJoinTask<?> t = q[i];
- if (t != null && casSlotNull(q, i, t)) {
+ ForkJoinTask<?>[] q = queue;
+ if (q != null) {
+ ForkJoinTask<?> t;
+ int b, i;
+ while (sp != (b = base)) {
+ if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
+ UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
+ t, null)) {
base = b + 1;
return t;
}
@@ -534,46 +614,50 @@
}
/**
- * Returns a popped task, or null if empty. Ensures active status
- * if non-null. Called only by current thread.
+ * Returns a popped task, or null if empty. Assumes active status.
+ * Called only by this thread.
*/
- final ForkJoinTask<?> popTask() {
- int s = sp;
- while (s != base) {
- if (tryActivate()) {
- ForkJoinTask<?>[] q = queue;
- int mask = q.length - 1;
- int i = (s - 1) & mask;
+ private ForkJoinTask<?> popTask() {
+ ForkJoinTask<?>[] q = queue;
+ if (q != null) {
+ int s;
+ while ((s = sp) != base) {
+ int i = (q.length - 1) & --s;
+ long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
- if (t == null || !casSlotNull(q, i, t))
+ if (t == null) // lost to stealer
break;
- storeSp(s - 1);
- return t;
+ if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ sp = s; // putOrderedInt may encourage more timely write
+ // UNSAFE.putOrderedInt(this, spOffset, s);
+ return t;
+ }
}
}
return null;
}
/**
- * Specialized version of popTask to pop only if
- * topmost element is the given task. Called only
- * by current thread while active.
+ * Specialized version of popTask to pop only if topmost element
+ * is the given task. Called only by this thread while active.
*
* @param t the task. Caller must ensure non-null.
*/
final boolean unpushTask(ForkJoinTask<?> t) {
+ int s;
ForkJoinTask<?>[] q = queue;
- int mask = q.length - 1;
- int s = sp - 1;
- if (casSlotNull(q, s & mask, t)) {
- storeSp(s);
+ if ((s = sp) != base && q != null &&
+ UNSAFE.compareAndSwapObject
+ (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
+ sp = s; // putOrderedInt may encourage more timely write
+ // UNSAFE.putOrderedInt(this, spOffset, s);
return true;
}
return false;
}
/**
- * Returns next task or null if empty or contended
+ * Returns next task, or null if empty or contended.
*/
final ForkJoinTask<?> peekTask() {
ForkJoinTask<?>[] q = queue;
@@ -606,104 +690,209 @@
ForkJoinTask<?> t = oldQ[oldIndex];
if (t != null && !casSlotNull(oldQ, oldIndex, t))
t = null;
- setSlot(newQ, b & newMask, t);
+ writeSlot(newQ, b & newMask, t);
} while (++b != bf);
pool.signalWork();
}
/**
+ * Computes next value for random victim probe in scan(). Scans
+ * don't require a very high quality generator, but also not a
+ * crummy one. Marsaglia xor-shift is cheap and works well enough.
+ * Note: This is manually inlined in scan().
+ */
+ private static final int xorShift(int r) {
+ r ^= r << 13;
+ r ^= r >>> 17;
+ return r ^ (r << 5);
+ }
+
+ /**
* Tries to steal a task from another worker. Starts at a random
* index of workers array, and probes workers until finding one
* with non-empty queue or finding that all are empty. It
* randomly selects the first n probes. If these are empty, it
- * resorts to a full circular traversal, which is necessary to
- * accurately set active status by caller. Also restarts if pool
- * events occurred since last scan, which forces refresh of
- * workers array, in case barrier was associated with resize.
+ * resorts to a circular sweep, which is necessary to accurately
+ * set active status. (The circular sweep uses steps of
+ * approximately half the array size plus 1, to avoid bias
+ * stemming from leftmost packing of the array in ForkJoinPool.)
*
* This method must be both fast and quiet -- usually avoiding
* memory accesses that could disrupt cache sharing etc other than
- * those needed to check for and take tasks. This accounts for,
- * among other things, updating random seed in place without
- * storing it until exit.
+ * those needed to check for and take tasks (or to activate if not
+ * already active). This accounts for, among other things,
+ * updating random seed in place without storing it until exit.
*
* @return a task, or null if none found
*/
private ForkJoinTask<?> scan() {
- ForkJoinTask<?> t = null;
- int r = seed; // extract once to keep scan quiet
- ForkJoinWorkerThread[] ws; // refreshed on outer loop
- int mask; // must be power 2 minus 1 and > 0
- outer:do {
- if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
- int idx = r;
- int probes = ~mask; // use random index while negative
- for (;;) {
- r = xorShift(r); // update random seed
- ForkJoinWorkerThread v = ws[mask & idx];
- if (v == null || v.sp == v.base) {
- if (probes <= mask)
- idx = (probes++ < 0) ? r : (idx + 1);
- else
- break;
+ ForkJoinPool p = pool;
+ ForkJoinWorkerThread[] ws; // worker array
+ int n; // upper bound of #workers
+ if ((ws = p.workers) != null && (n = ws.length) > 1) {
+ boolean canSteal = active; // shadow active status
+ int r = seed; // extract seed once
+ int mask = n - 1;
+ int j = -n; // loop counter
+ int k = r; // worker index, random if j < 0
+ for (;;) {
+ ForkJoinWorkerThread v = ws[k & mask];
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
+ ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
+ if (v != null && (b = v.base) != v.sp &&
+ (q = v.queue) != null) {
+ int i = (q.length - 1) & b;
+ long u = (i << qShift) + qBase; // raw offset
+ int pid = poolIndex;
+ if ((t = q[i]) != null) {
+ if (!canSteal && // inline p.tryIncrementActiveCount
+ UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
+ a = p.runState, a + 1))
+ canSteal = active = true;
+ if (canSteal && v.base == b++ &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ v.base = b;
+ v.stealHint = pid;
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, t);
+ seed = r;
+ ++stealCount;
+ return t;
+ }
}
- else if (!tryActivate() || (t = v.deqTask()) == null)
- continue outer; // restart on contention
- else
- break outer;
+ j = -n;
+ k = r; // restart on contention
}
+ else if (++j <= 0)
+ k = r;
+ else if (j <= n)
+ k += (n >>> 1) | 1;
+ else
+ break;
}
- } while (pool.hasNewSyncEvent(this)); // retry on pool events
- seed = r;
- return t;
- }
-
- /**
- * Gets and removes a local or stolen task.
- *
- * @return a task, if available
- */
- final ForkJoinTask<?> pollTask() {
- ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
- if (t == null && (t = scan()) != null)
- ++stealCount;
- return t;
- }
-
- /**
- * Gets a local task.
- *
- * @return a task, if available
- */
- final ForkJoinTask<?> pollLocalTask() {
- return locallyFifo ? locallyDeqTask() : popTask();
- }
-
- /**
- * Returns a pool submission, if one exists, activating first.
- *
- * @return a submission, if available
- */
- private ForkJoinTask<?> pollSubmission() {
- ForkJoinPool p = pool;
- while (p.hasQueuedSubmissions()) {
- ForkJoinTask<?> t;
- if (tryActivate() && (t = p.pollSubmission()) != null)
- return t;
}
return null;
}
- // Methods accessed only by Pool
+ // Run State management
+
+ // status check methods used mainly by ForkJoinPool
+ final boolean isRunning() { return runState == 0; }
+ final boolean isTerminating() { return (runState & TERMINATING) != 0; }
+ final boolean isTerminated() { return (runState & TERMINATED) != 0; }
+ final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
+ final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
+
+ /**
+ * Sets state to TERMINATING. Does NOT unpark or interrupt
+ * to wake up if currently blocked. Callers must do so if desired.
+ */
+ final void shutdown() {
+ for (;;) {
+ int s = runState;
+ if ((s & (TERMINATING|TERMINATED)) != 0)
+ break;
+ if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ (s & ~SUSPENDED) |
+ (TRIMMED|TERMINATING)))
+ break;
+ }
+ else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s | TERMINATING))
+ break;
+ }
+ }
+
+ /**
+ * Sets state to TERMINATED. Called only by onTermination().
+ */
+ private void setTerminated() {
+ int s;
+ do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
+ s = runState,
+ s | (TERMINATING|TERMINATED)));
+ }
+
+ /**
+ * If suspended, tries to set status to unsuspended.
+ * Does NOT wake up if blocked.
+ *
+ * @return true if successful
+ */
+ final boolean tryUnsuspend() {
+ int s;
+ while (((s = runState) & SUSPENDED) != 0) {
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s & ~SUSPENDED))
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sets suspended status and blocks as spare until resumed
+ * or shutdown.
+ */
+ final void suspendAsSpare() {
+ for (;;) { // set suspended unless terminating
+ int s = runState;
+ if ((s & TERMINATING) != 0) { // must kill
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s | (TRIMMED | TERMINATING)))
+ return;
+ }
+ else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s | SUSPENDED))
+ break;
+ }
+ ForkJoinPool p = pool;
+ p.pushSpare(this);
+ while ((runState & SUSPENDED) != 0) {
+ if (p.tryAccumulateStealCount(this)) {
+ interrupted(); // clear/ignore interrupts
+ if ((runState & SUSPENDED) == 0)
+ break;
+ LockSupport.park(this);
+ }
+ }
+ }
+
+ // Misc support methods for ForkJoinPool
+
+ /**
+ * Returns an estimate of the number of tasks in the queue. Also
+ * used by ForkJoinTask.
+ */
+ final int getQueueSize() {
+ int n; // external calls must read base first
+ return (n = -base + sp) <= 0 ? 0 : n;
+ }
/**
* Removes and cancels all tasks in queue. Can be called from any
* thread.
*/
final void cancelTasks() {
- ForkJoinTask<?> t;
- while (base != sp && (t = deqTask()) != null)
- t.cancelIgnoringExceptions();
+ ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
+ if (cj != null) {
+ currentJoin = null;
+ cj.cancelIgnoringExceptions();
+ try {
+ this.interrupt(); // awaken wait
+ } catch (SecurityException ignore) {
+ }
+ }
+ ForkJoinTask<?> cs = currentSteal;
+ if (cs != null) {
+ currentSteal = null;
+ cs.cancelIgnoringExceptions();
+ }
+ while (base != sp) {
+ ForkJoinTask<?> t = deqTask();
+ if (t != null)
+ t.cancelIgnoringExceptions();
+ }
}
/**
@@ -713,87 +902,266 @@
*/
final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
int n = 0;
- ForkJoinTask<?> t;
- while (base != sp && (t = deqTask()) != null) {
- c.add(t);
- ++n;
+ while (base != sp) {
+ ForkJoinTask<?> t = deqTask();
+ if (t != null) {
+ c.add(t);
+ ++n;
+ }
}
return n;
}
- /**
- * Gets and clears steal count for accumulation by pool. Called
- * only when known to be idle (in pool.sync and termination).
- */
- final int getAndClearStealCount() {
- int sc = stealCount;
- stealCount = 0;
- return sc;
- }
-
- /**
- * Returns {@code true} if at least one worker in the given array
- * appears to have at least one queued task.
- *
- * @param ws array of workers
- */
- static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
- if (ws != null) {
- int len = ws.length;
- for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
- for (int i = 0; i < len; ++i) {
- ForkJoinWorkerThread w = ws[i];
- if (w != null && w.sp != w.base)
- return true;
- }
- }
- }
- return false;
- }
-
// Support methods for ForkJoinTask
/**
- * Returns an estimate of the number of tasks in the queue.
+ * Gets and removes a local task.
+ *
+ * @return a task, if available
+ */
+ final ForkJoinTask<?> pollLocalTask() {
+ ForkJoinPool p = pool;
+ while (sp != base) {
+ int a; // inline p.tryIncrementActiveCount
+ if (active ||
+ (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
+ a = p.runState, a + 1)))
+ return locallyFifo ? locallyDeqTask() : popTask();
+ }
+ return null;
+ }
+
+ /**
+ * Gets and removes a local or stolen task.
+ *
+ * @return a task, if available
+ */
+ final ForkJoinTask<?> pollTask() {
+ ForkJoinTask<?> t = pollLocalTask();
+ if (t == null) {
+ t = scan();
+ // cannot retain/track/help steal
+ UNSAFE.putOrderedObject(this, currentStealOffset, null);
+ }
+ return t;
+ }
+
+ /**
+ * Possibly runs some tasks and/or blocks, until task is done.
+ *
+ * @param joinMe the task to join
*/
- final int getQueueSize() {
- // suppress momentarily negative values
- return Math.max(0, sp - base);
+ final void joinTask(ForkJoinTask<?> joinMe) {
+ // currentJoin only written by this thread; only need ordered store
+ ForkJoinTask<?> prevJoin = currentJoin;
+ UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
+ if (sp != base)
+ localHelpJoinTask(joinMe);
+ if (joinMe.status >= 0)
+ pool.awaitJoin(joinMe, this);
+ UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
+ }
+
+ /**
+ * Run tasks in local queue until given task is done.
+ *
+ * @param joinMe the task to join
+ */
+ private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
+ int s;
+ ForkJoinTask<?>[] q;
+ while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
+ int i = (q.length - 1) & --s;
+ long u = (i << qShift) + qBase; // raw offset
+ ForkJoinTask<?> t = q[i];
+ if (t == null) // lost to a stealer
+ break;
+ if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ /*
+ * This recheck (and similarly in helpJoinTask)
+ * handles cases where joinMe is independently
+ * cancelled or forced even though there is other work
+ * available. Back out of the pop by putting t back
+ * into slot before we commit by writing sp.
+ */
+ if (joinMe.status < 0) {
+ UNSAFE.putObjectVolatile(q, u, t);
+ break;
+ }
+ sp = s;
+ // UNSAFE.putOrderedInt(this, spOffset, s);
+ t.quietlyExec();
+ }
+ }
}
/**
- * Returns an estimate of the number of tasks, offset by a
- * function of number of idle workers.
+ * Unless terminating, tries to locate and help perform tasks for
+ * a stealer of the given task, or in turn one of its stealers.
+ * Traces currentSteal->currentJoin links looking for a thread
+ * working on a descendant of the given task and with a non-empty
+ * queue to steal back and execute tasks from.
+ *
+ * The implementation is very branchy to cope with potential
+ * inconsistencies or loops encountering chains that are stale,
+ * unknown, or of length greater than MAX_HELP_DEPTH links. All
+ * of these cases are dealt with by just returning back to the
+ * caller, who is expected to retry if other join mechanisms also
+ * don't work out.
+ *
+ * @param joinMe the task to join
*/
- final int getEstimatedSurplusTaskCount() {
- // The halving approximates weighting idle vs non-idle workers
- return (sp - base) - (pool.getIdleThreadCount() >>> 1);
+ final void helpJoinTask(ForkJoinTask<?> joinMe) {
+ ForkJoinWorkerThread[] ws;
+ int n;
+ if (joinMe.status < 0) // already done
+ return;
+ if ((runState & TERMINATING) != 0) { // cancel if shutting down
+ joinMe.cancelIgnoringExceptions();
+ return;
+ }
+ if ((ws = pool.workers) == null || (n = ws.length) <= 1)
+ return; // need at least 2 workers
+
+ ForkJoinTask<?> task = joinMe; // base of chain
+ ForkJoinWorkerThread thread = this; // thread with stolen task
+ for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
+ // Try to find v, the stealer of task, by first using hint
+ ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
+ if (v == null || v.currentSteal != task) {
+ for (int j = 0; ; ++j) { // search array
+ if (j < n) {
+ ForkJoinTask<?> vs;
+ if ((v = ws[j]) != null &&
+ (vs = v.currentSteal) != null) {
+ if (joinMe.status < 0 || task.status < 0)
+ return; // stale or done
+ if (vs == task) {
+ thread.stealHint = j;
+ break; // save hint for next time
+ }
+ }
+ }
+ else
+ return; // no stealer
+ }
+ }
+ for (;;) { // Try to help v, using specialized form of deqTask
+ if (joinMe.status < 0)
+ return;
+ int b = v.base;
+ ForkJoinTask<?>[] q = v.queue;
+ if (b == v.sp || q == null)
+ break;
+ int i = (q.length - 1) & b;
+ long u = (i << qShift) + qBase;
+ ForkJoinTask<?> t = q[i];
+ int pid = poolIndex;
+ ForkJoinTask<?> ps = currentSteal;
+ if (task.status < 0)
+ return; // stale or done
+ if (t != null && v.base == b++ &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ if (joinMe.status < 0) {
+ UNSAFE.putObjectVolatile(q, u, t);
+ return; // back out on cancel
+ }
+ v.base = b;
+ v.stealHint = pid;
+ UNSAFE.putOrderedObject(this, currentStealOffset, t);
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this, currentStealOffset, ps);
+ }
+ }
+ // Try to descend to find v's stealer
+ ForkJoinTask<?> next = v.currentJoin;
+ if (task.status < 0 || next == null || next == task ||
+ joinMe.status < 0)
+ return;
+ task = next;
+ thread = v;
+ }
}
/**
- * Scans, returning early if joinMe done.
+ * Implements ForkJoinTask.getSurplusQueuedTaskCount().
+ * Returns an estimate of the number of tasks, offset by a
+ * function of number of idle workers.
+ *
+ * This method provides a cheap heuristic guide for task
+ * partitioning when programmers, frameworks, tools, or languages
+ * have little or no idea about task granularity. In essence by
+ * offering this method, we ask users only about tradeoffs in
+ * overhead vs expected throughput and its variance, rather than
+ * how finely to partition tasks.
+ *
+ * In a steady state strict (tree-structured) computation, each
+ * thread makes available for stealing enough tasks for other
+ * threads to remain active. Inductively, if all threads play by
+ * the same rules, each thread should make available only a
+ * constant number of tasks.
+ *
+ * The minimum useful constant is just 1. But using a value of 1
+ * would require immediate replenishment upon each steal to
+ * maintain enough tasks, which is infeasible. Further,
+ * partitionings/granularities of offered tasks should minimize
+ * steal rates, which in general means that threads nearer the top
+ * of computation tree should generate more than those nearer the
+ * bottom. In perfect steady state, each thread is at
+ * approximately the same level of computation tree. However,
+ * producing extra tasks amortizes the uncertainty of progress and
+ * diffusion assumptions.
+ *
+ * So, users will want to use values larger, but not much larger
+ * than 1 to both smooth over transient shortages and hedge
+ * against uneven progress; as traded off against the cost of
+ * extra task overhead. We leave the user to pick a threshold
+ * value to compare with the results of this call to guide
+ * decisions, but recommend values such as 3.
+ *
+ * When all threads are active, it is on average OK to estimate
+ * surplus strictly locally. In steady-state, if one thread is
+ * maintaining say 2 surplus tasks, then so are others. So we can
+ * just use estimated queue length (although note that (sp - base)
+ * can be an overestimate because of stealers lagging increments
+ * of base). However, this strategy alone leads to serious
+ * mis-estimates in some non-steady-state conditions (ramp-up,
+ * ramp-down, other stalls). We can detect many of these by
+ * further considering the number of "idle" threads, that are
+ * known to have zero queued tasks, so compensate by a factor of
+ * (#idle/#active) threads.
*/
- final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
- ForkJoinTask<?> t = pollTask();
- if (t != null && joinMe.status < 0 && sp == base) {
- pushTask(t); // unsteal if done and this task would be stealable
- t = null;
- }
- return t;
+ final int getEstimatedSurplusTaskCount() {
+ return sp - base - pool.idlePerActive();
}
/**
* Runs tasks until {@code pool.isQuiescent()}.
*/
final void helpQuiescePool() {
+ ForkJoinTask<?> ps = currentSteal; // to restore below
for (;;) {
- ForkJoinTask<?> t = pollTask();
- if (t != null)
+ ForkJoinTask<?> t = pollLocalTask();
+ if (t != null || (t = scan()) != null)
t.quietlyExec();
- else if (tryInactivate() && pool.isQuiescent())
- break;
+ else {
+ ForkJoinPool p = pool;
+ int a; // to inline CASes
+ if (active) {
+ if (!UNSAFE.compareAndSwapInt
+ (p, poolRunStateOffset, a = p.runState, a - 1))
+ continue; // retry later
+ active = false; // inactivate
+ UNSAFE.putOrderedObject(this, currentStealOffset, ps);
+ }
+ if (p.isQuiescent()) {
+ active = true; // re-activate
+ do {} while (!UNSAFE.compareAndSwapInt
+ (p, poolRunStateOffset, a = p.runState, a+1));
+ return;
+ }
+ }
}
- do {} while (!tryActivate()); // re-activate on exit
}
// Unsafe mechanics
@@ -803,15 +1171,23 @@
objectFieldOffset("sp", ForkJoinWorkerThread.class);
private static final long runStateOffset =
objectFieldOffset("runState", ForkJoinWorkerThread.class);
- private static final long qBase;
+ private static final long currentJoinOffset =
+ objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
+ private static final long currentStealOffset =
+ objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
+ private static final long qBase =
+ UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
+ private static final long poolRunStateOffset = // to inline CAS
+ objectFieldOffset("runState", ForkJoinPool.class);
+
private static final int qShift;
static {
- qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
if ((s & (s-1)) != 0)
throw new Error("data type scale not a power of two");
qShift = 31 - Integer.numberOfLeadingZeros(s);
+ MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
}
private static long objectFieldOffset(String field, Class<?> klazz) {
--- a/jdk/src/share/classes/java/util/concurrent/LinkedTransferQueue.java Mon Sep 13 09:32:36 2010 +0800
+++ b/jdk/src/share/classes/java/util/concurrent/LinkedTransferQueue.java Mon Sep 13 09:55:03 2010 +0100
@@ -42,6 +42,7 @@
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;
+
/**
* An unbounded {@link TransferQueue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out) with respect
@@ -233,24 +234,6 @@
* additional GC bookkeeping ("write barriers") that are sometimes
* more costly than the writes themselves because of contention).
*
- * Removal of interior nodes (due to timed out or interrupted
- * waits, or calls to remove(x) or Iterator.remove) can use a
- * scheme roughly similar to that described in Scherer, Lea, and
- * Scott's SynchronousQueue. Given a predecessor, we can unsplice
- * any node except the (actual) tail of the queue. To avoid
- * build-up of cancelled trailing nodes, upon a request to remove
- * a trailing node, it is placed in field "cleanMe" to be
- * unspliced upon the next call to unsplice any other node.
- * Situations needing such mechanics are not common but do occur
- * in practice; for example when an unbounded series of short
- * timed calls to poll repeatedly time out but never otherwise
- * fall off the list because of an untimed call to take at the
- * front of the queue. Note that maintaining field cleanMe does
- * not otherwise much impact garbage retention even if never
- * cleared by some other call because the held node will
- * eventually either directly or indirectly lead to a self-link
- * once off the list.
- *
* *** Overview of implementation ***
*
* We use a threshold-based approach to updates, with a slack
@@ -266,15 +249,10 @@
* per-thread one available, but even ThreadLocalRandom is too
* heavy for these purposes.
*
- * With such a small slack threshold value, it is rarely
- * worthwhile to augment this with path short-circuiting; i.e.,
- * unsplicing nodes between head and the first unmatched node, or
- * similarly for tail, rather than advancing head or tail
- * proper. However, it is used (in awaitMatch) immediately before
- * a waiting thread starts to block, as a final bit of helping at
- * a point when contention with others is extremely unlikely
- * (since if other threads that could release it are operating,
- * then the current thread wouldn't be blocking).
+ * With such a small slack threshold value, it is not worthwhile
+ * to augment this with path short-circuiting (i.e., unsplicing
+ * interior nodes) except in the case of cancellation/removal (see
+ * below).
*
* We allow both the head and tail fields to be null before any
* nodes are enqueued; initializing upon first append. This
@@ -356,6 +334,70 @@
* versa) compared to their predecessors receive additional
* chained spins, reflecting longer paths typically required to
* unblock threads during phase changes.
+ *
+ *
+ * ** Unlinking removed interior nodes **
+ *
+ * In addition to minimizing garbage retention via self-linking
+ * described above, we also unlink removed interior nodes. These
+ * may arise due to timed out or interrupted waits, or calls to
+ * remove(x) or Iterator.remove. Normally, given a node that was
+ * at one time known to be the predecessor of some node s that is
+ * to be removed, we can unsplice s by CASing the next field of
+ * its predecessor if it still points to s (otherwise s must
+ * already have been removed or is now offlist). But there are two
+ * situations in which we cannot guarantee to make node s
+ * unreachable in this way: (1) If s is the trailing node of list
+ * (i.e., with null next), then it is pinned as the target node
+ * for appends, so can only be removed later after other nodes are
+ * appended. (2) We cannot necessarily unlink s given a
+ * predecessor node that is matched (including the case of being
+ * cancelled): the predecessor may already be unspliced, in which
+ * case some previous reachable node may still point to s.
+ * (For further explanation see Herlihy & Shavit "The Art of
+ * Multiprocessor Programming" chapter 9). Although, in both
+ * cases, we can rule out the need for further action if either s
+ * or its predecessor are (or can be made to be) at, or fall off
+ * from, the head of list.
+ *
+ * Without taking these into account, it would be possible for an
+ * unbounded number of supposedly removed nodes to remain
+ * reachable. Situations leading to such buildup are uncommon but
+ * can occur in practice; for example when a series of short timed
+ * calls to poll repeatedly time out but never otherwise fall off
+ * the list because of an untimed call to take at the front of the
+ * queue.
+ *
+ * When these cases arise, rather than always retraversing the
+ * entire list to find an actual predecessor to unlink (which
+ * won't help for case (1) anyway), we record a conservative
+ * estimate of possible unsplice failures (in "sweepVotes").
+ * We trigger a full sweep when the estimate exceeds a threshold
+ * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
+ * removal failures to tolerate before sweeping through, unlinking
+ * cancelled nodes that were not unlinked upon initial removal.
+ * We perform sweeps by the thread hitting threshold (rather than
+ * background threads or by spreading work to other threads)
+ * because in the main contexts in which removal occurs, the
+ * caller is already timed-out, cancelled, or performing a
+ * potentially O(n) operation (e.g. remove(x)), none of which are
+ * time-critical enough to warrant the overhead that alternatives
+ * would impose on other threads.
+ *
+ * Because the sweepVotes estimate is conservative, and because
+ * nodes become unlinked "naturally" as they fall off the head of
+ * the queue, and because we allow votes to accumulate even while
+ * sweeps are in progress, there are typically significantly fewer
+ * such nodes than estimated. Choice of a threshold value
+ * balances the likelihood of wasted effort and contention, versus
+ * providing a worst-case bound on retention of interior nodes in
+ * quiescent queues. The value defined below was chosen
+ * empirically to balance these under various timeout scenarios.
+ *
+ * Note that we cannot self-link unlinked interior nodes during
+ * sweeps. However, the associated garbage chains terminate when
+ * some successor ultimately falls off the head of the list and is
+ * self-linked.
*/
/** True if on multiprocessor */
@@ -382,11 +424,19 @@
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
+ * The maximum number of estimated removal failures (sweepVotes)
+ * to tolerate before sweeping through the queue unlinking
+ * cancelled nodes that were not unlinked upon initial
+ * removal. See above for explanation. The value must be at least
+ * two to avoid useless sweeps when removing trailing nodes.
+ */
+ static final int SWEEP_THRESHOLD = 32;
+
+ /**
* Queue nodes. Uses Object, not E, for items to allow forgetting
* them after use. Relies heavily on Unsafe mechanics to minimize
- * unnecessary ordering constraints: Writes that intrinsically
- * precede or follow CASes use simple relaxed forms. Other
- * cleanups use releasing/lazy writes.
+ * unnecessary ordering constraints: Writes that are intrinsically
+ * ordered wrt other accesses or CASes use simple relaxed forms.
*/
static final class Node {
final boolean isData; // false if this is a request node
@@ -400,13 +450,13 @@
}
final boolean casItem(Object cmp, Object val) {
- // assert cmp == null || cmp.getClass() != Node.class;
+ // assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
- * Creates a new node. Uses relaxed write because item can only
- * be seen if followed by CAS.
+ * Constructs a new node. Uses relaxed write because item can
+ * only be seen after publication via casNext.
*/
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
@@ -422,13 +472,17 @@
}
/**
- * Sets item to self (using a releasing/lazy write) and waiter
- * to null, to avoid garbage retention after extracting or
- * cancelling.
+ * Sets item to self and waiter to null, to avoid garbage
+ * retention after matching or cancelling. Uses relaxed writes
+ * because order is already constrained in the only calling
+ * contexts: item is forgotten only after volatile/atomic
+ * mechanics that extract items. Similarly, clearing waiter
+ * follows either CAS or return from park (if ever parked;
+ * else we don't care).
*/
final void forgetContents() {
- UNSAFE.putOrderedObject(this, itemOffset, this);
- UNSAFE.putOrderedObject(this, waiterOffset, null);
+ UNSAFE.putObject(this, itemOffset, this);
+ UNSAFE.putObject(this, waiterOffset, null);
}
/**
@@ -462,7 +516,7 @@
* Tries to artificially match a data node -- used by remove.
*/
final boolean tryMatchData() {
- // assert isData;
+ // assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
@@ -486,12 +540,12 @@
/** head of the queue; null until first enqueue */
transient volatile Node head;
- /** predecessor of dangling unspliceable node */
- private transient volatile Node cleanMe; // decl here reduces contention
-
/** tail of the queue; null until first append */
private transient volatile Node tail;
+ /** The number of apparent failures to unsplice removed nodes */
+ private transient volatile int sweepVotes;
+
// CAS methods for fields
private boolean casTail(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
@@ -501,8 +555,8 @@
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
- private boolean casCleanMe(Node cmp, Node val) {
- return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
+ private boolean casSweepVotes(int cmp, int val) {
+ return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
}
/*
@@ -515,7 +569,7 @@
@SuppressWarnings("unchecked")
static <E> E cast(Object item) {
- // assert item == null || item.getClass() != Node.class;
+ // assert item == null || item.getClass() != Node.class;
return (E) item;
}
@@ -544,10 +598,8 @@
break;
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
- Node n = q.next; // update head by 2
- if (n != null) // unless singleton
- q = n;
- if (head == h && casHead(h, q)) {
+ Node n = q.next; // update by 2 unless singleton
+ if (head == h && casHead(h, n == null? q : n)) {
h.forgetNext();
break;
} // advance and retry
@@ -632,12 +684,12 @@
for (;;) {
Object item = s.item;
if (item != e) { // matched
- // assert item != s;
+ // assert item != s;
s.forgetContents(); // avoid garbage
return this.<E>cast(item);
}
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
- s.casItem(e, s)) { // cancel
+ s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
@@ -647,9 +699,8 @@
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
- if (--spins == 0)
- shortenHeadPath(); // reduce slack before blocking
- else if (randomYields.nextInt(CHAINED_SPINS) == 0)
+ --spins;
+ if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
@@ -663,8 +714,6 @@
}
else {
LockSupport.park(this);
- s.waiter = null;
- spins = -1; // spin if front upon wakeup
}
}
}
@@ -685,27 +734,6 @@
return 0;
}
- /**
- * Tries (once) to unsplice nodes between head and first unmatched
- * or trailing node; failing on contention.
- */
- private void shortenHeadPath() {
- Node h, hn, p, q;
- if ((p = h = head) != null && h.isMatched() &&
- (q = hn = h.next) != null) {
- Node n;
- while ((n = q.next) != q) {
- if (n == null || !q.isMatched()) {
- if (hn != q && h.next == hn)
- h.casNext(hn, q);
- break;
- }
- p = q;
- q = n;
- }
- }
- }
-
/* -------------- Traversal methods -------------- */
/**
@@ -818,7 +846,8 @@
public final void remove() {
Node p = lastRet;
if (p == null) throw new IllegalStateException();
- findAndRemoveDataNode(lastPred, p);
+ if (p.tryMatchData())
+ unsplice(lastPred, p);
}
}
@@ -828,99 +857,68 @@
* Unsplices (now or later) the given deleted/cancelled node with
* the given predecessor.
*
- * @param pred predecessor of node to be unspliced
+ * @param pred a node that was at one time known to be the
+ * predecessor of s, or null or s itself if s is/was at head
* @param s the node to be unspliced
*/
- private void unsplice(Node pred, Node s) {
- s.forgetContents(); // clear unneeded fields
+ final void unsplice(Node pred, Node s) {
+ s.forgetContents(); // forget unneeded fields
/*
- * At any given time, exactly one node on list cannot be
- * unlinked -- the last inserted node. To accommodate this, if
- * we cannot unlink s, we save its predecessor as "cleanMe",
- * processing the previously saved version first. Because only
- * one node in the list can have a null next, at least one of
- * node s or the node previously saved can always be
- * processed, so this always terminates.
+ * See above for rationale. Briefly: if pred still points to
+ * s, try to unlink s. If s cannot be unlinked, because it is
+ * trailing node or pred might be unlinked, and neither pred
+ * nor s are head or offlist, add to sweepVotes, and if enough
+ * votes have accumulated, sweep.
*/
- if (pred != null && pred != s) {
- while (pred.next == s) {
- Node oldpred = (cleanMe == null) ? null : reclean();
- Node n = s.next;
- if (n != null) {
- if (n != s)
- pred.casNext(s, n);
- break;
+ if (pred != null && pred != s && pred.next == s) {
+ Node n = s.next;
+ if (n == null ||
+ (n != s && pred.casNext(s, n) && pred.isMatched())) {
+ for (;;) { // check if at, or could be, head
+ Node h = head;
+ if (h == pred || h == s || h == null)
+ return; // at head or list empty
+ if (!h.isMatched())
+ break;
+ Node hn = h.next;
+ if (hn == null)
+ return; // now empty
+ if (hn != h && casHead(h, hn))
+ h.forgetNext(); // advance head
}
- if (oldpred == pred || // Already saved
- ((oldpred == null || oldpred.next == s) &&
- casCleanMe(oldpred, pred))) {
- break;
+ if (pred.next != pred && s.next != s) { // recheck if offlist
+ for (;;) { // sweep now if enough votes
+ int v = sweepVotes;
+ if (v < SWEEP_THRESHOLD) {
+ if (casSweepVotes(v, v + 1))
+ break;
+ }
+ else if (casSweepVotes(v, 0)) {
+ sweep();
+ break;
+ }
+ }
}
}
}
}
/**
- * Tries to unsplice the deleted/cancelled node held in cleanMe
- * that was previously uncleanable because it was at tail.
- *
- * @return current cleanMe node (or null)
+ * Unlinks matched (typically cancelled) nodes encountered in a
+ * traversal from head.
*/
- private Node reclean() {
- /*
- * cleanMe is, or at one time was, predecessor of a cancelled
- * node s that was the tail so could not be unspliced. If it
- * is no longer the tail, try to unsplice if necessary and
- * make cleanMe slot available. This differs from similar
- * code in unsplice() because we must check that pred still
- * points to a matched node that can be unspliced -- if not,
- * we can (must) clear cleanMe without unsplicing. This can
- * loop only due to contention.
- */
- Node pred;
- while ((pred = cleanMe) != null) {
- Node s = pred.next;
- Node n;
- if (s == null || s == pred || !s.isMatched())
- casCleanMe(pred, null); // already gone
- else if ((n = s.next) != null) {
- if (n != s)
- pred.casNext(s, n);
- casCleanMe(pred, null);
- }
- else
+ private void sweep() {
+ for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
+ if (!s.isMatched())
+ // Unmatched nodes are never self-linked
+ p = s;
+ else if ((n = s.next) == null) // trailing node is pinned
break;
- }
- return pred;
- }
-
- /**
- * Main implementation of Iterator.remove(). Finds
- * and unsplices the given data node.
- *
- * @param possiblePred possible predecessor of s
- * @param s the node to remove
- */
- final void findAndRemoveDataNode(Node possiblePred, Node s) {
- // assert s.isData;
- if (s.tryMatchData()) {
- if (possiblePred != null && possiblePred.next == s)
- unsplice(possiblePred, s); // was actual predecessor
- else {
- for (Node pred = null, p = head; p != null; ) {
- if (p == s) {
- unsplice(pred, p);
- break;
- }
- if (p.isUnmatchedRequest())
- break;
- pred = p;
- if ((p = p.next) == pred) { // stale
- pred = null;
- p = head;
- }
- }
- }
+ else if (s == n) // stale
+ // No need to also check for p == s, since that implies s == n
+ p = head;
+ else
+ p.casNext(s, n);
}
}
@@ -1158,7 +1156,11 @@
* @return {@code true} if this queue contains no elements
*/
public boolean isEmpty() {
- return firstOfMode(true) == null;
+ for (Node p = head; p != null; p = succ(p)) {
+ if (!p.isMatched())
+ return !p.isData;
+ }
+ return true;
}
public boolean hasWaitingConsumer() {
@@ -1252,8 +1254,8 @@
objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
private static final long tailOffset =
objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
- private static final long cleanMeOffset =
- objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
+ private static final long sweepVotesOffset =
+ objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
@@ -1266,5 +1268,4 @@
throw error;
}
}
-
}
--- a/jdk/src/share/classes/java/util/concurrent/Phaser.java Mon Sep 13 09:32:36 2010 +0800
+++ b/jdk/src/share/classes/java/util/concurrent/Phaser.java Mon Sep 13 09:55:03 2010 +0100
@@ -898,7 +898,7 @@
boolean doWait() {
if (thread != null) {
try {
- ForkJoinPool.managedBlock(this, false);
+ ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
}
}
--- a/jdk/test/java/util/concurrent/forkjoin/Integrate.java Mon Sep 13 09:32:36 2010 +0800
+++ b/jdk/test/java/util/concurrent/forkjoin/Integrate.java Mon Sep 13 09:55:03 2010 +0100
@@ -206,7 +206,7 @@
q.fork();
ar = recEval(c, r, fc, fr, ar);
if (!q.tryUnfork()) {
- q.quietlyHelpJoin();
+ q.quietlyJoin();
return ar + q.area;
}
return ar + recEval(l, c, fl, fc, al);
@@ -254,7 +254,7 @@
(q = new DQuad(l, c, al)).fork();
ar = recEval(c, r, fc, fr, ar);
if (q != null && !q.tryUnfork()) {
- q.quietlyHelpJoin();
+ q.quietlyJoin();
return ar + q.area;
}
return ar + recEval(l, c, fl, fc, al);