--- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java Thu Sep 04 12:23:01 2014 -0400
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java Fri Sep 05 10:54:28 2014 +0200
@@ -297,15 +297,22 @@
}
/**
- * Tries to set SIGNAL status unless already completed. Used by
- * ForkJoinPool. Other variants are directly incorporated into
- * externalAwaitDone etc.
+ * If not done, sets SIGNAL status and performs Object.wait(timeout).
+ * This task may or may not be done on exit. Ignores interrupts.
*
- * @return true if successful
+ * @param timeout using Object.wait conventions.
*/
- final boolean trySetSignal() {
- int s = status;
- return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
+ final void internalWait(long timeout) {
+ int s;
+ if ((s = status) >= 0 && // force completer to issue notify
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ try { wait(timeout); } catch (InterruptedException ie) { }
+ else
+ notifyAll();
+ }
+ }
}
/**
@@ -313,35 +320,29 @@
* @return status upon completion
*/
private int externalAwaitDone() {
- int s;
- ForkJoinPool cp = ForkJoinPool.common;
- if ((s = status) >= 0) {
- if (cp != null) {
- if (this instanceof CountedCompleter)
- s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- s = doExec();
- }
- if (s >= 0 && (s = status) >= 0) {
- boolean interrupted = false;
- do {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) {
- try {
- wait();
- } catch (InterruptedException ie) {
- interrupted = true;
- }
+ int s = ((this instanceof CountedCompleter) ? // try helping
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter<?>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
+ if (s >= 0 && (s = status) >= 0) {
+ boolean interrupted = false;
+ do {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) {
+ try {
+ wait(0L);
+ } catch (InterruptedException ie) {
+ interrupted = true;
}
- else
- notifyAll();
}
+ else
+ notifyAll();
}
- } while ((s = status) >= 0);
- if (interrupted)
- Thread.currentThread().interrupt();
- }
+ }
+ } while ((s = status) >= 0);
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
return s;
}
@@ -351,22 +352,22 @@
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
- ForkJoinPool cp = ForkJoinPool.common;
if (Thread.interrupted())
throw new InterruptedException();
- if ((s = status) >= 0 && cp != null) {
- if (this instanceof CountedCompleter)
- cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- doExec();
- }
- while ((s = status) >= 0) {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0)
- wait();
- else
- notifyAll();
+ if ((s = status) >= 0 &&
+ (s = ((this instanceof CountedCompleter) ?
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter<?>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
+ 0)) >= 0) {
+ while ((s = status) >= 0) {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait(0L);
+ else
+ notifyAll();
+ }
}
}
}
@@ -386,7 +387,7 @@
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
- wt.pool.awaitJoin(w, this) :
+ wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
@@ -399,7 +400,8 @@
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
+ (wt = (ForkJoinWorkerThread)t).pool.
+ awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
@@ -577,7 +579,7 @@
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
- if (false && e.thrower != Thread.currentThread().getId()) {
+ if (e.thrower != Thread.currentThread().getId()) {
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;
@@ -587,13 +589,17 @@
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c;
- else if (ps.length == 1 && ps[0] == Throwable.class)
- return (Throwable)(c.newInstance(ex));
+ else if (ps.length == 1 && ps[0] == Throwable.class) {
+ Throwable wx = (Throwable)c.newInstance(ex);
+ return (wx == null) ? ex : wx;
+ }
}
if (noArgCtor != null) {
Throwable wx = (Throwable)(noArgCtor.newInstance());
- wx.initCause(ex);
- return wx;
+ if (wx != null) {
+ wx.initCause(ex);
+ return wx;
+ }
}
} catch (Exception ignore) {
}
@@ -1017,67 +1023,40 @@
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ int s;
+ long nanos = unit.toNanos(timeout);
if (Thread.interrupted())
throw new InterruptedException();
- // Messy in part because we measure in nanosecs, but wait in millisecs
- int s; long ms;
- long ns = unit.toNanos(timeout);
- ForkJoinPool cp;
- if ((s = status) >= 0 && ns > 0L) {
- long deadline = System.nanoTime() + ns;
- ForkJoinPool p = null;
- ForkJoinPool.WorkQueue w = null;
+ if ((s = status) >= 0 && nanos > 0L) {
+ long d = System.nanoTime() + nanos;
+ long deadline = (d == 0L) ? 1L : d; // avoid 0
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
- p = wt.pool;
- w = wt.workQueue;
- p.helpJoinOnce(w, this); // no retries on failure
- }
- else if ((cp = ForkJoinPool.common) != null) {
- if (this instanceof CountedCompleter)
- cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- doExec();
+ s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
}
- boolean canBlock = false;
- boolean interrupted = false;
- try {
- while ((s = status) >= 0) {
- if (w != null && w.qlock < 0)
- cancelIgnoringExceptions(this);
- else if (!canBlock) {
- if (p == null || p.tryCompensate(p.ctl))
- canBlock = true;
- }
- else {
- if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
- U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) {
- try {
- wait(ms);
- } catch (InterruptedException ie) {
- if (p == null)
- interrupted = true;
- }
- }
- else
- notifyAll();
- }
+ else if ((s = ((this instanceof CountedCompleter) ?
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter<?>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ?
+ doExec() : 0)) >= 0) {
+ long ns, ms; // measure in nanosecs, but wait in millisecs
+ while ((s = status) >= 0 &&
+ (ns = deadline - System.nanoTime()) > 0L) {
+ if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait(ms); // OK to throw InterruptedException
+ else
+ notifyAll();
}
- if ((s = status) < 0 || interrupted ||
- (ns = deadline - System.nanoTime()) <= 0L)
- break;
}
}
- } finally {
- if (p != null && canBlock)
- p.incrementActiveCount();
}
- if (interrupted)
- throw new InterruptedException();
}
+ if (s >= 0)
+ s = status;
if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)