6986050: Small clarifications and fixes for ForkJoin
Summary: Clarify FJ.get on throw InterruptedException, propagate ThreadFactory, shutdown transition
Reviewed-by: chegar
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java Tue Sep 21 16:06:59 2010 +0100
@@ -42,7 +42,6 @@
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -823,15 +822,13 @@
(workerCounts & RUNNING_COUNT_MASK) <= 1);
long startTime = untimed? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != ec || w.runState != 0 ||
- runState >= TERMINATING) // recheck after clear
- break;
+ if (eventCount != ec || w.isTerminating())
+ break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
- if (eventCount != ec || w.runState != 0 ||
- runState >= TERMINATING)
+ if (eventCount != ec || w.isTerminating())
break;
if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
tryShutdownUnusedWorker(ec);
@@ -899,16 +896,23 @@
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
wc + (ONE_RUNNING|ONE_TOTAL))) {
ForkJoinWorkerThread w = null;
+ Throwable fail = null;
try {
w = factory.newThread(this);
- } finally { // adjust on null or exceptional factory return
- if (w == null) {
- decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
- tryTerminate(false); // handle failure during shutdown
- }
+ } catch (Throwable ex) {
+ fail = ex;
}
- if (w == null)
+ if (w == null) { // null or exceptional factory return
+ decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
+ tryTerminate(false); // handle failure during shutdown
+ // If originating from an external caller,
+ // propagate exception, else ignore
+ if (fail != null && runState < TERMINATING &&
+ !(Thread.currentThread() instanceof
+ ForkJoinWorkerThread))
+ UNSAFE.throwException(fail);
break;
+ }
w.start(recordWorker(w), ueh);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
int c; // advance event count
@@ -997,8 +1001,12 @@
boolean active = w.active;
boolean inactivate = false;
int pc = parallelism;
- int rs;
- while (w.runState == 0 && (rs = runState) < TERMINATING) {
+ while (w.runState == 0) {
+ int rs = runState;
+ if (rs >= TERMINATING) { // propagate shutdown
+ w.shutdown();
+ break;
+ }
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
inactivate = active = w.active = false;
@@ -1126,6 +1134,7 @@
return true;
}
+
/**
* Actions on transition to TERMINATING
*
@@ -1149,7 +1158,7 @@
if (passes > 0 && !w.isTerminated()) {
w.cancelTasks();
LockSupport.unpark(w);
- if (passes > 1) {
+ if (passes > 1 && !w.isInterrupted()) {
try {
w.interrupt();
} catch (SecurityException ignore) {
@@ -1726,6 +1735,13 @@
}
/**
+ * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
+ */
+ final boolean isAtLeastTerminating() {
+ return runState >= TERMINATING;
+ }
+
+ /**
* Returns {@code true} if this pool has been shut down.
*
* @return {@code true} if this pool has been shut down
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java Tue Sep 21 16:06:59 2010 +0100
@@ -55,10 +55,10 @@
* start other subtasks. As indicated by the name of this class,
* many programs using {@code ForkJoinTask} employ only methods
* {@link #fork} and {@link #join}, or derivatives such as {@link
- * #invokeAll}. However, this class also provides a number of other
- * methods that can come into play in advanced usages, as well as
- * extension mechanics that allow support of new forms of fork/join
- * processing.
+ * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
+ * provides a number of other methods that can come into play in
+ * advanced usages, as well as extension mechanics that allow
+ * support of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -250,7 +250,7 @@
int s; // the odd construction reduces lock bias effects
while ((s = status) >= 0) {
try {
- synchronized(this) {
+ synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait();
}
@@ -270,7 +270,7 @@
int s;
if ((s = status) >= 0) {
try {
- synchronized(this) {
+ synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait(millis, 0);
}
@@ -288,7 +288,7 @@
private void externalAwaitDone() {
int s;
while ((s = status) >= 0) {
- synchronized(this) {
+ synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
boolean interrupted = false;
while (status >= 0) {
@@ -669,11 +669,34 @@
setCompletion(NORMAL);
}
+ /**
+ * Waits if necessary for the computation to complete, and then
+ * retrieves its result.
+ *
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ */
public final V get() throws InterruptedException, ExecutionException {
- quietlyJoin();
- if (Thread.interrupted())
- throw new InterruptedException();
- int s = status;
+ int s;
+ if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+ quietlyJoin();
+ s = status;
+ }
+ else {
+ while ((s = status) >= 0) {
+ synchronized (this) { // interruptible form of awaitDone
+ if (UNSAFE.compareAndSwapInt(this, statusOffset,
+ s, SIGNAL)) {
+ while (status >= 0)
+ wait();
+ }
+ }
+ }
+ }
if (s < NORMAL) {
Throwable ex;
if (s == CANCELLED)
@@ -684,6 +707,20 @@
return getRawResult();
}
+ /**
+ * Waits if necessary for at most the given time for the computation
+ * to complete, and then retrieves its result, if available.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ * @throws TimeoutException if the wait timed out
+ */
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Thread t = Thread.currentThread();
@@ -725,7 +762,7 @@
long ms = nt / 1000000;
int ns = (int) (nt % 1000000);
try {
- synchronized(this) {
+ synchronized (this) {
if (status >= 0)
wait(ms, ns);
}
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Tue Sep 21 16:06:59 2010 +0100
@@ -778,11 +778,20 @@
// status check methods used mainly by ForkJoinPool
final boolean isRunning() { return runState == 0; }
- final boolean isTerminating() { return (runState & TERMINATING) != 0; }
final boolean isTerminated() { return (runState & TERMINATED) != 0; }
final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
+ final boolean isTerminating() {
+ if ((runState & TERMINATING) != 0)
+ return true;
+ if (pool.isAtLeastTerminating()) { // propagate pool state
+ shutdown();
+ return true;
+ }
+ return false;
+ }
+
/**
* Sets state to TERMINATING. Does NOT unpark or interrupt
* to wake up if currently blocked. Callers must do so if desired.
--- a/jdk/src/share/classes/java/util/concurrent/RecursiveAction.java Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/RecursiveAction.java Tue Sep 21 16:06:59 2010 +0100
@@ -138,7 +138,7 @@
* if (right.tryUnfork()) // directly calculate if not stolen
* sum += right.atLeaf(right.lo, right.hi);
* else {
- * right.helpJoin();
+ * right.join();
* sum += right.result;
* }
* right = right.next;