--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Tue Jul 09 10:44:49 2013 +0200
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Tue Jul 09 16:04:25 2013 +0200
@@ -47,6 +47,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
@@ -79,9 +80,9 @@
* level; by default, equal to the number of available processors. The
* pool attempts to maintain enough active (or available) threads by
* dynamically adding, suspending, or resuming internal worker
- * threads, even if some tasks are stalled waiting to join
- * others. However, no such adjustments are guaranteed in the face of
- * blocked I/O or other unmanaged synchronization. The nested {@link
+ * threads, even if some tasks are stalled waiting to join others.
+ * However, no such adjustments are guaranteed in the face of blocked
+ * I/O or other unmanaged synchronization. The nested {@link
* ManagedBlocker} interface enables extension of the kinds of
* synchronization accommodated.
*
@@ -157,6 +158,7 @@
* @since 1.7
* @author Doug Lea
*/
+@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
/*
@@ -189,32 +191,35 @@
* (http://research.sun.com/scalable/pubs/index.html) and
* "Idempotent work stealing" by Michael, Saraswat, and Vechev,
* PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
- * The main differences ultimately stem from GC requirements that
- * we null out taken slots as soon as we can, to maintain as small
- * a footprint as possible even in programs generating huge
- * numbers of tasks. To accomplish this, we shift the CAS
- * arbitrating pop vs poll (steal) from being on the indices
- * ("base" and "top") to the slots themselves. So, both a
- * successful pop and poll mainly entail a CAS of a slot from
- * non-null to null. Because we rely on CASes of references, we
- * do not need tag bits on base or top. They are simple ints as
- * used in any circular array-based queue (see for example
- * ArrayDeque). Updates to the indices must still be ordered in a
- * way that guarantees that top == base means the queue is empty,
- * but otherwise may err on the side of possibly making the queue
- * appear nonempty when a push, pop, or poll have not fully
- * committed. Note that this means that the poll operation,
- * considered individually, is not wait-free. One thief cannot
- * successfully continue until another in-progress one (or, if
- * previously empty, a push) completes. However, in the
- * aggregate, we ensure at least probabilistic non-blockingness.
- * If an attempted steal fails, a thief always chooses a different
- * random victim target to try next. So, in order for one thief to
- * progress, it suffices for any in-progress poll or new push on
- * any empty queue to complete. (This is why we normally use
- * method pollAt and its variants that try once at the apparent
- * base index, else consider alternative actions, rather than
- * method poll.)
+ * See also "Correct and Efficient Work-Stealing for Weak Memory
+ * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
+ * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
+ * analysis of memory ordering (atomic, volatile etc) issues. The
+ * main differences ultimately stem from GC requirements that we
+ * null out taken slots as soon as we can, to maintain as small a
+ * footprint as possible even in programs generating huge numbers
+ * of tasks. To accomplish this, we shift the CAS arbitrating pop
+ * vs poll (steal) from being on the indices ("base" and "top") to
+ * the slots themselves. So, both a successful pop and poll
+ * mainly entail a CAS of a slot from non-null to null. Because
+ * we rely on CASes of references, we do not need tag bits on base
+ * or top. They are simple ints as used in any circular
+ * array-based queue (see for example ArrayDeque). Updates to the
+ * indices must still be ordered in a way that guarantees that top
+ * == base means the queue is empty, but otherwise may err on the
+ * side of possibly making the queue appear nonempty when a push,
+ * pop, or poll have not fully committed. Note that this means
+ * that the poll operation, considered individually, is not
+ * wait-free. One thief cannot successfully continue until another
+ * in-progress one (or, if previously empty, a push) completes.
+ * However, in the aggregate, we ensure at least probabilistic
+ * non-blockingness. If an attempted steal fails, a thief always
+ * chooses a different random victim target to try next. So, in
+ * order for one thief to progress, it suffices for any
+ * in-progress poll or new push on any empty queue to
+ * complete. (This is why we normally use method pollAt and its
+ * variants that try once at the apparent base index, else
+ * consider alternative actions, rather than method poll.)
*
* This approach also enables support of a user mode in which local
* task processing is in FIFO, not LIFO order, simply by using
@@ -334,37 +339,35 @@
* has not yet entered the wait queue. We solve this by requiring
* a full sweep of all workers (via repeated calls to method
* scan()) both before and after a newly waiting worker is added
- * to the wait queue. During a rescan, the worker might release
- * some other queued worker rather than itself, which has the same
- * net effect. Because enqueued workers may actually be rescanning
- * rather than waiting, we set and clear the "parker" field of
- * WorkQueues to reduce unnecessary calls to unpark. (This
- * requires a secondary recheck to avoid missed signals.) Note
- * the unusual conventions about Thread.interrupts surrounding
- * parking and other blocking: Because interrupts are used solely
- * to alert threads to check termination, which is checked anyway
- * upon blocking, we clear status (using Thread.interrupted)
- * before any call to park, so that park does not immediately
- * return due to status being set via some other unrelated call to
- * interrupt in user code.
+ * to the wait queue. Because enqueued workers may actually be
+ * rescanning rather than waiting, we set and clear the "parker"
+ * field of WorkQueues to reduce unnecessary calls to unpark.
+ * (This requires a secondary recheck to avoid missed signals.)
+ * Note the unusual conventions about Thread.interrupts
+ * surrounding parking and other blocking: Because interrupts are
+ * used solely to alert threads to check termination, which is
+ * checked anyway upon blocking, we clear status (using
+ * Thread.interrupted) before any call to park, so that park does
+ * not immediately return due to status being set via some other
+ * unrelated call to interrupt in user code.
*
* Signalling. We create or wake up workers only when there
* appears to be at least one task they might be able to find and
- * execute. However, many other threads may notice the same task
- * and each signal to wake up a thread that might take it. So in
- * general, pools will be over-signalled. When a submission is
- * added or another worker adds a task to a queue that has fewer
- * than two tasks, they signal waiting workers (or trigger
- * creation of new ones if fewer than the given parallelism level
- * -- signalWork), and may leave a hint to the unparked worker to
- * help signal others upon wakeup). These primary signals are
- * buttressed by others (see method helpSignal) whenever other
- * threads scan for work or do not have a task to process. On
- * most platforms, signalling (unpark) overhead time is noticeably
+ * execute. When a submission is added or another worker adds a
+ * task to a queue that has fewer than two tasks, they signal
+ * waiting workers (or trigger creation of new ones if fewer than
+ * the given parallelism level -- signalWork). These primary
+ * signals are buttressed by others whenever other threads remove
+ * a task from a queue and notice that there are other tasks there
+ * as well. So in general, pools will be over-signalled. On most
+ * platforms, signalling (unpark) overhead time is noticeably
* long, and the time between signalling a thread and it actually
* making progress can be very noticeably long, so it is worth
* offloading these delays from critical paths as much as
- * possible.
+ * possible. Additionally, workers spin-down gradually, by staying
+ * alive so long as they see the ctl state changing. Similar
+ * stability-sensing techniques are also used before blocking in
+ * awaitJoin and helpComplete.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -477,7 +480,7 @@
* Common Pool
* ===========
*
- * The static common Pool always exists after static
+ * The static common pool always exists after static
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
@@ -485,8 +488,11 @@
* fullExternalPush during the first submission to the pool.
*
* When external threads submit to the common pool, they can
- * perform some subtask processing (see externalHelpJoin and
- * related methods). We do not need to record whether these
+ * perform subtask processing (see externalHelpJoin and related
+ * methods). This caller-helps policy makes it sensible to set
+ * common pool parallelism level to one (or more) less than the
+ * total number of available cores, or even zero for pure
+ * caller-runs. We do not need to record whether external
* submissions are to the common pool -- if not, externalHelpJoin
* returns quickly (at the most helping to signal some common pool
* workers). These submitters would otherwise be blocked waiting
@@ -631,18 +637,10 @@
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. (It would be best for queue objects
* and their arrays to share, but there is nothing available to
- * help arrange that). Unfortunately, because they are recorded
- * in a common array, WorkQueue instances are often moved to be
- * adjacent by garbage collectors. To reduce impact, we use field
- * padding that works OK on common platforms; this effectively
- * trades off slightly slower average field access for the sake of
- * avoiding really bad worst-case access. (Until better JVM
- * support is in place, this padding is dependent on transient
- * properties of JVM field layout rules.) We also take care in
- * allocating, sizing and resizing the array. Non-shared queue
- * arrays are initialized by workers before use. Others are
- * allocated on first use.
+ * help arrange that). The @Contended annotation alerts JVMs to
+ * try to keep instances apart.
*/
+ @sun.misc.Contended
static final class WorkQueue {
/**
* Capacity of work-stealing queue array upon initialization.
@@ -664,16 +662,12 @@
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
- // Heuristic padding to ameliorate unfortunate memory placements
- volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
-
- int seed; // for random scanning; initialize nonzero
volatile int eventCount; // encoded inactivation count; < 0 if inactive
int nextWait; // encoded record of next event waiter
- int hint; // steal or signal hint (index)
- int poolIndex; // index of this queue in pool (or 0)
- final int mode; // 0: lifo, > 0: fifo, < 0: shared
int nsteals; // number of steals
+ int hint; // steal index hint
+ short poolIndex; // index of this queue in pool
+ final short mode; // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock; // 1: locked, -1: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
@@ -684,15 +678,12 @@
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
ForkJoinTask<?> currentSteal; // current non-local task being executed
- volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
- volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d;
-
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
int seed) {
this.pool = pool;
this.owner = owner;
- this.mode = mode;
- this.seed = seed;
+ this.mode = (short)mode;
+ this.hint = seed; // store initial seed for runWorker
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
@@ -705,7 +696,7 @@
return (n >= 0) ? 0 : -n; // ignore transient negative
}
- /**
+ /**
* Provides a more accurate estimate of whether this queue has
* any tasks than does queueSize, by checking whether a
* near-empty queue has at least one unclaimed task.
@@ -730,20 +721,18 @@
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
- int s = top, m, n;
+ int s = top, n;
if ((a = array) != null) { // ignore if queue removed
- int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
- U.putOrderedObject(a, j, task);
- if ((n = (top = s + 1) - base) <= 2) {
- if ((p = pool) != null)
- p.signalWork(this);
- }
+ int m = a.length - 1;
+ U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
+ if ((n = (top = s + 1) - base) <= 2)
+ (p = pool).signalWork(p.workQueues, this);
else if (n >= m)
growArray();
}
}
- /**
+ /**
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
@@ -801,9 +790,8 @@
if ((a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
- base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- base = b + 1;
+ base == b && U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QBASE, b + 1);
return t;
}
}
@@ -819,9 +807,8 @@
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (t != null) {
- if (base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- base = b + 1;
+ if (U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QBASE, b + 1);
return t;
}
}
@@ -878,49 +865,43 @@
ForkJoinTask.cancelIgnoringExceptions(t);
}
- /**
- * Computes next value for random probes. Scans don't require
- * a very high quality generator, but also not a crummy one.
- * Marsaglia xor-shift is cheap and works well enough. Note:
- * This is manually inlined in its usages in ForkJoinPool to
- * avoid writes inside busy scan loops.
- */
- final int nextSeed() {
- int r = seed;
- r ^= r << 13;
- r ^= r >>> 17;
- return seed = r ^= r << 5;
- }
-
// Specialized execution methods
/**
- * Pops and runs tasks until empty.
- */
- private void popAndExecAll() {
- // A bit faster than repeated pop calls
- ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
- while ((a = array) != null && (m = a.length - 1) >= 0 &&
- (s = top - 1) - base >= 0 &&
- (t = ((ForkJoinTask<?>)
- U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
- != null) {
- if (U.compareAndSwapObject(a, j, t, null)) {
- top = s;
- t.doExec();
- }
- }
- }
-
- /**
* Polls and runs tasks until empty.
*/
- private void pollAndExecAll() {
+ final void pollAndExecAll() {
for (ForkJoinTask<?> t; (t = poll()) != null;)
t.doExec();
}
/**
+ * Executes a top-level task and any local tasks remaining
+ * after execution.
+ */
+ final void runTask(ForkJoinTask<?> task) {
+ if ((currentSteal = task) != null) {
+ task.doExec();
+ ForkJoinTask<?>[] a = array;
+ int md = mode;
+ ++nsteals;
+ currentSteal = null;
+ if (md != 0)
+ pollAndExecAll();
+ else if (a != null) {
+ int s, m = a.length - 1;
+ ForkJoinTask<?> t;
+ while ((s = top - 1) - base >= 0 &&
+ (t = (ForkJoinTask<?>)U.getAndSetObject
+ (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
+ top = s;
+ t.doExec();
+ }
+ }
+ }
+ }
+
+ /**
* If present, removes from queue and executes the given task,
* or any other cancelled task. Returns (true) on any CAS
* or consistency check failure so caller can retry.
@@ -928,13 +909,15 @@
* @return false if no progress can be made, else true
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
- boolean stat = true, removed = false, empty = true;
+ boolean stat;
ForkJoinTask<?>[] a; int m, s, b, n;
- if ((a = array) != null && (m = a.length - 1) >= 0 &&
+ if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
(n = (s = top) - (b = base)) > 0) {
+ boolean removed = false, empty = true;
+ stat = true;
for (ForkJoinTask<?> t;;) { // traverse from s to b
- int j = ((--s & m) << ASHIFT) + ABASE;
- t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
+ long j = ((--s & m) << ASHIFT) + ABASE;
+ t = (ForkJoinTask<?>)U.getObject(a, j);
if (t == null) // inconsistent length
break;
else if (t == task) {
@@ -962,68 +945,95 @@
break;
}
}
+ if (removed)
+ task.doExec();
}
- if (removed)
- task.doExec();
+ else
+ stat = false;
return stat;
}
/**
- * Polls for and executes the given task or any other task in
- * its CountedCompleter computation.
+ * Tries to poll for and execute the given task or any other
+ * task in its CountedCompleter computation.
*/
- final boolean pollAndExecCC(ForkJoinTask<?> root) {
- ForkJoinTask<?>[] a; int b; Object o;
- outer: while ((b = base) - top < 0 && (a = array) != null) {
+ final boolean pollAndExecCC(CountedCompleter<?> root) {
+ ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
+ if ((b = base) - top < 0 && (a = array) != null) {
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
- if ((o = U.getObject(a, j)) == null ||
- !(o instanceof CountedCompleter))
- break;
- for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
- if (r == root) {
- if (base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- base = b + 1;
- t.doExec();
+ if ((o = U.getObjectVolatile(a, j)) == null)
+ return true; // retry
+ if (o instanceof CountedCompleter) {
+ for (t = (CountedCompleter<?>)o, r = t;;) {
+ if (r == root) {
+ if (base == b &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QBASE, b + 1);
+ t.doExec();
+ }
return true;
}
- else
- break; // restart
+ else if ((r = r.completer) == null)
+ break; // not part of root computation
}
- if ((r = r.completer) == null)
- break outer; // not part of root computation
}
}
return false;
}
/**
- * Executes a top-level task and any local tasks remaining
- * after execution.
+ * Tries to pop and execute the given task or any other task
+ * in its CountedCompleter computation.
*/
- final void runTask(ForkJoinTask<?> t) {
- if (t != null) {
- (currentSteal = t).doExec();
- currentSteal = null;
- ++nsteals;
- if (base - top < 0) { // process remaining local tasks
- if (mode == 0)
- popAndExecAll();
- else
- pollAndExecAll();
+ final boolean externalPopAndExecCC(CountedCompleter<?> root) {
+ ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
+ if (base - (s = top) < 0 && (a = array) != null) {
+ long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
+ if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
+ for (t = (CountedCompleter<?>)o, r = t;;) {
+ if (r == root) {
+ if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
+ if (top == s && array == a &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ top = s - 1;
+ qlock = 0;
+ t.doExec();
+ }
+ else
+ qlock = 0;
+ }
+ return true;
+ }
+ else if ((r = r.completer) == null)
+ break;
+ }
}
}
+ return false;
}
/**
- * Executes a non-top-level (stolen) task.
+ * Internal version
*/
- final void runSubtask(ForkJoinTask<?> t) {
- if (t != null) {
- ForkJoinTask<?> ps = currentSteal;
- (currentSteal = t).doExec();
- currentSteal = ps;
+ final boolean internalPopAndExecCC(CountedCompleter<?> root) {
+ ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
+ if (base - (s = top) < 0 && (a = array) != null) {
+ long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
+ if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
+ for (t = (CountedCompleter<?>)o, r = t;;) {
+ if (r == root) {
+ if (U.compareAndSwapObject(a, j, t, null)) {
+ top = s - 1;
+ t.doExec();
+ }
+ return true;
+ }
+ else if ((r = r.completer) == null)
+ break;
+ }
+ }
}
+ return false;
}
/**
@@ -1040,6 +1050,7 @@
// Unsafe mechanics
private static final sun.misc.Unsafe U;
+ private static final long QBASE;
private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
@@ -1048,6 +1059,8 @@
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
+ QBASE = U.objectFieldOffset
+ (k.getDeclaredField("base"));
QLOCK = U.objectFieldOffset
(k.getDeclaredField("qlock"));
ABASE = U.arrayBaseOffset(ak);
@@ -1087,7 +1100,7 @@
/**
* Common pool parallelism. To allow simpler use and management
* when common pool threads are disabled, we allow the underlying
- * common.config field to be zero, but in that case still report
+ * common.parallelism field to be zero, but in that case still report
* parallelism as 1 to reflect resulting caller-runs mechanics.
*/
static final int commonParallelism;
@@ -1227,35 +1240,18 @@
static final int FIFO_QUEUE = 1;
static final int SHARED_QUEUE = -1;
- // bounds for #steps in scan loop -- must be power 2 minus 1
- private static final int MIN_SCAN = 0x1ff; // cover estimation slop
- private static final int MAX_SCAN = 0x1ffff; // 4 * max workers
-
// Instance fields
-
- /*
- * Field layout of this class tends to matter more than one would
- * like. Runtime layout order is only loosely related to
- * declaration order and may differ across JVMs, but the following
- * empirically works OK on current JVMs.
- */
-
- // Heuristic padding to ameliorate unfortunate memory placements
- volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
-
volatile long stealCount; // collects worker counts
volatile long ctl; // main pool control
volatile int plock; // shutdown status and seqLock
volatile int indexSeed; // worker/submitter index seed
- final int config; // mode and parallelism level
+ final short parallelism; // parallelism level
+ final short mode; // LIFO/FIFO
WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
- volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
- volatile Object pad18, pad19, pad1a, pad1b;
-
/**
* Acquires the plock lock to protect worker array and related
* updates. This method is called only if an initial CAS on plock
@@ -1307,11 +1303,11 @@
* parallelism level exist. Adjusts counts etc on failure.
*/
private void tryAddWorker() {
- long c; int u;
+ long c; int u, e;
while ((u = (int)((c = ctl) >>> 32)) < 0 &&
- (u & SHORT_SIGN) != 0 && (int)c == 0) {
- long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
- ((u + UAC_UNIT) & UAC_MASK)) << 32;
+ (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
+ long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
+ ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
if (U.compareAndSwapLong(this, CTL, c, nc)) {
ForkJoinWorkerThreadFactory fac;
Throwable ex = null;
@@ -1322,8 +1318,8 @@
wt.start();
break;
}
- } catch (Throwable e) {
- ex = e;
+ } catch (Throwable rex) {
+ ex = rex;
}
deregisterWorker(wt, ex);
break;
@@ -1351,7 +1347,7 @@
do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
s += SEED_INCREMENT) ||
s == 0); // skip 0
- WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
+ WorkQueue w = new WorkQueue(this, wt, mode, s);
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
@@ -1371,14 +1367,15 @@
}
}
}
- w.eventCount = w.poolIndex = r; // volatile write orders
+ w.poolIndex = (short)r;
+ w.eventCount = r; // volatile write orders
ws[r] = w;
}
} finally {
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
releasePlock(nps);
}
- wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
+ wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
return w;
}
@@ -1396,9 +1393,7 @@
if (wt != null && (w = wt.workQueue) != null) {
int ps;
w.qlock = -1; // ensure set
- long ns = w.nsteals, sc; // collect steal count
- do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
- sc = stealCount, sc + ns));
+ U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
@@ -1464,19 +1459,21 @@
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
- WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a;
- if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 &&
- (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
- (q = ws[m & z & SQMASK]) != null &&
+ WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
+ int r = ThreadLocalRandom.getProbe();
+ int ps = plock;
+ WorkQueue[] ws = workQueues;
+ if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
+ (q = ws[m & r & SQMASK]) != null && r != 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
- int b = q.base, s = q.top, n, an;
- if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
- int j = (((an - 1) & s) << ASHIFT) + ABASE;
+ if ((a = q.array) != null &&
+ (am = a.length - 1) > (n = (s = q.top) - q.base)) {
+ int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
q.top = s + 1; // push on to deque
q.qlock = 0;
- if (n <= 2)
- signalWork(q);
+ if (n <= 1)
+ signalWork(ws, q);
return;
}
q.qlock = 0;
@@ -1514,7 +1511,7 @@
throw new RejectedExecutionException();
else if (ps == 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) { // initialize workQueues
- int p = config & SMASK; // find power of two table size
+ int p = parallelism; // find power of two table size
int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
@@ -1546,7 +1543,7 @@
q.qlock = 0; // unlock
}
if (submitted) {
- signalWork(q);
+ signalWork(ws, q);
return;
}
}
@@ -1554,6 +1551,7 @@
}
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
q = new WorkQueue(this, null, SHARED_QUEUE, r);
+ q.poolIndex = (short)k;
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
@@ -1577,41 +1575,42 @@
*/
final void incrementActiveCount() {
long c;
- do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
+ do {} while (!U.compareAndSwapLong
+ (this, CTL, c = ctl, ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))));
}
/**
* Tries to create or activate a worker if too few are active.
*
- * @param q the (non-null) queue holding tasks to be signalled
+ * @param ws the worker array to use to find signallees
+ * @param q if non-null, the queue holding tasks to be processed
*/
- final void signalWork(WorkQueue q) {
- int hint = q.poolIndex;
- long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
- while ((u = (int)((c = ctl) >>> 32)) < 0) {
- if ((e = (int)c) > 0) {
- if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
- (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
- long nc = (((long)(w.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT) << 32));
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- w.hint = hint;
- w.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = w.parker) != null)
- U.unpark(p);
- break;
- }
- if (q.top - q.base <= 0)
- break;
- }
- else
- break;
- }
- else {
+ final void signalWork(WorkQueue[] ws, WorkQueue q) {
+ for (;;) {
+ long c; int e, u, i; WorkQueue w; Thread p;
+ if ((u = (int)((c = ctl) >>> 32)) >= 0)
+ break;
+ if ((e = (int)c) <= 0) {
if ((short)u < 0)
tryAddWorker();
break;
}
+ if (ws == null || ws.length <= (i = e & SMASK) ||
+ (w = ws[i]) == null)
+ break;
+ long nc = (((long)(w.nextWait & E_MASK)) |
+ ((long)(u + UAC_UNIT)) << 32);
+ int ne = (e + E_SEQ) & E_MASK;
+ if (w.eventCount == (e | INT_SIGN) &&
+ U.compareAndSwapLong(this, CTL, c, nc)) {
+ w.eventCount = ne;
+ if ((p = w.parker) != null)
+ U.unpark(p);
+ break;
+ }
+ if (q != null && q.base >= q.top)
+ break;
}
}
@@ -1622,215 +1621,152 @@
*/
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
- do { w.runTask(scan(w)); } while (w.qlock >= 0);
+ for (int r = w.hint; scan(w, r) == 0; ) {
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+ }
}
/**
- * Scans for and, if found, returns one task, else possibly
+ * Scans for and, if found, runs one task, else possibly
* inactivates the worker. This method operates on single reads of
* volatile state and is designed to be re-invoked continuously,
* in part because it returns upon detecting inconsistencies,
* contention, or state changes that indicate possible success on
* re-invocation.
*
- * The scan searches for tasks across queues (starting at a random
- * index, and relying on registerWorker to irregularly scatter
- * them within array to avoid bias), checking each at least twice.
- * The scan terminates upon either finding a non-empty queue, or
- * completing the sweep. If the worker is not inactivated, it
- * takes and returns a task from this queue. Otherwise, if not
- * activated, it signals workers (that may include itself) and
- * returns so caller can retry. Also returns for true if the
- * worker array may have changed during an empty scan. On failure
- * to find a task, we take one of the following actions, after
- * which the caller will retry calling this method unless
- * terminated.
- *
- * * If pool is terminating, terminate the worker.
- *
- * * If not already enqueued, try to inactivate and enqueue the
- * worker on wait queue. Or, if inactivating has caused the pool
- * to be quiescent, relay to idleAwaitWork to possibly shrink
- * pool.
- *
- * * If already enqueued and none of the above apply, possibly
- * park awaiting signal, else lingering to help scan and signal.
- *
- * * If a non-empty queue discovered or left as a hint,
- * help wake up other workers before return.
+ * The scan searches for tasks across queues starting at a random
+ * index, checking each at least twice. The scan terminates upon
+ * either finding a non-empty queue, or completing the sweep. If
+ * the worker is not inactivated, it takes and runs a task from
+ * this queue. Otherwise, if not activated, it tries to activate
+ * itself or some other worker by signalling. On failure to find a
+ * task, returns (for retry) if pool state may have changed during
+ * an empty scan, or tries to inactivate if active, else possibly
+ * blocks or terminates via method awaitWork.
*
* @param w the worker (via its WorkQueue)
- * @return a task or null if none found
+ * @param r a random seed
+ * @return worker qlock status if would have waited, else 0
*/
- private final ForkJoinTask<?> scan(WorkQueue w) {
+ private final int scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
- int ps = plock; // read plock before ws
- if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
- int ec = w.eventCount; // ec is negative if inactive
- int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
- w.hint = -1; // update seed and clear hint
- int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
- do {
- WorkQueue q; ForkJoinTask<?>[] a; int b;
- if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
- (a = q.array) != null) { // probably nonempty
- int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
- ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getObjectVolatile(a, i);
- if (q.base == b && ec >= 0 && t != null &&
- U.compareAndSwapObject(a, i, t, null)) {
- if ((q.base = b + 1) - q.top < 0)
- signalWork(q);
- return t; // taken
- }
- else if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) {
- w.hint = (r + j) & m; // help signal below
- break; // cannot take
+ long c = ctl; // for consistency check
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
+ for (int j = m + m + 1, ec = w.eventCount;;) {
+ WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
+ if ((q = ws[(r - j) & m]) != null &&
+ (b = q.base) - q.top < 0 && (a = q.array) != null) {
+ long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
+ if ((t = ((ForkJoinTask<?>)
+ U.getObjectVolatile(a, i))) != null) {
+ if (ec < 0)
+ helpRelease(c, ws, w, q, b);
+ else if (q.base == b &&
+ U.compareAndSwapObject(a, i, t, null)) {
+ U.putOrderedInt(q, QBASE, b + 1);
+ if ((b + 1) - q.top < 0)
+ signalWork(ws, q);
+ w.runTask(t);
+ }
}
- }
- } while (--j >= 0);
-
- int h, e, ns; long c, sc; WorkQueue q;
- if ((ns = w.nsteals) != 0) {
- if (U.compareAndSwapLong(this, STEALCOUNT,
- sc = stealCount, sc + ns))
- w.nsteals = 0; // collect steals and rescan
- }
- else if (plock != ps) // consistency check
- ; // skip
- else if ((e = (int)(c = ctl)) < 0)
- w.qlock = -1; // pool is terminating
- else {
- if ((h = w.hint) < 0) {
- if (ec >= 0) { // try to enqueue/inactivate
- long nc = (((long)ec |
- ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
- w.nextWait = e; // link and mark inactive
- w.eventCount = ec | INT_SIGN;
- if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
- w.eventCount = ec; // unmark on CAS failure
- else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
- idleAwaitWork(w, nc, c);
- }
- else if (w.eventCount < 0 && ctl == c) {
- Thread wt = Thread.currentThread();
- Thread.interrupted(); // clear status
- U.putObject(wt, PARKBLOCKER, this);
- w.parker = wt; // emulate LockSupport.park
- if (w.eventCount < 0) // recheck
- U.park(false, 0L); // block
- w.parker = null;
- U.putObject(wt, PARKBLOCKER, null);
- }
+ break;
}
- if ((h >= 0 || (h = w.hint) >= 0) &&
- (ws = workQueues) != null && h < ws.length &&
- (q = ws[h]) != null) { // signal others before retry
- WorkQueue v; Thread p; int u, i, s;
- for (int n = (config & SMASK) - 1;;) {
- int idleCount = (w.eventCount < 0) ? 0 : -1;
- if (((s = idleCount - q.base + q.top) <= n &&
- (n = s) <= 0) ||
- (u = (int)((c = ctl) >>> 32)) >= 0 ||
- (e = (int)c) <= 0 || m < (i = e & SMASK) ||
- (v = ws[i]) == null)
- break;
- long nc = (((long)(v.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT) << 32));
- if (v.eventCount != (e | INT_SIGN) ||
- !U.compareAndSwapLong(this, CTL, c, nc))
- break;
- v.hint = h;
- v.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = v.parker) != null)
- U.unpark(p);
- if (--n <= 0)
- break;
+ else if (--j < 0) {
+ if ((ec | (e = (int)c)) < 0) // inactive or terminating
+ return awaitWork(w, c, ec);
+ else if (ctl == c) { // try to inactivate and enqueue
+ long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
+ w.nextWait = e;
+ w.eventCount = ec | INT_SIGN;
+ if (!U.compareAndSwapLong(this, CTL, c, nc))
+ w.eventCount = ec; // back out
}
- }
- }
- }
- return null;
- }
-
- /**
- * If inactivating worker w has caused the pool to become
- * quiescent, checks for pool termination, and, so long as this is
- * not the only worker, waits for event for up to a given
- * duration. On timeout, if ctl has not changed, terminates the
- * worker, which will in turn wake up another worker to possibly
- * repeat this process.
- *
- * @param w the calling worker
- * @param currentCtl the ctl value triggering possible quiescence
- * @param prevCtl the ctl value to restore if thread is terminated
- */
- private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
- if (w != null && w.eventCount < 0 &&
- !tryTerminate(false, false) && (int)prevCtl != 0 &&
- ctl == currentCtl) {
- int dc = -(short)(currentCtl >>> TC_SHIFT);
- long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
- long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
- Thread wt = Thread.currentThread();
- while (ctl == currentCtl) {
- Thread.interrupted(); // timed variant of version in scan()
- U.putObject(wt, PARKBLOCKER, this);
- w.parker = wt;
- if (ctl == currentCtl)
- U.park(false, parkTime);
- w.parker = null;
- U.putObject(wt, PARKBLOCKER, null);
- if (ctl != currentCtl)
- break;
- if (deadline - System.nanoTime() <= 0L &&
- U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
- w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
- w.hint = -1;
- w.qlock = -1; // shrink
break;
}
}
}
+ return 0;
}
/**
- * Scans through queues looking for work while joining a task; if
- * any present, signals. May return early if more signalling is
- * detectably unneeded.
+ * A continuation of scan(), possibly blocking or terminating
+ * worker w. Returns without blocking if pool state has apparently
+ * changed since last invocation. Also, if inactivating w has
+ * caused the pool to become quiescent, checks for pool
+ * termination, and, so long as this is not the only worker, waits
+ * for event for up to a given duration. On timeout, if ctl has
+ * not changed, terminates the worker, which will in turn wake up
+ * another worker to possibly repeat this process.
*
- * @param task return early if done
- * @param origin an index to start scan
+ * @param w the calling worker
+ * @param c the ctl value on entry to scan
+ * @param ec the worker's eventCount on entry to scan
*/
- private void helpSignal(ForkJoinTask<?> task, int origin) {
- WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
- if (task != null && task.status >= 0 &&
- (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
- (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
- outer: for (int k = origin, j = m; j >= 0; --j) {
- WorkQueue q = ws[k++ & m];
- for (int n = m;;) { // limit to at most m signals
- if (task.status < 0)
- break outer;
- if (q == null ||
- ((s = -q.base + q.top) <= n && (n = s) <= 0))
- break;
- if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
- (e = (int)c) <= 0 || m < (i = e & SMASK) ||
- (w = ws[i]) == null)
- break outer;
- long nc = (((long)(w.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT) << 32));
- if (w.eventCount != (e | INT_SIGN))
- break outer;
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- w.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = w.parker) != null)
- U.unpark(p);
- if (--n <= 0)
- break;
- }
+ private final int awaitWork(WorkQueue w, long c, int ec) {
+ int stat, ns; long parkTime, deadline;
+ if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
+ !Thread.interrupted()) {
+ int e = (int)c;
+ int u = (int)(c >>> 32);
+ int d = (u >> UAC_SHIFT) + parallelism; // active count
+
+ if (e < 0 || (d <= 0 && tryTerminate(false, false)))
+ stat = w.qlock = -1; // pool is terminating
+ else if ((ns = w.nsteals) != 0) { // collect steals and retry
+ w.nsteals = 0;
+ U.getAndAddLong(this, STEALCOUNT, (long)ns);
+ }
+ else {
+ long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
+ ((long)(w.nextWait & E_MASK)) | // ctl to restore
+ ((long)(u + UAC_UNIT)) << 32);
+ if (pc != 0L) { // timed wait if last waiter
+ int dc = -(short)(c >>> TC_SHIFT);
+ parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
+ (dc + 1) * IDLE_TIMEOUT);
+ deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
+ else
+ parkTime = deadline = 0L;
+ if (w.eventCount == ec && ctl == c) {
+ Thread wt = Thread.currentThread();
+ U.putObject(wt, PARKBLOCKER, this);
+ w.parker = wt; // emulate LockSupport.park
+ if (w.eventCount == ec && ctl == c)
+ U.park(false, parkTime); // must recheck before park
+ w.parker = null;
+ U.putObject(wt, PARKBLOCKER, null);
+ if (parkTime != 0L && ctl == c &&
+ deadline - System.nanoTime() <= 0L &&
+ U.compareAndSwapLong(this, CTL, c, pc))
+ stat = w.qlock = -1; // shrink pool
+ }
+ }
+ }
+ return stat;
+ }
+
+ /**
+ * Possibly releases (signals) a worker. Called only from scan()
+ * when a worker with apparently inactive status finds a non-empty
+ * queue. This requires revalidating all of the associated state
+ * from caller.
+ */
+ private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
+ WorkQueue q, int b) {
+ WorkQueue v; int e, i; Thread p;
+ if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
+ ws != null && ws.length > (i = e & SMASK) &&
+ (v = ws[i]) != null && ctl == c) {
+ long nc = (((long)(v.nextWait & E_MASK)) |
+ ((long)((int)(c >>> 32) + UAC_UNIT)) << 32);
+ int ne = (e + E_SEQ) & E_MASK;
+ if (q != null && q.base == b && w.eventCount < 0 &&
+ v.eventCount == (e | INT_SIGN) &&
+ U.compareAndSwapLong(this, CTL, c, nc)) {
+ v.eventCount = ne;
+ if ((p = v.parker) != null)
+ U.unpark(p);
}
}
}
@@ -1855,7 +1791,8 @@
*/
private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
int stat = 0, steps = 0; // bound to avoid cycles
- if (joiner != null && task != null) { // hoist null checks
+ if (task != null && joiner != null &&
+ joiner.base - joiner.top >= 0) { // hoist checks
restart: for (;;) {
ForkJoinTask<?> subtask = task; // current target
for (WorkQueue j = joiner, v;;) { // v is stealer of subtask
@@ -1882,7 +1819,7 @@
}
}
for (;;) { // help stealer or descend to its stealer
- ForkJoinTask[] a; int b;
+ ForkJoinTask[] a; int b;
if (subtask.status < 0) // surround probes with
continue restart; // consistency checks
if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
@@ -1893,13 +1830,23 @@
v.currentSteal != subtask)
continue restart; // stale
stat = 1; // apparent progress
- if (t != null && v.base == b &&
- U.compareAndSwapObject(a, i, t, null)) {
- v.base = b + 1; // help stealer
- joiner.runSubtask(t);
+ if (v.base == b) {
+ if (t == null)
+ break restart;
+ if (U.compareAndSwapObject(a, i, t, null)) {
+ U.putOrderedInt(v, QBASE, b + 1);
+ ForkJoinTask<?> ps = joiner.currentSteal;
+ int jt = joiner.top;
+ do {
+ joiner.currentSteal = t;
+ t.doExec(); // clear local tasks too
+ } while (task.status >= 0 &&
+ joiner.top != jt &&
+ (t = joiner.pop()) != null);
+ joiner.currentSteal = ps;
+ break restart;
+ }
}
- else if (v.base == b && ++steps == MAX_HELP)
- break restart; // v apparently stalled
}
else { // empty -- try to descend
ForkJoinTask<?> next = v.currentJoin;
@@ -1926,27 +1873,45 @@
* and run tasks within the target's computation.
*
* @param task the task to join
- * @param mode if shared, exit upon completing any task
- * if all workers are active
+ * @param maxTasks the maximum number of other tasks to run
*/
- private int helpComplete(ForkJoinTask<?> task, int mode) {
- WorkQueue[] ws; WorkQueue q; int m, n, s, u;
- if (task != null && (ws = workQueues) != null &&
- (m = ws.length - 1) >= 0) {
- for (int j = 1, origin = j;;) {
+ final int helpComplete(WorkQueue joiner, CountedCompleter<?> task,
+ int maxTasks) {
+ WorkQueue[] ws; int m;
+ int s = 0;
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
+ joiner != null && task != null) {
+ int j = joiner.poolIndex;
+ int scans = m + m + 1;
+ long c = 0L; // for stability check
+ for (int k = scans; ; j += 2) {
+ WorkQueue q;
if ((s = task.status) < 0)
- return s;
- if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
- origin = j;
- if (mode == SHARED_QUEUE &&
- ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
+ break;
+ else if (joiner.internalPopAndExecCC(task)) {
+ if (--maxTasks <= 0) {
+ s = task.status;
break;
+ }
+ k = scans;
}
- else if ((j = (j + 2) & m) == origin)
+ else if ((s = task.status) < 0)
break;
+ else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
+ if (--maxTasks <= 0) {
+ s = task.status;
+ break;
+ }
+ k = scans;
+ }
+ else if (--k < 0) {
+ if (c == (c = ctl))
+ break;
+ k = scans;
+ }
}
}
- return 0;
+ return s;
}
/**
@@ -1955,17 +1920,22 @@
* for blocking. Fails on contention or termination. Otherwise,
* adds a new thread if no idle workers are available and pool
* may become starved.
+ *
+ * @param c the assumed ctl value
*/
- final boolean tryCompensate() {
- int pc = config & SMASK, e, i, tc; long c;
- WorkQueue[] ws; WorkQueue w; Thread p;
- if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) {
- if (e != 0 && (i = e & SMASK) < ws.length &&
- (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
+ final boolean tryCompensate(long c) {
+ WorkQueue[] ws = workQueues;
+ int pc = parallelism, e = (int)c, m, tc;
+ if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
+ WorkQueue w = ws[e & m];
+ if (e != 0 && w != null) {
+ Thread p;
long nc = ((long)(w.nextWait & E_MASK) |
(c & (AC_MASK|TC_MASK)));
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- w.eventCount = (e + E_SEQ) & E_MASK;
+ int ne = (e + E_SEQ) & E_MASK;
+ if (w.eventCount == (e | INT_SIGN) &&
+ U.compareAndSwapLong(this, CTL, c, nc)) {
+ w.eventCount = ne;
if ((p = w.parker) != null)
U.unpark(p);
return true; // replace with idle worker
@@ -2008,23 +1978,20 @@
*/
final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
int s = 0;
- if (joiner != null && task != null && (s = task.status) >= 0) {
+ if (task != null && (s = task.status) >= 0 && joiner != null) {
ForkJoinTask<?> prevJoin = joiner.currentJoin;
joiner.currentJoin = task;
- do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
- joiner.tryRemoveAndExec(task)); // process local tasks
- if (s >= 0 && (s = task.status) >= 0) {
- helpSignal(task, joiner.poolIndex);
- if ((s = task.status) >= 0 &&
- (task instanceof CountedCompleter))
- s = helpComplete(task, LIFO_QUEUE);
- }
+ do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
+ (s = task.status) >= 0);
+ if (s >= 0 && (task instanceof CountedCompleter))
+ s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
+ long cc = 0; // for stability checks
while (s >= 0 && (s = task.status) >= 0) {
- if ((!joiner.isEmpty() || // try helping
- (s = tryHelpStealer(joiner, task)) == 0) &&
+ if ((s = tryHelpStealer(joiner, task)) == 0 &&
(s = task.status) >= 0) {
- helpSignal(task, joiner.poolIndex);
- if ((s = task.status) >= 0 && tryCompensate()) {
+ if (!tryCompensate(cc))
+ cc = ctl;
+ else {
if (task.trySetSignal() && (s = task.status) >= 0) {
synchronized (task) {
if (task.status >= 0) {
@@ -2037,9 +2004,11 @@
task.notifyAll();
}
}
- long c; // re-activate
+ long c; // reactivate
do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c + AC_UNIT));
+ (this, CTL, c = ctl,
+ ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))));
}
}
}
@@ -2061,15 +2030,11 @@
if (joiner != null && task != null && (s = task.status) >= 0) {
ForkJoinTask<?> prevJoin = joiner.currentJoin;
joiner.currentJoin = task;
- do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
- joiner.tryRemoveAndExec(task));
- if (s >= 0 && (s = task.status) >= 0) {
- helpSignal(task, joiner.poolIndex);
- if ((s = task.status) >= 0 &&
- (task instanceof CountedCompleter))
- s = helpComplete(task, LIFO_QUEUE);
- }
- if (s >= 0 && joiner.isEmpty()) {
+ do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
+ (s = task.status) >= 0);
+ if (s >= 0) {
+ if (task instanceof CountedCompleter)
+ helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
do {} while (task.status >= 0 &&
tryHelpStealer(joiner, task) > 0);
}
@@ -2081,14 +2046,14 @@
* Returns a (probably) non-empty steal queue, if one is found
* during a scan, else null. This method must be retried by
* caller if, by the time it tries to use the queue, it is empty.
- * @param r a (random) seed for scanning
*/
- private WorkQueue findNonEmptyStealQueue(int r) {
+ private WorkQueue findNonEmptyStealQueue() {
+ int r = ThreadLocalRandom.nextSecondarySeed();
for (;;) {
int ps = plock, m; WorkQueue[] ws; WorkQueue q;
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
for (int j = (m + 1) << 2; j >= 0; --j) {
- if ((q = ws[(((r + j) << 1) | 1) & m]) != null &&
+ if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
q.base - q.top < 0)
return q;
}
@@ -2105,35 +2070,36 @@
* find tasks either.
*/
final void helpQuiescePool(WorkQueue w) {
+ ForkJoinTask<?> ps = w.currentSteal;
for (boolean active = true;;) {
long c; WorkQueue q; ForkJoinTask<?> t; int b;
- while ((t = w.nextLocalTask()) != null) {
- if (w.base - w.top < 0)
- signalWork(w);
+ while ((t = w.nextLocalTask()) != null)
t.doExec();
- }
- if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
+ if ((q = findNonEmptyStealQueue()) != null) {
if (!active) { // re-establish active count
active = true;
do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c + AC_UNIT));
+ (this, CTL, c = ctl,
+ ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))));
}
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
- if (q.base - q.top < 0)
- signalWork(q);
- w.runSubtask(t);
+ (w.currentSteal = t).doExec();
+ w.currentSteal = ps;
}
}
else if (active) { // decrement active count without queuing
- long nc = (c = ctl) - AC_UNIT;
- if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
- return; // bypass decrement-then-increment
+ long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
+ if ((int)(nc >> AC_SHIFT) + parallelism == 0)
+ break; // bypass decrement-then-increment
if (U.compareAndSwapLong(this, CTL, c, nc))
active = false;
}
- else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
- U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
- return;
+ else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
+ U.compareAndSwapLong
+ (this, CTL, c, ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))))
+ break;
}
}
@@ -2147,13 +2113,10 @@
WorkQueue q; int b;
if ((t = w.nextLocalTask()) != null)
return t;
- if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
+ if ((q = findNonEmptyStealQueue()) == null)
return null;
- if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
- if (q.base - q.top < 0)
- signalWork(q);
+ if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
return t;
- }
}
}
@@ -2206,7 +2169,7 @@
static int getSurplusQueuedTaskCount() {
Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
- int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
+ int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
int n = (q = wt.workQueue).top - q.base;
int a = (int)(pool.ctl >> AC_SHIFT) + p;
return n - (a > (p >>>= 1) ? 0 :
@@ -2236,7 +2199,7 @@
*/
private boolean tryTerminate(boolean now, boolean enable) {
int ps;
- if (this == common) // cannot shut down
+ if (this == common) // cannot shut down
return false;
if ((ps = plock) >= 0) { // enable by setting plock
if (!enable)
@@ -2250,7 +2213,7 @@
}
for (long c;;) {
if (((c = ctl) & STOP_BIT) != 0) { // already terminating
- if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
+ if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
synchronized (this) {
notifyAll(); // signal when 0 workers
}
@@ -2259,17 +2222,15 @@
}
if (!now) { // check if idle & no tasks
WorkQueue[] ws; WorkQueue w;
- if ((int)(c >> AC_SHIFT) != -(config & SMASK))
+ if ((int)(c >> AC_SHIFT) + parallelism > 0)
return false;
if ((ws = workQueues) != null) {
for (int i = 0; i < ws.length; ++i) {
- if ((w = ws[i]) != null) {
- if (!w.isEmpty()) { // signal unprocessed tasks
- signalWork(w);
- return false;
- }
- if ((i & 1) != 0 && w.eventCount >= 0)
- return false; // unqueued inactive worker
+ if ((w = ws[i]) != null &&
+ (!w.isEmpty() ||
+ ((i & 1) != 0 && w.eventCount >= 0))) {
+ signalWork(ws, w);
+ return false;
}
}
}
@@ -2336,116 +2297,67 @@
/**
* Tries to pop the given task from submitter's queue in common pool.
*/
- static boolean tryExternalUnpush(ForkJoinTask<?> t) {
- ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
- ForkJoinTask<?>[] a; int m, s, z;
- if (t != null &&
- (z = ThreadLocalRandom.getProbe()) != 0 &&
- (p = common) != null &&
- (ws = p.workQueues) != null &&
- (m = ws.length - 1) >= 0 &&
- (q = ws[m & z & SQMASK]) != null &&
- (s = q.top) != q.base &&
- (a = q.array) != null) {
+ final boolean tryExternalUnpush(ForkJoinTask<?> task) {
+ WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
+ WorkQueue[] ws = workQueues;
+ int z = ThreadLocalRandom.getProbe();
+ boolean popped = false;
+ if (ws != null && (m = ws.length - 1) >= 0 &&
+ (joiner = ws[z & m & SQMASK]) != null &&
+ joiner.base != (s = joiner.top) &&
+ (a = joiner.array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
- if (U.getObject(a, j) == t &&
- U.compareAndSwapInt(q, QLOCK, 0, 1)) {
- if (q.array == a && q.top == s && // recheck
- U.compareAndSwapObject(a, j, t, null)) {
- q.top = s - 1;
- q.qlock = 0;
- return true;
+ if (U.getObject(a, j) == task &&
+ U.compareAndSwapInt(joiner, QLOCK, 0, 1)) {
+ if (joiner.top == s && joiner.array == a &&
+ U.compareAndSwapObject(a, j, task, null)) {
+ joiner.top = s - 1;
+ popped = true;
}
- q.qlock = 0;
+ joiner.qlock = 0;
}
}
- return false;
+ return popped;
}
- /**
- * Tries to pop and run local tasks within the same computation
- * as the given root. On failure, tries to help complete from
- * other queues via helpComplete.
- */
- private void externalHelpComplete(WorkQueue q, ForkJoinTask<?> root) {
- ForkJoinTask<?>[] a; int m;
- if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
- root != null && root.status >= 0) {
- for (;;) {
- int s, u; Object o; CountedCompleter<?> task = null;
- if ((s = q.top) - q.base > 0) {
- long j = ((m & (s - 1)) << ASHIFT) + ABASE;
- if ((o = U.getObject(a, j)) != null &&
- (o instanceof CountedCompleter)) {
- CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
- do {
- if (r == root) {
- if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
- if (q.array == a && q.top == s &&
- U.compareAndSwapObject(a, j, t, null)) {
- q.top = s - 1;
- task = t;
- }
- q.qlock = 0;
- }
- break;
- }
- } while ((r = r.completer) != null);
+ final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
+ WorkQueue joiner; int m;
+ WorkQueue[] ws = workQueues;
+ int j = ThreadLocalRandom.getProbe();
+ int s = 0;
+ if (ws != null && (m = ws.length - 1) >= 0 &&
+ (joiner = ws[j & m & SQMASK]) != null && task != null) {
+ int scans = m + m + 1;
+ long c = 0L; // for stability check
+ j |= 1; // poll odd queues
+ for (int k = scans; ; j += 2) {
+ WorkQueue q;
+ if ((s = task.status) < 0)
+ break;
+ else if (joiner.externalPopAndExecCC(task)) {
+ if (--maxTasks <= 0) {
+ s = task.status;
+ break;
}
+ k = scans;
}
- if (task != null)
- task.doExec();
- if (root.status < 0 ||
- (config != 0 &&
- ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)))
+ else if ((s = task.status) < 0)
break;
- if (task == null) {
- helpSignal(root, q.poolIndex);
- if (root.status >= 0)
- helpComplete(root, SHARED_QUEUE);
- break;
+ else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
+ if (--maxTasks <= 0) {
+ s = task.status;
+ break;
+ }
+ k = scans;
+ }
+ else if (--k < 0) {
+ if (c == (c = ctl))
+ break;
+ k = scans;
}
}
}
- }
-
- /**
- * Tries to help execute or signal availability of the given task
- * from submitter's queue in common pool.
- */
- static void externalHelpJoin(ForkJoinTask<?> t) {
- // Some hard-to-avoid overlap with tryExternalUnpush
- ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w;
- ForkJoinTask<?>[] a; int m, s, n, z;
- if (t != null &&
- (z = ThreadLocalRandom.getProbe()) != 0 &&
- (p = common) != null &&
- (ws = p.workQueues) != null &&
- (m = ws.length - 1) >= 0 &&
- (q = ws[m & z & SQMASK]) != null &&
- (a = q.array) != null) {
- int am = a.length - 1;
- if ((s = q.top) != q.base) {
- long j = ((am & (s - 1)) << ASHIFT) + ABASE;
- if (U.getObject(a, j) == t &&
- U.compareAndSwapInt(q, QLOCK, 0, 1)) {
- if (q.array == a && q.top == s &&
- U.compareAndSwapObject(a, j, t, null)) {
- q.top = s - 1;
- q.qlock = 0;
- t.doExec();
- }
- else
- q.qlock = 0;
- }
- }
- if (t.status >= 0) {
- if (t instanceof CountedCompleter)
- p.externalHelpComplete(q, t);
- else
- p.helpSignal(t, q.poolIndex);
- }
- }
+ return s;
}
// Exported methods
@@ -2517,7 +2429,7 @@
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
- asyncMode,
+ (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
@@ -2543,12 +2455,13 @@
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
- boolean asyncMode,
+ int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
- this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
+ this.mode = (short)mode;
+ this.parallelism = (short)parallelism;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
@@ -2736,8 +2649,8 @@
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
- int par = (config & SMASK);
- return (par > 0) ? par : 1;
+ int par;
+ return ((par = parallelism) > 0) ? par : 1;
}
/**
@@ -2759,7 +2672,7 @@
* @return the number of worker threads
*/
public int getPoolSize() {
- return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
+ return parallelism + (short)(ctl >>> TC_SHIFT);
}
/**
@@ -2769,7 +2682,7 @@
* @return {@code true} if this pool uses async mode
*/
public boolean getAsyncMode() {
- return (config >>> 16) == FIFO_QUEUE;
+ return mode == FIFO_QUEUE;
}
/**
@@ -2800,7 +2713,7 @@
* @return the number of active threads
*/
public int getActiveThreadCount() {
- int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
+ int r = parallelism + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
@@ -2816,7 +2729,7 @@
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
+ return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
}
/**
@@ -2979,7 +2892,7 @@
}
}
}
- int pc = (config & SMASK);
+ int pc = parallelism;
int tc = pc + (short)(c >>> TC_SHIFT);
int ac = pc + (int)(c >> AC_SHIFT);
if (ac < 0) // ignore transient negative
@@ -3052,7 +2965,7 @@
public boolean isTerminated() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) == -(config & SMASK));
+ (short)(c >>> TC_SHIFT) + parallelism <= 0);
}
/**
@@ -3071,7 +2984,7 @@
public boolean isTerminating() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) != -(config & SMASK));
+ (short)(c >>> TC_SHIFT) + parallelism > 0);
}
/**
@@ -3108,19 +3021,20 @@
long nanos = unit.toNanos(timeout);
if (isTerminated())
return true;
- long startTime = System.nanoTime();
- boolean terminated = false;
+ if (nanos <= 0L)
+ return false;
+ long deadline = System.nanoTime() + nanos;
synchronized (this) {
- for (long waitTime = nanos, millis = 0L;;) {
- if (terminated = isTerminated() ||
- waitTime <= 0L ||
- (millis = unit.toMillis(waitTime)) <= 0L)
- break;
- wait(millis);
- waitTime = nanos - (System.nanoTime() - startTime);
+ for (;;) {
+ if (isTerminated())
+ return true;
+ if (nanos <= 0L)
+ return false;
+ long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+ wait(millis > 0L ? millis : 1L);
+ nanos = deadline - System.nanoTime();
}
}
- return terminated;
}
/**
@@ -3159,11 +3073,8 @@
ForkJoinTask<?> t; WorkQueue q; int b;
if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
found = true;
- if ((t = q.pollAt(b)) != null) {
- if (q.base - q.top < 0)
- signalWork(q);
+ if ((t = q.pollAt(b)) != null)
t.doExec();
- }
break;
}
}
@@ -3278,21 +3189,8 @@
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
- while (!blocker.isReleasable()) { // variant of helpSignal
- WorkQueue[] ws; WorkQueue q; int m, u;
- if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
- for (int i = 0; i <= m; ++i) {
- if (blocker.isReleasable())
- return;
- if ((q = ws[i]) != null && q.base - q.top < 0) {
- p.signalWork(q);
- if ((u = (int)(p.ctl >>> 32)) >= 0 ||
- (u >> UAC_SHIFT) >= 0)
- break;
- }
- }
- }
- if (p.tryCompensate()) {
+ while (!blocker.isReleasable()) {
+ if (p.tryCompensate(p.ctl)) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
@@ -3330,6 +3228,7 @@
private static final long STEALCOUNT;
private static final long PLOCK;
private static final long INDEXSEED;
+ private static final long QBASE;
private static final long QLOCK;
static {
@@ -3349,6 +3248,8 @@
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
Class<?> wk = WorkQueue.class;
+ QBASE = U.objectFieldOffset
+ (wk.getDeclaredField("base"));
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
Class<?> ak = ForkJoinTask[].class;
@@ -3368,7 +3269,7 @@
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
- int par = common.config; // report 1 even if threads disabled
+ int par = common.parallelism; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
@@ -3381,7 +3282,7 @@
ForkJoinWorkerThreadFactory factory
= defaultForkJoinWorkerThreadFactory;
UncaughtExceptionHandler handler = null;
- try { // ignore exceptions in accesing/parsing properties
+ try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
@@ -3399,11 +3300,12 @@
} catch (Exception ignore) {
}
- if (parallelism < 0)
- parallelism = Runtime.getRuntime().availableProcessors();
+ if (parallelism < 0 && // default 1 less than #cores
+ (parallelism = Runtime.getRuntime().availableProcessors() - 1) < 0)
+ parallelism = 0;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
- return new ForkJoinPool(parallelism, factory, handler, false,
+ return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}