--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java Fri Feb 15 11:18:01 2019 -0800
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java Fri Feb 15 11:18:01 2019 -0800
@@ -445,8 +445,7 @@
* if to its current value). This would be extremely costly. So
* we relax it in several ways: (1) Producers only signal when
* their queue is possibly empty at some point during a push
- * operation (which requires conservatively checking size zero or
- * one to cover races). (2) Other workers propagate this signal
+ * operation. (2) Other workers propagate this signal
* when they find tasks in a queue with size greater than one. (3)
* Workers only enqueue after scanning (see below) and not finding
* any tasks. (4) Rather than CASing ctl to its current value in
@@ -762,10 +761,8 @@
/**
* The maximum number of top-level polls per worker before
- * checking other queues, expressed as a bit shift to, in effect,
- * multiply by pool size, and then use as random value mask, so
- * average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See
- * above for rationale.
+ * checking other queues, expressed as a bit shift. See above for
+ * rationale.
*/
static final int TOP_BOUND_SHIFT = 10;
@@ -841,18 +838,17 @@
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
- int s = top, d, cap, m;
+ int s = top, d = s - base, cap, m;
ForkJoinPool p = pool;
if ((a = array) != null && (cap = a.length) > 0) {
QA.setRelease(a, (m = cap - 1) & s, task);
top = s + 1;
- if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
- p != null) { // size 0 or 1
- VarHandle.fullFence();
- p.signalWork();
+ if (d == m)
+ growArray(false);
+ else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) {
+ VarHandle.fullFence(); // was empty
+ p.signalWork(null);
}
- else if (d == m)
- growArray(false);
}
}
@@ -863,16 +859,16 @@
final boolean lockedPush(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
boolean signal = false;
- int s = top, b = base, cap, d;
+ int s = top, d = s - base, cap, m;
if ((a = array) != null && (cap = a.length) > 0) {
- a[(cap - 1) & s] = task;
+ a[(m = (cap - 1)) & s] = task;
top = s + 1;
- if (b - s + cap - 1 == 0)
+ if (d == m)
growArray(true);
else {
phase = 0; // full volatile unlock
- if (((s - base) & ~1) == 0) // size 0 or 1
- signal = true;
+ if (a[m & (s - 1)] == null)
+ signal = true; // was empty
}
}
return signal;
@@ -1014,25 +1010,30 @@
* queue, up to bound n (to avoid infinite unfairness).
*/
final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
- if (t != null && q != null) { // hoist checks
- int nstolen = 1;
- for (;;) {
+ int nstolen = 1;
+ for (int j = 0;;) {
+ if (t != null)
t.doExec();
- if (n-- < 0)
+ if (j++ <= n)
+ t = nextLocalTask();
+ else {
+ j = 0;
+ t = null;
+ }
+ if (t == null) {
+ if (q != null && (t = q.poll()) != null) {
+ ++nstolen;
+ j = 0;
+ }
+ else if (j != 0)
break;
- else if ((t = nextLocalTask()) == null) {
- if ((t = q.poll()) == null)
- break;
- else
- ++nstolen;
- }
}
- ForkJoinWorkerThread thread = owner;
- nsteals += nstolen;
- source = 0;
- if (thread != null)
- thread.afterTopLevelExec();
}
+ ForkJoinWorkerThread thread = owner;
+ nsteals += nstolen;
+ source = 0;
+ if (thread != null)
+ thread.afterTopLevelExec();
}
/**
@@ -1455,7 +1456,7 @@
if (!tryTerminate(false, false) && // possibly replace worker
w != null && w.array != null) // avoid repeated failures
- signalWork();
+ signalWork(null);
if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions();
@@ -1465,8 +1466,9 @@
/**
* Tries to create or release a worker if too few are running.
+ * @param q if non-null recheck if empty on CAS failure
*/
- final void signalWork() {
+ final void signalWork(WorkQueue q) {
for (;;) {
long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
if ((c = ctl) >= 0L) // enough workers
@@ -1493,6 +1495,8 @@
LockSupport.unpark(vt);
break;
}
+ else if (q != null && q.isEmpty()) // no need to retry
+ break;
}
}
}
@@ -1613,19 +1617,24 @@
else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
tryTerminate(false, false))
break; // quiescent shutdown
- else if (rc <= 0 && pred != 0 && phase == (int)c) {
- long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
- long d = keepAlive + System.currentTimeMillis();
- LockSupport.parkUntil(this, d);
- if (ctl == c && // drop on timeout if all idle
- d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
- CTL.compareAndSet(this, c, nc)) {
- w.phase = QUIET;
- break;
+ else if (w.phase < 0) {
+ if (rc <= 0 && pred != 0 && phase == (int)c) {
+ long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
+ long d = keepAlive + System.currentTimeMillis();
+ LockSupport.parkUntil(this, d);
+ if (ctl == c && // drop on timeout if all idle
+ d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
+ CTL.compareAndSet(this, c, nc)) {
+ w.phase = QUIET;
+ break;
+ }
+ }
+ else {
+ LockSupport.park(this);
+ if (w.phase < 0) // one spurious wakeup check
+ LockSupport.park(this);
}
}
- else if (w.phase < 0)
- LockSupport.park(this); // OK if spuriously woken
w.source = 0; // disable signal
}
}
@@ -1651,10 +1660,10 @@
QA.compareAndSet(a, k, t, null)) {
q.base = b;
w.source = qid;
- if (q.top - b > 0)
- signalWork();
+ if (a[(cap - 1) & b] != null)
+ signalWork(q); // help signal if more tasks
w.topLevelExec(t, q, // random fairness bound
- r & ((n << TOP_BOUND_SHIFT) - 1));
+ (r | (1 << TOP_BOUND_SHIFT)) & SMASK);
}
}
return true;
@@ -1900,7 +1909,7 @@
r = ThreadLocalRandom.advanceProbe(r);
else {
if (q.lockedPush(task))
- signalWork();
+ signalWork(null);
return;
}
}