6986050: Small clarifications and fixes for ForkJoin
authordl
Tue, 21 Sep 2010 16:06:59 +0100
changeset 6674 2b22e69fdb75
parent 6673 3674dbc66612
child 6675 c86763d8f1c7
child 6676 c8629a8bbd7d
6986050: Small clarifications and fixes for ForkJoin Summary: Clarify FJ.get on throw InterruptedException, propagate ThreadFactory, shutdown transition Reviewed-by: chegar
jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java
jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java
jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
jdk/src/share/classes/java/util/concurrent/RecursiveAction.java
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java	Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinPool.java	Tue Sep 21 16:06:59 2010 +0100
@@ -42,7 +42,6 @@
 import java.util.List;
 import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -823,15 +822,13 @@
                                    (workerCounts & RUNNING_COUNT_MASK) <= 1);
                 long startTime = untimed? 0 : System.nanoTime();
                 Thread.interrupted();         // clear/ignore interrupt
-                if (eventCount != ec || w.runState != 0 ||
-                    runState >= TERMINATING)  // recheck after clear
-                    break;
+                if (eventCount != ec || w.isTerminating())
+                    break;                    // recheck after clear
                 if (untimed)
                     LockSupport.park(w);
                 else {
                     LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
-                    if (eventCount != ec || w.runState != 0 ||
-                        runState >= TERMINATING)
+                    if (eventCount != ec || w.isTerminating())
                         break;
                     if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
                         tryShutdownUnusedWorker(ec);
@@ -899,16 +896,23 @@
                      UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
                                               wc + (ONE_RUNNING|ONE_TOTAL))) {
                 ForkJoinWorkerThread w = null;
+                Throwable fail = null;
                 try {
                     w = factory.newThread(this);
-                } finally { // adjust on null or exceptional factory return
-                    if (w == null) {
-                        decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
-                        tryTerminate(false); // handle failure during shutdown
-                    }
+                } catch (Throwable ex) {
+                    fail = ex;
                 }
-                if (w == null)
+                if (w == null) { // null or exceptional factory return
+                    decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
+                    tryTerminate(false); // handle failure during shutdown
+                    // If originating from an external caller,
+                    // propagate exception, else ignore
+                    if (fail != null && runState < TERMINATING &&
+                        !(Thread.currentThread() instanceof
+                          ForkJoinWorkerThread))
+                        UNSAFE.throwException(fail);
                     break;
+                }
                 w.start(recordWorker(w), ueh);
                 if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
                     int c; // advance event count
@@ -997,8 +1001,12 @@
         boolean active = w.active;
         boolean inactivate = false;
         int pc = parallelism;
-        int rs;
-        while (w.runState == 0 && (rs = runState) < TERMINATING) {
+        while (w.runState == 0) {
+            int rs = runState;
+            if (rs >= TERMINATING) { // propagate shutdown
+                w.shutdown();
+                break;
+            }
             if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
                 UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
                 inactivate = active = w.active = false;
@@ -1126,6 +1134,7 @@
         return true;
     }
 
+
     /**
      * Actions on transition to TERMINATING
      *
@@ -1149,7 +1158,7 @@
                     if (passes > 0 && !w.isTerminated()) {
                         w.cancelTasks();
                         LockSupport.unpark(w);
-                        if (passes > 1) {
+                        if (passes > 1 && !w.isInterrupted()) {
                             try {
                                 w.interrupt();
                             } catch (SecurityException ignore) {
@@ -1726,6 +1735,13 @@
     }
 
     /**
+     * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
+     */
+    final boolean isAtLeastTerminating() {
+        return runState >= TERMINATING;
+    }
+
+    /**
      * Returns {@code true} if this pool has been shut down.
      *
      * @return {@code true} if this pool has been shut down
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java	Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java	Tue Sep 21 16:06:59 2010 +0100
@@ -55,10 +55,10 @@
  * start other subtasks.  As indicated by the name of this class,
  * many programs using {@code ForkJoinTask} employ only methods
  * {@link #fork} and {@link #join}, or derivatives such as {@link
- * #invokeAll}.  However, this class also provides a number of other
- * methods that can come into play in advanced usages, as well as
- * extension mechanics that allow support of new forms of fork/join
- * processing.
+ * #invokeAll(ForkJoinTask...) invokeAll}.  However, this class also
+ * provides a number of other methods that can come into play in
+ * advanced usages, as well as extension mechanics that allow
+ * support of new forms of fork/join processing.
  *
  * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
  * The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -250,7 +250,7 @@
         int s;         // the odd construction reduces lock bias effects
         while ((s = status) >= 0) {
             try {
-                synchronized(this) {
+                synchronized (this) {
                     if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
                         wait();
                 }
@@ -270,7 +270,7 @@
         int s;
         if ((s = status) >= 0) {
             try {
-                synchronized(this) {
+                synchronized (this) {
                     if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
                         wait(millis, 0);
                 }
@@ -288,7 +288,7 @@
     private void externalAwaitDone() {
         int s;
         while ((s = status) >= 0) {
-            synchronized(this) {
+            synchronized (this) {
                 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
                     boolean interrupted = false;
                     while (status >= 0) {
@@ -669,11 +669,34 @@
         setCompletion(NORMAL);
     }
 
+    /**
+     * Waits if necessary for the computation to complete, and then
+     * retrieves its result.
+     *
+     * @return the computed result
+     * @throws CancellationException if the computation was cancelled
+     * @throws ExecutionException if the computation threw an
+     * exception
+     * @throws InterruptedException if the current thread is not a
+     * member of a ForkJoinPool and was interrupted while waiting
+     */
     public final V get() throws InterruptedException, ExecutionException {
-        quietlyJoin();
-        if (Thread.interrupted())
-            throw new InterruptedException();
-        int s = status;
+        int s;
+        if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+            quietlyJoin();
+            s = status;
+        }
+        else {
+            while ((s = status) >= 0) {
+                synchronized (this) { // interruptible form of awaitDone
+                    if (UNSAFE.compareAndSwapInt(this, statusOffset,
+                                                 s, SIGNAL)) {
+                        while (status >= 0)
+                            wait();
+                    }
+                }
+            }
+        }
         if (s < NORMAL) {
             Throwable ex;
             if (s == CANCELLED)
@@ -684,6 +707,20 @@
         return getRawResult();
     }
 
+    /**
+     * Waits if necessary for at most the given time for the computation
+     * to complete, and then retrieves its result, if available.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return the computed result
+     * @throws CancellationException if the computation was cancelled
+     * @throws ExecutionException if the computation threw an
+     * exception
+     * @throws InterruptedException if the current thread is not a
+     * member of a ForkJoinPool and was interrupted while waiting
+     * @throws TimeoutException if the wait timed out
+     */
     public final V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
         Thread t = Thread.currentThread();
@@ -725,7 +762,7 @@
                         long ms = nt / 1000000;
                         int ns = (int) (nt % 1000000);
                         try {
-                            synchronized(this) {
+                            synchronized (this) {
                                 if (status >= 0)
                                     wait(ms, ns);
                             }
--- a/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java	Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java	Tue Sep 21 16:06:59 2010 +0100
@@ -778,11 +778,20 @@
 
     // status check methods used mainly by ForkJoinPool
     final boolean isRunning()     { return runState == 0; }
-    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
     final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
     final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
     final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
 
+    final boolean isTerminating() {
+        if ((runState & TERMINATING) != 0)
+            return true;
+        if (pool.isAtLeastTerminating()) { // propagate pool state
+            shutdown();
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Sets state to TERMINATING. Does NOT unpark or interrupt
      * to wake up if currently blocked. Callers must do so if desired.
--- a/jdk/src/share/classes/java/util/concurrent/RecursiveAction.java	Tue Sep 21 15:58:06 2010 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/RecursiveAction.java	Tue Sep 21 16:06:59 2010 +0100
@@ -138,7 +138,7 @@
  *        if (right.tryUnfork()) // directly calculate if not stolen
  *          sum += right.atLeaf(right.lo, right.hi);
  *       else {
- *          right.helpJoin();
+ *          right.join();
  *          sum += right.result;
  *        }
  *        right = right.next;